  1. Slow
  2. Requires staging dirs with full copies (4x copy problem)
  3. Unsuitable for load-balancing use-cases
  4. Incompatibility with ACID
  5. Dependency on external tools for much of the work (staging dir management, managing state info, etc.)


We will thus first try to understand why each of these occurs and what we can do about it.


To this end, one of the goals of replv2 is for Hive to manage its own staging directories and, instead of leaving it to replication tools to move data over, to step in more proactively and pull the data from the source to the destination.


REPL LOAD [<dbname>[.<tablename>]] FROM <dirname>;

This causes a repl dump present in <dirname> (which is to be a fully qualified HDFS URL) to be pulled and loaded. If <dbname> is specified and the original dump was a db-level dump, this allows us to do db-rename-mapping on import. If <dbname>.<tablename> is specified and the original dump was a table-level dump, this allows us to do table-rename-mapping on import. If neither dbname nor tablename is specified, the original dbname and tablename, as recorded in the dump, are used.
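The rename-mapping rules above can be sketched as a small resolver. This is a minimal illustration, not Hive's implementation: the function name `resolve_load_target` is hypothetical, and it treats any combination the text does not describe (for example, a bare <dbname> against a table-level dump) as an error, which is an assumption.

```python
from typing import Optional, Tuple


def resolve_load_target(dump_db: str, dump_table: Optional[str],
                        target: Optional[str]) -> Tuple[str, Optional[str]]:
    """Return the (db, table) a REPL LOAD would write into (hypothetical helper).

    dump_db / dump_table: names recorded in the dump; dump_table is None for a
    db-level dump. target: the optional <dbname>[.<tablename>] argument.
    """
    if target is None:
        # No mapping requested: use the names recorded in the dump.
        return dump_db, dump_table
    parts = target.split(".")
    if dump_table is None:
        # db-level dump: only a db rename applies.
        if len(parts) != 1:
            raise ValueError("db-level dump expects REPL LOAD <dbname>")
        return parts[0], None
    # table-level dump: expect <dbname>.<tablename> (assumed to be required).
    if len(parts) != 2:
        raise ValueError("table-level dump expects REPL LOAD <dbname>.<tablename>")
    return parts[0], parts[1]
```

For example, a db-level dump of `sales` loaded with the argument `sales_dr` resolves to `("sales_dr", None)`.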

Return values:

  1. Error codes returned as normal.
  2. Does not return anything in ResultSet, expects user to run REPL STATUS to check.



REPL STATUS <dbname>[.<tablename>];

Returns the same output that REPL LOAD would, which allows REPL LOAD to be run asynchronously. If no replication is known for that db / db.tbl, we return an empty set. Note that if a destination db or table exists but no known repl exists for it, tools calling REPL LOAD should treat this as an error condition and pass it on to the end-user, to alert them that they may be overwriting an existing db/table with another.
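A calling tool's pre-load check might look like the sketch below. The text does not say what REPL STATUS contains, so this models it, purely as an assumption, as the last event id loaded for a "db" or "db.tbl" key, with `None` standing in for the empty set; both function names are hypothetical.

```python
from typing import Dict, Optional, Set


def repl_status(target: str, known_repls: Dict[str, int]) -> Optional[int]:
    """Hypothetical REPL STATUS: None plays the role of the empty result set."""
    return known_repls.get(target)


def load_would_overwrite(target: str, existing: Set[str],
                         known_repls: Dict[str, int]) -> bool:
    """True when the destination exists but no known replication owns it.

    This is the error condition a tool should surface to the end-user
    before running REPL LOAD into `target`.
    """
    return target in existing and repl_status(target, known_repls) is None
```

A destination that is already a known replication target, or that does not exist yet, is safe to load into; only the "exists but unknown" case is flagged.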

Bootstrap, revisited

When we introduced the notion of a need for bootstrap, we said that time passing during the bootstrap was a problem that needed to be solved separately. For the state an object had at t1 to still be correct when we read it at t3, one of the following must hold:

  1. No other change must have happened to the object, which means that we do the equivalent of locking the object in question from t1 to t3. Such an approach is possible if t1 & t3 occur in the same transaction interval.
  2. If changes to the object between t1 & t3 are inevitable, we must have a way of recording each state change, so that when t3 rolls around, we still have the original t1 state available somewhere.

Route (1) is how we should approach ACID tables, and is hopefully how all Hive tables will be accessed at some point in the future. The benefit of the transactional route is that we would have exactly the delta/change that we're applying, and could save that delta to pass on to the other side.

In the meantime, however, we must try to solve (2) as well. To this end, our goal with replv2 is to make sure that whenever any Hive access makes a change to an object, we capture the original state. There are two aspects to the original state: the metadata and the data. The metadata is easily handled, since t1 & t2 can be done in the context of a single Hive operation, and we can commit the notification and our change to the metadata in the same metastore transaction. This leaves the question of what happens with the backing filesystem data for the object.


Both of these problems, the 4x copy problem and that of knowing what FS state existed at t1 (to prevent rubberbanding), are solvable if we have a snapshot of the source filesystem at the time the event occurred. This initially led us to look at HDFS snapshots. Unfortunately, while HDFS snapshots would solve our problem, per discussion with the HDFS folks they are not something we can create in large numbers, and we would quite likely need a snapshot for every single event that comes along.

However, the idea behind the snapshot is still what we want. If HDFS cannot support the number of snapshots we would create, we can instead do a pseudo-snapshot: for all files backing Hive objects, if we detect that a Hive operation would move them away or modify them, we retain the original in a separate directory, similar to how we manage Trash. This pseudo-trash-like capturing behaviour is what we refer to as the "change-management" piece, and it is the main piece that needs to be in place to solve both the rubberbanding problem and the 4x copy problem.
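The pseudo-trash idea can be sketched on a local filesystem. This is an illustration under stated assumptions, not Hive's change-management code: the function `cm_preserve`, the "cm root" directory, and the `<name>_<md5>` naming scheme are all hypothetical. Keying the retained copy on a content checksum means that capturing the same file contents twice keeps only one copy.

```python
import hashlib
import os
import shutil


def _checksum(path: str) -> str:
    """MD5 of the file contents, read in 1 MiB chunks."""
    h = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()


def cm_preserve(path: str, cm_root: str, *, copy: bool = False) -> str:
    """Retain the original of `path` before a Hive operation changes it.

    copy=False: the operation would delete/move the file, so move it aside.
    copy=True:  the operation would modify it in place, so keep a copy.
    Returns where the original contents now live (hypothetical naming).
    """
    os.makedirs(cm_root, exist_ok=True)
    dest = os.path.join(cm_root, f"{os.path.basename(path)}_{_checksum(path)}")
    if os.path.exists(dest):
        # Identical contents already retained; just let the operation proceed.
        if not copy:
            os.remove(path)
    elif copy:
        shutil.copy2(path, dest)
    else:
        shutil.move(path, dest)
    return dest
```

A dump entry pointing at the original path can then be redirected to the returned location once the original is gone.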


Currently, when we do an EXPORT of a table, the directory structure created in this dump has, at its root, a _metadata file that contains all the metadata state to be impressed, and then has directory structures for each partition to be impressed.

To populate each of the partition directories, EXPORT runs a CopyTask that copies over the files of each partition. To make sure that we do not make secondary copies, our design is very simple: instead of a CopyTask, we use a ReplCopyTask, which, instead of copying the files to the destination directory, creates a file called _files in the destination directory listing the filenames of the original files.

Thus, instead of partition directories with actual data, we will instead have partition directories with _files files that then contain the location of the original files. (We will discuss and handle what happens when the original files get moved away or deleted later, for now, it is sufficient to assume that these urls will be stable urls to the state of the files at the time we did the dump, as if it were a pseudo-snapshot.)
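The dump side of this indirection can be sketched as follows. This is a minimal illustration, not the ReplCopyTask itself: the helper name `write_files_manifest` is hypothetical, and the one-URI-per-line layout of _files is an assumption.

```python
import os
from typing import List


def write_files_manifest(src_files: List[str], dest_dir: str) -> str:
    """Record where a partition's data lives instead of copying it.

    Writes dest_dir/_files with one source URI per line (assumed layout)
    and returns the manifest path.
    """
    os.makedirs(dest_dir, exist_ok=True)
    manifest = os.path.join(dest_dir, "_files")
    with open(manifest, "w", encoding="utf-8") as f:
        for uri in src_files:
            f.write(uri + "\n")
    return manifest
```

The dump directory then holds only small manifests; the bulk data stays where it is until the load side reads it.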

Now, when this export dump is imported, we need to make sure that for each _files file loaded, we go through its contents and copy each underlying file it lists instead. Also, we will wind up invoking DistCp automatically from Hive when we copy files over from a remote cluster. (Again, this can be optimized and will be discussed in detail later; for now, it suffices that we are able to access the files.)
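On the load side, each _files manifest expands into a list of copies to perform. The sketch below only plans the (source, destination) pairs; handing them to an actual copier such as DistCp is left out. The name `plan_copies`, the one-URI-per-line format, and keeping the source file's base name at the destination are assumptions.

```python
import os
from typing import List, Tuple


def plan_copies(partition_dump_dir: str, target_dir: str) -> List[Tuple[str, str]]:
    """Expand partition_dump_dir/_files into (source, destination) pairs."""
    pairs = []
    manifest = os.path.join(partition_dump_dir, "_files")
    with open(manifest, encoding="utf-8") as f:
        for line in f:
            src = line.strip()
            if not src:
                continue  # tolerate blank lines
            name = src.rstrip("/").rsplit("/", 1)[-1]
            pairs.append((src, target_dir.rstrip("/") + "/" + name))
    return pairs
```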

With this notion of EXPORT creating _files as indirections to the actual files, and IMPORT loading _files to locate the actual files needing copying, we solve the 4x copy problem.


One of the requests we got was that by offloading too much of the requirements of replication, we push too much "hive knowledge" over to the tools that integrate with us, asking them to essentially bootstrap the destination warehouse to a point where it is capable of receiving incremental updates. Currently, we recommend that users run a manual "EXPORT ... FOR REPLICATION" on all tables involved, set up any dbs needed and IMPORT these dumps as needed, etc, to prepare a destination for replicating into. We need to introduce a mechanism by which we can set up a replication dump at a larger scale than just tables, say, at a DB level. For this purpose, the best fit seemed to be a new tool or command, similar to mysqldump.

(Note: in this section, I frequently refer to mysql and mysqldump, not because they are the only solution out there but because I'm somewhat familiar with them. Other dbs have equivalent tools.)

There are a couple of major differences, however, between the expectations we have of something like mysqldump and those of a command we implement:

  1. The scale of the data involved in an initial dump is orders of magnitude larger for a Hive warehouse than for a typical mysql db.
  2. Transactional isolation & log-based approaches mean that mysqldump can have a stable snapshot of the entire db/metadata during which it proceeds to dump out all dbs and tables. So, even if it takes a while to dump them out, it need not worry about the objects changing while they get dumped. We, on the other hand, need to handle that.


The first point can be solved by using the change-management semantics we're developing, and using the lazy _files approach rather than a CopyTask. The second part is a little more involved, and needs some consolidation during dump generation. Let us say that we begin the dump at evid=170, and by the time we finish dumping all the objects contained in our dump, it is now evid=230. For a consistent picture, we now also have to consolidate the information from events 170-230 into our dump before we can pass the baton over to incremental replication. We will discuss this shortly, after a brief detour through the new commands we introduce to manage the replication dump and reload.

New commands to help us



REPL DUMP <dbname>[.<tablename>] [FROM <init-evid> [TO <end-evid>] [BATCH <num-evids>] ];

This is better described via various examples of each of the pieces of the command syntax, as follows:

(a) REPL DUMP sales;

 Dumps out the sales database for bootstrap, from <init-evid>=0 (bootstrap case) to <end-evid>=<CURR-EVID>, with a batch size of 0, i.e. no batching.

(b) REPL DUMP sales.Q3;

 Similar to case (a), but sets up table-level replication instead of db-level repl.

(c) REPL DUMP sales FROM 200 TO 1400;

 The presence of a FROM <init-evid> clause makes this dump not a bootstrap, but a delta dump produced by reading the event log.

 FROM 200 TO 1400 is self-evident in that it will go through event ids 200 to 1400 looking for events from the relevant db.

(d) REPL DUMP sales FROM 200;

 Similar to above, but with an implicit assumed <end-evid> as being the current event id at the time the command is run.

(e) REPL DUMP sales FROM 200 TO 1400 BATCH 100;

   REPL DUMP sales FROM 200 BATCH 100;

Similar to cases (c) & (d), with the addition of a batch size of <num-evids>=100. This causes us to stop processing once we reach 100 events, and return at that point. Note that this does not mean that we stop at event id 300 because we began at 200. The event stream also contains unrelated events; we stop once we have processed 100 events that belong to this replication definition, i.e. events on the relevant db or db.table.
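The defaults in the example statements above can be made concrete with a small parser for the REPL DUMP syntax. This is a hypothetical sketch (`parse_repl_dump` and `DumpRequest` are not Hive classes); it assumes the current event id is supplied by the caller and that keywords are case-insensitive, as the lowercase `to` in one of the examples suggests.

```python
import re
from dataclasses import dataclass
from typing import Optional

# REPL DUMP <db>[.<table>] [FROM <init> [TO <end>] [BATCH <n>]];
_REPL_DUMP = re.compile(
    r"^REPL\s+DUMP\s+(?P<db>\w+)(?:\.(?P<table>\w+))?"
    r"(?:\s+FROM\s+(?P<start>\d+)(?:\s+TO\s+(?P<end>\d+))?"
    r"(?:\s+BATCH\s+(?P<batch>\d+))?)?\s*;?\s*$",
    re.IGNORECASE,
)


@dataclass
class DumpRequest:
    db: str
    table: Optional[str]
    bootstrap: bool  # True when no FROM clause was given
    init_evid: int
    end_evid: int
    batch: int  # 0 means no batching


def parse_repl_dump(stmt: str, curr_evid: int) -> DumpRequest:
    m = _REPL_DUMP.match(stmt.strip())
    if m is None:
        raise ValueError(f"not a REPL DUMP statement: {stmt!r}")
    start, end, batch = m.group("start"), m.group("end"), m.group("batch")
    return DumpRequest(
        db=m.group("db"),
        table=m.group("table"),
        bootstrap=start is None,
        init_evid=int(start) if start is not None else 0,
        end_evid=int(end) if end is not None else curr_evid,
        batch=int(batch) if batch is not None else 0,
    )
```

Treating "bootstrap" as "no FROM clause" rather than "init-evid is 0" follows the bootstrap note that accompanies this command.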

Return values:

  1. Error codes returned as return error codes (and over JDBC when run through HiveServer2)
  2. Returns 2 columns in the ResultSet:
    1. <dir-name> - the directory to which it has dumped info.
    2. <last-evid> - the last event-id associated with this dump, which might be the end-evid, or the curr-evid, as the case may be.
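The BATCH semantics and the <last-evid> return value can be sketched together. In this hypothetical model an event is an `(evid, db, table)` tuple and the log is sorted by event id; only events belonging to the replication definition count towards the batch. Reporting the last selected event id when the batch cuts the scan short, and <end-evid> otherwise, is an assumption about how <last-evid> would be chosen.

```python
from typing import Iterable, List, Optional, Tuple

Event = Tuple[int, str, Optional[str]]  # (evid, db, table): hypothetical shape


def select_events(events: Iterable[Event], db: str, table: Optional[str],
                  init_evid: int, end_evid: int,
                  batch: int) -> Tuple[List[Event], int]:
    """Return (events to dump, last-evid) for REPL DUMP ... FROM ... [BATCH n]."""
    selected: List[Event] = []
    for evid, ev_db, ev_table in events:  # assumed sorted by evid
        if evid < init_evid:
            continue
        if evid > end_evid:
            break
        if ev_db != db or (table is not None and ev_table != table):
            continue  # unrelated events do not count towards BATCH
        selected.append((evid, ev_db, ev_table))
        if batch and len(selected) == batch:
            return selected, evid  # stopped early: report how far we got
    return selected, end_evid
```

With a batch of 2 over events 200 (sales), 201 (hr), 202 (sales), the scan stops after 202, not after two raw event ids.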

This call is intended to be synchronous, and expects the caller to wait for the result.

Bootstrap note : The FROM clause means that we read the event log to determine what to dump. For bootstrapping, we would not use FROM.



Let us consider the case of a table T1, which was dumped out around evid=200. Now, let us say that the following operations occurred on the two tables while the dump was in progress:


event id    operation


Basically, let us try to understand what happens when partitions are added (Pa & Pb) and dropped (Px & Py) both before and after a table is dumped. For our bootstrap, we go through two phases: first, an object dump of all the objects we're expected to dump, and then a consolidation phase in which we go through all the events that occurred during the object dump.

If the table T1 was dumped at around evid=200, it will not contain partition Px, since the drop was processed before the dump occurred, and it will contain partition Pa, since that partition was added before the object dump occurred. In contrast, partition Pb will not be present in the dump, since it will not yet have been added, and the dump will still contain partition Py, since that partition will not yet have been dropped.
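The disparity can be reproduced with a toy model of T1's partition set. The event ids 180, 190, 210 and 220, the initial partitions {Px, Py}, and the helper `partitions_as_of` are made up for illustration; only the before/after-200 ordering comes from the scenario above.

```python
from typing import List, Set, Tuple

# Hypothetical event log for T1 (ids are illustrative only).
EVENTS: List[Tuple[int, str, str]] = [
    (180, "drop", "Px"),
    (190, "add", "Pa"),
    (210, "add", "Pb"),
    (220, "drop", "Py"),
]


def partitions_as_of(initial: Set[str], events: List[Tuple[int, str, str]],
                     evid: int) -> Set[str]:
    """Partition set after applying every event with id <= evid (sorted log)."""
    parts = set(initial)
    for ev_id, op, part in events:
        if ev_id > evid:
            break
        if op == "add":
            parts.add(part)
        else:
            parts.discard(part)
    return parts
```

A dump of T1 taken at 200 sees {Pa, Py}, while the table's state at the end of the dump window (230) is {Pa, Pb}; that gap is what consolidation has to close.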

So, given this disparity, we need to consolidate this somehow. There are a couple of ways to consolidate.

Approach 1 : Consolidate at destination.

Now, one approach to handle this would be to simply declare that the dump is of the minimum state for the whole object, say 170, and let the various events apply on the destination as long as they are applicable, ignoring errors (such as when we try to drop a partition Px from a replicated table T1 which already does not have Px in it).

While this can work, the problem with this approach is that the destination can now have tables at differing states as a result of the dump, i.e. a table T2 that was dumped at about evid=220 will have newer info than T1, which was dumped at about evid=200. This is a sort of mini-rubberbanding in itself, since different parts of a whole are at different states. The problem is actually a little worse, since different partitions of a table can be at different states. Thus, we will not follow this approach.


Approach 2 : Consolidate at source.

The alternate approach, then, is to go through each of the events from evid=170 to evid=230 in our example, which are the current-event-ids at the beginning of the object dump phase and the end of the object dump phase respectively, and to use that to modify the object dumps that we've just made. Any drops will result in the dumped object being changed/deleted, and any creates will result in additional dumped objects being added. Alters will result in dumped objects being replaced by their newer equivalent. At the end of this consolidation, all objects dumped should be capable of being restored on the destination as if the state for them was 230, and incremental replication can then take over, processing event 230 onwards.
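Source-side consolidation can be sketched for partition adds and drops. In this hypothetical model (`consolidate` and the event tuple shape are illustrative, and table-level creates/drops are omitted), every event in the window is replayed in order over the dumped partition sets. Adds and drops are applied as "make present" and "make absent", so replaying an event a dump already reflects is harmless, and each partition ends in the state set by the last event that touched it, i.e. its state at the end of the window.

```python
from typing import Dict, List, Set, Tuple

Event = Tuple[int, str, str, str]  # (evid, table, op, partition): hypothetical


def consolidate(dumped: Dict[str, Set[str]], events: List[Event],
                start_evid: int, end_evid: int) -> Dict[str, Set[str]]:
    """Bring each dumped table's partition set up to its end_evid state."""
    result = {t: set(p) for t, p in dumped.items()}
    for evid, table, op, part in sorted(events):  # replay in event-id order
        if not start_evid <= evid <= end_evid or table not in result:
            continue
        if op == "add":
            result[table].add(part)
        elif op == "drop":
            result[table].discard(part)
    return result
```

Using the T1 scenario (dumped at 200) plus a hypothetical T2 dumped at 220, both tables come out at their evid=230 state regardless of when each was dumped.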

This is the approach we expect to take. It requires one further change to the current export semantics. Currently, EXPORT writes only one _metadata file per table, which lists all of the table's partitions inside that file itself. Instead, we propose to split this up so that the _metadata file at each object level contains only the metadata for that object. Thus, _metadata at the table level will contain only the table object, and the individual directories inside it will hold the required partitions, each with its own partition-level _metadata.
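The split layout can be sketched as below. The JSON encoding, the `name` keys and the helper `write_table_dump` are assumptions made for illustration; the real _metadata format is not described here. One plausible benefit of this layout is that consolidation can replace or delete a single partition's directory without rewriting a table-wide file.

```python
import json
import os
from typing import Dict, List


def write_table_dump(root: str, table: Dict, partitions: List[Dict]) -> List[str]:
    """Write per-object _metadata files (hypothetical JSON encoding).

    <root>/<table>/_metadata             -> the table object only
    <root>/<table>/<partition>/_metadata -> that partition only
    """
    written = []
    tdir = os.path.join(root, table["name"])
    os.makedirs(tdir, exist_ok=True)
    path = os.path.join(tdir, "_metadata")
    with open(path, "w", encoding="utf-8") as f:
        json.dump(table, f)
    written.append(path)
    for part in partitions:
        pdir = os.path.join(tdir, part["name"])
        os.makedirs(pdir, exist_ok=True)
        path = os.path.join(pdir, "_metadata")
        with open(path, "w", encoding="utf-8") as f:
            json.dump(part, f)
        written.append(path)
    return written
```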