...
This document describes the second version of Hive Replication. Please refer to the first version of Hive Replication for details of the prior implementation.
This work is under development and interfaces are subject to change. It has been designed for use in conjunction with external orchestration tools, which are responsible for coordinating the right sequence of commands between source and target clusters, for fault tolerance and failure handling, and for providing the configuration options needed for cross-cluster replication.
As of the Hive 3.0.0 release, only replication of managed tables, where the Hive user owns the table contents, is supported. External tables, ACID tables, statistics and constraint replication are not supported.
Issues with the Current Replication System
Some of the observed issues with the current replication implementation are as follows:
- Slowness
- Requiring staging dirs with full copies (4xcopy problem)
- Unsuitability for load-balancing use-cases
- Incompatibility with ACID
- Dependency on external tools for much of the work (staging directory management, state information management, etc.)
We will first try to understand why each of these issues occurs and what we can do about it.
Slowness
Why is the first version of Hive Replication slow?
The primary reason for its slowness is that it depends on state transfer, rather than delta-replay. This means that the amount of data being funneled across to the destination is much larger than it otherwise would be. This is especially a problem with frequent updates/inserts. (Creates cannot be delta-optimized since the original state is null, and deletes are instantaneous.)
The secondary reason is that the original implementation was designed to ensure "correctness" in terms of resilience. We had planned optimizations that would drastically reduce the number of events processed, but these have not yet been implemented. They would have worked by processing a window of events at a time and skipping an event whenever a later event in the window nullified the effect of processing it (as when an insert is followed by another insert, or a create is followed by a drop). Thus, the current implementation can be seen as a naive implementation with a window size of 1.
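The windowing optimization can be illustrated with a minimal Java sketch. It assumes a deliberately simplified, hypothetical event model (`Event` and `EventType` with only CREATE, INSERT and DROP, one object name per event) and the state-transfer semantics described above. Under those semantics an INSERT is skipped if a later INSERT on the same object falls in the same window, because the later one re-copies the full state anyway. Any non-DROP event is skipped if the same object is dropped later in the window. This is not Hive's implementation. With a window size of 1 nothing is skipped, which matches the current naive behaviour.

```java
import java.util.*;

// Hypothetical, simplified event model; real metastore notification events carry far more detail.
enum EventType { CREATE, INSERT, DROP }

record Event(long id, String object, EventType type) {}

final class WindowedCoalescer {
    /**
     * Processes events in consecutive windows of windowSize and skips events whose effect is
     * nullified by a later event on the same object within the same window.
     * Assumes state-transfer replication, so a later INSERT re-copies the full state anyway.
     */
    static List<Event> coalesce(List<Event> events, int windowSize) {
        if (windowSize < 1) {
            throw new IllegalArgumentException("windowSize must be >= 1");
        }
        List<Event> kept = new ArrayList<>();
        for (int start = 0; start < events.size(); start += windowSize) {
            List<Event> window = events.subList(start, Math.min(start + windowSize, events.size()));
            Set<String> droppedLater = new HashSet<>();  // objects with a DROP later in this window
            Set<String> insertedLater = new HashSet<>(); // objects with an INSERT later in this window
            Deque<Event> keptInWindow = new ArrayDeque<>();
            // Walk backwards so that "later in the window" is already known for each event.
            for (int i = window.size() - 1; i >= 0; i--) {
                Event e = window.get(i);
                boolean nullified =
                        (e.type() != EventType.DROP && droppedLater.contains(e.object()))
                        || (e.type() == EventType.INSERT && insertedLater.contains(e.object()));
                if (!nullified) {
                    keptInWindow.addFirst(e); // addFirst restores the original order
                }
                if (e.type() == EventType.DROP) droppedLater.add(e.object());
                if (e.type() == EventType.INSERT) insertedLater.add(e.object());
            }
            kept.addAll(keptInWindow);
        }
        return kept;
    }
}
```

Walking each window backwards classifies every event in a single pass, because the later events of its window have already been seen by the time each event is examined.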
Requiring Staging Directories with Full Copies (4xcopy Problem)
Again, this problem comes down to needing to do a state transfer, and using EXPORT and IMPORT to do it. The first copy is the source table, which is then exported to a staging directory; this is the second copy. It then has to be copied over to the destination cluster with distcp, forming the third copy. Finally, on import, the data is written into the destination table, forming the fourth copy.
Now, two of these copies, the source table and the destination table, are inherent in the very notion of replication: two copies are needed. The chief issue is that two additional copies are temporarily required in staging directories. For clusters without much temporary overflow space, this becomes a major constraint.
...
- An event happens on the source that causes a change (at, say, t1)
- A notification event is generated for it (at, say, t2)
- That notification event is then processed on the source to "ready" an actionable task to perform on the destination to replicate the change (at, say, t3)
- The requisite data is copied over from the source warehouse to the destination warehouse
- The destination then performs whatever task is needed to restate
Now, so far, our primary problem seems to be that we can only capture the "latest" state, and not the original state at the time the event occurred. That is, when we process the notification, we get the state of the object at that time, t3, instead of its state at time t1. Between t1 and t3, the object may have changed substantially. If we take the state at t3 and apply it to the destination idempotently, always taking only updates, we get our current implementation, with its rubberbanding problem.
...
The current implementation of replication is built on the existing EXPORT and IMPORT commands. These commands are semantically better suited to exporting and importing than to serving as an applicable event log. A lazy _files behaviour on EXPORT is not a good fit, since EXPORTs are done with the understanding that they must be a stable copy irrespective of cleanup policies on the source. In addition, EXPORTing "events" is more tenuous: EXPORTing a CREATE event is easy enough, but exporting a DROP event is a semantic stretch. Thus, to fit our needs better, and to avoid making the existing EXPORT and IMPORT ever more complex, we introduce a new REPL command with three modes of operation: REPL DUMP, REPL LOAD and REPL STATUS.
REPL DUMP
Syntax:
REPL DUMP <repl_policy> {REPLACE <old_repl_policy>} {FROM <init-evid> {TO <end-evid>} {LIMIT <num-evids>} } {WITH ('key1'='value1', 'key2'='value2')};
Replication policy: <dbname>{{.[<comma_separated_include_tables_regex_list>]}{.[<comma_separated_exclude_tables_regex_list>]}}
This is better described via various examples of each of the pieces of the command syntax, as follows:
(a) REPL DUMP sales;
Replicates out the sales database for bootstrap, from <init-evid>=0 (bootstrap case) to <end-evid>=<CURR-EVID>, with a batch size of 0, i.e. no batching.
(b) REPL DUMP sales.['T3', '[a-z]+'];
Similar to case (a), but sets up db-level replication that includes only table/view 'T3' and any table/view whose name consists only of letters, of any length, such as 'orders', 'stores', etc.
(c) REPL DUMP sales.['.*?'].['T[0-9]+', 'Q4'];
Similar to case (a), but sets up db-level replication that excludes table/view 'Q4' and all table/view names consisting of the prefix 'T' followed by a numeric suffix of any length, such as 'T3', 'T400', 't255', etc. Table/view names are case-insensitive, so names with the prefix 't' are also excluded from the dump.
(d) REPL DUMP sales.[];
This sets up db-level replication that excludes all tables/views and includes only functions.
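The include/exclude semantics of policies like those in cases (a) through (d) can be sketched in Java as follows. `ReplPolicy` is a hypothetical class, not Hive's parser. It assumes each regex is single-quoted, and that a missing include list means every table is included while an empty list (`[]`) includes none. It also assumes table/view names must match a regex in full, ignoring case, which is consistent with 't255' being excluded by 'T[0-9]+' in case (c).

```java
import java.util.*;
import java.util.regex.Pattern;

// Hypothetical model of <dbname>{{.[<include_regex_list>]}{.[<exclude_regex_list>]}}.
// Assumptions: regexes are single-quoted, a missing include list means "all tables",
// and table names are matched in full, case-insensitively.
final class ReplPolicy {
    final String dbName;
    final List<String> includes; // null: no include list given, so every table is included
    final List<String> excludes;

    private ReplPolicy(String dbName, List<String> includes, List<String> excludes) {
        this.dbName = dbName;
        this.includes = includes;
        this.excludes = excludes;
    }

    static ReplPolicy parse(String text) {
        String s = text.trim();
        int dot = s.indexOf('.');
        if (dot < 0) {
            return new ReplPolicy(s, null, List.of()); // plain "<dbname>": whole database
        }
        List<List<String>> lists = new ArrayList<>();
        int i = dot;
        while (i < s.length()) {
            if (!s.startsWith(".[", i)) {
                throw new IllegalArgumentException("expected .[ at offset " + i);
            }
            i += 2;
            List<String> regexes = new ArrayList<>();
            while (true) {
                while (i < s.length() && (s.charAt(i) == ',' || s.charAt(i) == ' ')) i++;
                if (i >= s.length()) throw new IllegalArgumentException("unterminated list");
                if (s.charAt(i) == ']') { i++; break; } // end of this list
                if (s.charAt(i) != '\'') throw new IllegalArgumentException("expected quoted regex at offset " + i);
                int end = s.indexOf('\'', i + 1); // quotes protect ']' and ',' inside a regex
                if (end < 0) throw new IllegalArgumentException("unterminated quote");
                regexes.add(s.substring(i + 1, end));
                i = end + 1;
            }
            lists.add(List.copyOf(regexes));
        }
        if (lists.size() > 2) throw new IllegalArgumentException("only an include and an exclude list are allowed");
        return new ReplPolicy(s.substring(0, dot), lists.get(0), lists.size() == 2 ? lists.get(1) : List.of());
    }

    boolean includesTable(String table) {
        boolean included = includes == null || matchesAny(includes, table);
        return included && !matchesAny(excludes, table);
    }

    private static boolean matchesAny(List<String> regexes, String name) {
        for (String regex : regexes) {
            if (Pattern.compile(regex, Pattern.CASE_INSENSITIVE).matcher(name).matches()) return true;
        }
        return false;
    }
}
```

For example, `ReplPolicy.parse("sales.['.*?'].['T[0-9]+', 'Q4']").includesTable("t255")` returns false, while `includesTable("orders")` on the same policy returns true.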
(e) REPL DUMP sales FROM 200 TO 1400;
The presence of a FROM <init-evid> tag makes this dump not a bootstrap, but a dump which looks at the event log to produce a delta dump. FROM 200 TO 1400 is self-evident in that it will go through event ids 200 to 1400 looking for events from the relevant db.
(f) REPL DUMP sales FROM 200;
Similar to above, but with an implicit assumed <end-evid> as being the current event id at the time the command is run.
(g) REPL DUMP sales FROM 200 TO 1400 LIMIT 100;
REPL DUMP sales FROM 200 LIMIT 100;
Similar to cases (e) and (f), with the addition of a batch size of <num-evids>=100. This causes us to stop processing once we reach 100 events and return at that point. Note that this does not mean that we stop processing at event id 300 because we began at 200. The event stream also contains unrelated events, so we stop only once we have processed 100 events that belong to this replication definition, i.e. events of the relevant db or db.table.
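The LIMIT behaviour can be sketched in Java as below. The `NotificationEvent` record and the `relevant` predicate, which stands in for the replication policy, are hypothetical. The event log is assumed to be sorted by event id, and the FROM/TO bounds are treated as inclusive, following the "event ids 200 to 1400" wording above; Hive's actual boundary handling may differ. The key point is that only relevant events count toward the limit.

```java
import java.util.*;
import java.util.function.Predicate;

// Hypothetical, simplified view of a metastore notification event.
record NotificationEvent(long eventId, String dbName, String tableName) {}

final class IncrementalDumpSelector {
    /**
     * Selects the events an incremental dump would process: ids in [fromId, toId] (inclusive),
     * relevant to the replication policy, stopping after `limit` relevant events (limit <= 0: no limit).
     * Assumes `log` is sorted by event id.
     */
    static List<NotificationEvent> select(List<NotificationEvent> log, long fromId, long toId,
                                          int limit, Predicate<NotificationEvent> relevant) {
        List<NotificationEvent> selected = new ArrayList<>();
        for (NotificationEvent e : log) {
            if (e.eventId() < fromId) continue;
            if (e.eventId() > toId) break;     // past TO: nothing more to look at
            if (!relevant.test(e)) continue;   // unrelated events do not count toward LIMIT
            selected.add(e);
            if (limit > 0 && selected.size() == limit) break; // batch is full
        }
        return selected;
    }
}
```

For instance, with a log in which only every third event id belongs to sales, selecting from 200 to 1400 with a limit of 100 stops at event id 498, well past 300.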
(h) REPL DUMP sales.['[a-z]+'] REPLACE sales FROM 200;
REPL DUMP sales.['[a-z]+', 'Q5'] REPLACE sales.['[a-z]+'] FROM 500;
This is an example of changing the replication policy/scope dynamically during an incremental replication cycle.
In the first case, a full-DB replication policy "sales" is changed to a policy "sales.['[a-z]+']" that includes only table/view names consisting only of letters, such as "stores", "products", etc. A REPL LOAD of this dump automatically drops the tables that are excluded under the new policy. For instance, a table named 'T5' would be dropped during REPL LOAD if it already exists in the target cluster.
In the second case, the policy is changed again to include table/view 'Q5', and Hive bootstraps table/view 'Q5' as part of the current incremental dump. The same applies to table/view renames where ...
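The effect of REPLACE on the target can be sketched as a comparison of the old and new policies. In this hypothetical Java sketch, policies are modelled as plain `Predicate<String>` over table/view names. Tables present on the target that the new policy excludes are dropped. Source tables that the new policy includes but the old one did not are bootstrapped in the current incremental dump. Renames and the actual metadata/data transfer are out of scope.

```java
import java.util.*;
import java.util.function.Predicate;

// Hypothetical result of comparing an old and a new replication policy.
record PolicyChangePlan(SortedSet<String> toDrop, SortedSet<String> toBootstrap) {}

final class PolicyChange {
    static PolicyChangePlan plan(Collection<String> sourceTables, Collection<String> targetTables,
                                 Predicate<String> oldPolicy, Predicate<String> newPolicy) {
        SortedSet<String> toDrop = new TreeSet<>();
        for (String t : targetTables) {
            if (!newPolicy.test(t)) toDrop.add(t); // out of scope now: drop on the target
        }
        SortedSet<String> toBootstrap = new TreeSet<>();
        for (String t : sourceTables) {
            if (newPolicy.test(t) && !oldPolicy.test(t)) toBootstrap.add(t); // newly in scope
        }
        return new PolicyChangePlan(toDrop, toBootstrap);
    }
}
```

Assume case-insensitive full-name matching. Changing from "sales" (every table) to "sales.['[a-z]+']" on a target holding 'stores', 'products', 'T5' and 'Q5' yields 'Q5' and 'T5' as tables to drop. Switching next to "sales.['[a-z]+', 'Q5']" yields 'Q5' as the only table to bootstrap.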
(i) REPL DUMP sales WITH ('hive.repl.include.external.tables'='false', 'hive.repl.dump.metadata.only'='true');
The REPL DUMP command has an optional WITH clause to set command-specific configurations to be used for the dump. These configurations are used only by the corresponding REPL DUMP command and are not used for other queries running in the same session. In this example, the configurations exclude external tables and dump only metadata, without data.
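The scoping of WITH-clause settings amounts to overlaying them on a private copy of the session configuration. The following Java sketch shows that idea with hypothetical names (`CommandScopedConf`, `effectiveConf`). It is not how HiveConf is actually copied internally.

```java
import java.util.*;

// Hypothetical sketch: WITH (...) settings layered over the session's settings for one command.
final class CommandScopedConf {
    static Map<String, String> effectiveConf(Map<String, String> sessionConf,
                                             Map<String, String> withClause) {
        Map<String, String> effective = new HashMap<>(sessionConf); // private copy for this command
        effective.putAll(withClause);                                // WITH values take precedence
        return Collections.unmodifiableMap(effective);              // session map is never modified
    }
}
```

Because only the copy is modified, a later query in the same session still sees the original values.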
Return values:
- Error codes are returned as return codes (and over JDBC when running via HS2)
- Returns 2 columns in the ResultSet:
- <dir-name> - the directory to which it has dumped info.
- <last-evid> - the last event-id associated with this dump, which might be the end-evid, or the curr-evid, as the case may be.
Note:
The generated dump is similar to dumps generated by EXPORT, in that it contains a _metadata file. However, it does not contain the actual data files; instead, it uses a _files file as an indirection to them. In addition, REPL DUMP does not take a directory argument specifying where to dump. Instead, it creates its own dump directory inside a root directory specified by a new HiveConf parameter, hive.repl.rootdir, and returns that directory as part of its return value. We also intend to introduce a replication dump-directory cleaner that will periodically clean it up.
This call is intended to be synchronous, and expects the caller to wait for the result.
If the HiveConf parameter hive.in.test is false, REPL DUMP will not use a new dump location, and will thus garble an existing dump. Hence, before taking an incremental dump, clear the bootstrap dump location if hive.in.test is false.
Bootstrap note: The FROM clause means that we read the event log to determine what to dump. For bootstrapping, FROM is not used.
When a bootstrap dump is in progress, it blocks rename table/partition operations on all tables of the dumped database; such operations throw a HiveException. Once the bootstrap dump completes, rename operations are enabled again and work as normal. If HiveServer2 crashes while a bootstrap dump is in progress, rename operations continue to throw HiveException even after HiveServer2 is restored with no REPL DUMP in progress. This abnormal state must be fixed manually using the following workaround.
Look in the HiveServer2 logs for the following pair of log messages:
REPL DUMP:: Set property for Database: <db_name>, Property: <bootstrap.dump.state.xxxx>, Value: ACTIVE
REPL DUMP:: Reset property for Database: <db_name>, Property: <bootstrap.dump.state.xxxx>
If no Reset property message is found for a corresponding Set property message, the user needs to manually reset the database property <bootstrap.dump.state.xxxx> to "IDLE" using the ALTER DATABASE command.
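The manual check above can be automated with a small log scan. This hypothetical Java helper pairs each Set message with a later Reset message for the same database and property. For any unmatched property, it emits an `ALTER DATABASE ... SET DBPROPERTIES` statement (standard HiveQL DDL) that resets it to IDLE. It assumes the log lines contain exactly the message text shown above. The full property name, elided here as <bootstrap.dump.state.xxxx>, is taken verbatim from the log.

```java
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: finds bootstrap dump state properties that were Set but never Reset.
final class BootstrapStateRecovery {
    private static final Pattern SET = Pattern.compile(
            "REPL DUMP:: Set property for Database: ([^,]+), Property: ([^,\\s]+), Value: ACTIVE");
    private static final Pattern RESET = Pattern.compile(
            "REPL DUMP:: Reset property for Database: ([^,]+), Property: ([^,\\s]+)");

    /** Returns one ALTER DATABASE statement per (database, property) still left ACTIVE. */
    static List<String> fixStatements(List<String> logLines) {
        Set<List<String>> stillActive = new LinkedHashSet<>(); // (db, property) pairs in log order
        for (String line : logLines) {
            Matcher set = SET.matcher(line);
            if (set.find()) {
                stillActive.add(List.of(set.group(1).trim(), set.group(2)));
                continue;
            }
            Matcher reset = RESET.matcher(line);
            if (reset.find()) {
                stillActive.remove(List.of(reset.group(1).trim(), reset.group(2)));
            }
        }
        List<String> statements = new ArrayList<>();
        for (List<String> p : stillActive) {
            statements.add(String.format("ALTER DATABASE %s SET DBPROPERTIES ('%s'='IDLE');", p.get(0), p.get(1)));
        }
        return statements;
    }
}
```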
REPL LOAD
REPL LOAD {<dbname>} FROM <dirname> {WITH ('key1'='value1', 'key2'='value2')};
The REPL LOAD command has an optional WITH clause to set command-specific configurations to be used when trying to copy from the source cluster. These configurations are only used by the corresponding REPL LOAD command and won't be used for other queries running in the same session.
Return values:
- Error codes returned as normal.
- Does not return anything in the ResultSet; the user is expected to run REPL STATUS to check the result.
REPL STATUS
REPL STATUS <dbname>[.<tablename>];
Return values:
- Error codes returned as normal.
- Returns the last replication state (event ID) for the given database.
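Taken together, REPL DUMP, REPL LOAD and REPL STATUS form the cycle that an external orchestration tool drives. The Java sketch below only builds the statement strings for one cycle, and the class and method names are hypothetical. It assumes that the event id reported by REPL STATUS on the target can be passed directly to the next incremental dump's FROM clause. It also assumes the dump directory is given as a quoted string literal.

```java
import java.util.*;

// Hypothetical orchestration helper: builds the HiveQL an external tool might issue per cycle.
final class ReplCycle {
    static String dump(String policy, OptionalLong lastReplicatedEventId) {
        // No FROM clause means a bootstrap dump; otherwise dump the delta since the last replicated event.
        return lastReplicatedEventId.isPresent()
                ? "REPL DUMP " + policy + " FROM " + lastReplicatedEventId.getAsLong() + ";"
                : "REPL DUMP " + policy + ";";
    }

    static String load(String targetDb, String dumpDir) {
        // dumpDir is the <dir-name> column returned by REPL DUMP on the source.
        return "REPL LOAD " + targetDb + " FROM '" + dumpDir + "';";
    }

    static String status(String targetDb) {
        // Run on the target; reports the last replicated event id.
        return "REPL STATUS " + targetDb + ";";
    }
}
```

A tool would run the bootstrap dump once and pass the returned <dir-name> to `load`. After that, it would feed the id returned by `status` back into `dump` on each cycle.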
Bootstrap, Revisited
When we introduced the notion of a need for bootstrap, we noted that time passing during the bootstrap was a problem that needed to be solved separately.
...
Let us consider the case of a table T1, which was dumped out around evid=200. Now, let us say that the following operations occurred on the two tables while the dump was in progress:
event id | operation |
---|---|
184 | ALTER TABLE T1 DROP PARTITION(Px) |
196 | ALTER TABLE T1 ADD PARTITION(Pa) |
204 | ALTER TABLE T1 ADD PARTITION(Pb) |
216 | ALTER TABLE T1 DROP PARTITION(Py) |
...
Basically, let us try to understand what happens when partitions are added (Pa and Pb) and dropped (Px and Py) both before and after a table is dumped. For our bootstrap, we go through two phases: first, an object dump of all the objects we are expected to dump, and then a consolidation phase in which we go through all the events that occurred during the object dump.
...
While this can work, the problem with this approach is that the destination can now have tables at differing states as a result of the dump - i.e. a table T2 that was dumped at about evid=220 will have newer info than T1 that was dumped about evid=200, and this is a sort of mini-rubberbanding in itself, since different parts of a whole are at different states. This problem is actually a little worse, since different partitions of a table can actually be at different states. Thus, we will not follow this approach.
Approach 2: Consolidate at source.
...
REPLDIR("hive.repl.rootdir","/user/hive/repl/", "HDFS root dir for all replication dumps."),
REPLCMDIR("hive.repl.cmrootdir","/user/hive/cmroot/", "Root dir for ChangeManager, used for deleted files."),
REPLCMRETIAN("hive.repl.cm.retain","24h", new TimeValidator(TimeUnit.HOURS),"Time to retain removed files in cmrootdir."),
REPLCMINTERVAL("hive.repl.cm.interval","3600s",new TimeValidator(TimeUnit.SECONDS),"Interval for cmroot cleanup thread."),
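The two ChangeManager settings interact as a retention policy enforced by a periodic cleaner. In this hypothetical Java sketch, each cmroot entry is assumed to record when the deleted file was moved there. A cleanup pass then selects the entries older than hive.repl.cm.retain. Hive's actual cleaner may determine file age differently.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.*;

// Hypothetical record of a deleted file preserved under hive.repl.cmrootdir.
record CmEntry(String path, Instant movedAt) {}

final class CmRootCleaner {
    /** Returns the entries whose retention period (hive.repl.cm.retain) has elapsed by `now`. */
    static List<CmEntry> expired(List<CmEntry> entries, Instant now, Duration retain) {
        List<CmEntry> out = new ArrayList<>();
        for (CmEntry e : entries) {
            if (e.movedAt().plus(retain).isBefore(now)) out.add(e);
        }
        return out;
    }
}
```

Running the pass every hive.repl.cm.interval could be done with `ScheduledExecutorService.scheduleAtFixedRate(task, 0, 3600, TimeUnit.SECONDS)`.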