Issues with the current replication system
Some of the observed issues with the current replication system are as follows:
- Slow
- Requires staging dirs with full copies (4xcopy problem)
- Unsuitable for load-balancing use-cases
- Incompatibility with ACID
- Dependency on external tools to do a lot (staging dir mgmt, manage state info/etc)
We will thus first try to understand why each of these occurs and what we can do about them.
Slowness
So why is it currently slow?
The primary reason for slowness is that the current system depends on state transfer rather than delta replay. This means that the amount of data funneled across to the destination is much larger than the change that produced it, which is especially a problem with frequent updates and inserts. (Creates cannot be delta-optimized since the original state is null, and deletes are instantaneous.)
The secondary reason is that the original implementation focused on getting "correctness" right in terms of resilience. We planned optimizations that would drastically reduce the number of events we process, but have not yet implemented them. The optimization would work by processing a window of events at a time and skipping the processing of any event whose effect is nullified by a later event in the window (as in cases where an insert is followed by an insert overwrite, or a drop follows a create, etc.). Thus, our current implementation can be seen as a naive implementation where the window size is 1.
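To make the intended optimization concrete, the following is a minimal sketch, in Java, of coalescing a window of events before processing them. The event types, their names, and the exact coalescing rules shown here are illustrative assumptions and do not correspond to Hive's actual notification event model.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified event model used only to illustrate the idea.
enum EventType { CREATE_TABLE, DROP_TABLE, INSERT, INSERT_OVERWRITE }

record Event(long id, EventType type, String table) {}

class EventCoalescer {
    // Given a window of events ordered by event id, drop events whose effect
    // is nullified by a later event in the same window. With a window size of
    // 1 nothing can ever be skipped, which is what the current implementation
    // effectively does.
    static List<Event> coalesce(List<Event> window) {
        Deque<Event> kept = new ArrayDeque<>();
        for (Event e : window) {
            if (e.type() == EventType.DROP_TABLE) {
                boolean createdInWindow = kept.stream().anyMatch(
                        prev -> prev.table().equals(e.table())
                                && prev.type() == EventType.CREATE_TABLE);
                // A drop nullifies every earlier event on the same table in this window.
                kept.removeIf(prev -> prev.table().equals(e.table()));
                // If the table was also created inside this window, the drop itself
                // can be skipped; otherwise it still has to be replayed.
                if (createdInWindow) {
                    continue;
                }
            } else if (e.type() == EventType.INSERT_OVERWRITE) {
                // An insert overwrite nullifies earlier plain inserts on the same table.
                kept.removeIf(prev -> prev.table().equals(e.table())
                        && prev.type() == EventType.INSERT);
            }
            kept.add(e);
        }
        return new ArrayList<>(kept);
    }
}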
Requiring staging dirs with full copies (4xcopy problem)
Again, this problem comes down to us needing to do a state transfer, and using export and import to do it. The first copy we have is the source table itself. We then have to export it to a staging directory - this is our second copy. That staging directory then has to be distcp-ed over to the destination cluster, which forms our third copy. Then, on import, the data is impressed onto the destination table - this is our fourth copy.
Two of these copies, the source table and the destination table, are necessary from the very nomenclature of replication - we want two copies. The fact that we require two additional temporary copies in staging directories is the chief issue here. For clusters without much temporary overflow space, this becomes a major constraint.
Let us examine each of these copies and the design reasoning behind them. First, on the source, the export is done to provide a stable point that remains consistent while we distcp to the destination, in case the table is modified during the attempt to copy it over. If we did not make this copy, the critical section of the source-side work would be much longer, and any replay/recovery semantics would get much more complex.
On the destination side, optimizations are certainly possible, and in fact are already done, so that we don't have an extra copy there: import moves the files into the destination table rather than copying them. Thus, our 4xcopy problem is actually a 3xcopy problem. However, we've taken to calling it the 4xcopy problem since that was the first problem we hit and then solved.
What this optimization does mean, however, is that if we fail during import after the move, redoing that import will actually require redoing the export side as well, and thus the import is not trivially retriable. This was a conscious decision, as the likelihood of such a failure is low compared to the other failure points we have. If we wanted to make the import resiliently retriable as well, we would be back to a 4xcopy problem.
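For reference, the following is a minimal sketch of the data path described above, assuming a driver that issues Hive's EXPORT and IMPORT statements over JDBC and shells out to DistCp. The connection strings, staging paths and table name are placeholders, not the actual tooling.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class StateTransferCopyChain {
    public static void main(String[] args) throws Exception {
        // Copy #1 is the source table itself. All values below are placeholders.
        String srcJdbc = "jdbc:hive2://source-cluster:10000/default";
        String dstJdbc = "jdbc:hive2://dest-cluster:10000/default";
        String srcStaging = "hdfs://source-nn/tmp/repl/blah";
        String dstStaging = "hdfs://dest-nn/tmp/repl/blah";

        // Copy #2: EXPORT writes the table's data and metadata to a staging
        // directory on the source cluster.
        try (Connection c = DriverManager.getConnection(srcJdbc);
             Statement s = c.createStatement()) {
            s.execute("EXPORT TABLE blah TO '" + srcStaging + "'");
        }

        // Copy #3: DistCp copies the staging directory to the destination cluster.
        new ProcessBuilder("hadoop", "distcp", srcStaging, dstStaging)
                .inheritIO().start().waitFor();

        // Copy #4: IMPORT impresses the data onto the destination table. Since
        // import moves rather than copies the staged files, this step does not
        // add a fourth physical copy - at the cost of not being trivially
        // retriable if it fails after the move.
        try (Connection c = DriverManager.getConnection(dstJdbc);
             Statement s = c.createStatement()) {
            s.execute("IMPORT TABLE blah FROM '" + dstStaging + "'");
        }
    }
}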
Unsuitable for load-balancing use-cases
By forcing an "export" early, we handle DR use-cases: even if the source Hive warehouse should be compromised, we will not suffer unduly, and can replay our exported commands on the destination and recover. However, in doing so, we treat each table and partition as an independent object, for which the only important consideration is that we save the latest state of each, without consideration of how it got there.
Thus, any cross-table relationships can be temporarily lost (an object can exist in one table that refers to something in another table which has not yet received its update), and queries that run across tables can produce results on the destination that never existed on the source. All this makes the current implementation of replication entirely unsuitable for load balancing.
Essentially, the primary problem is that the state-transfer approach means each object is considered independent, and each object can "rubber-band" to the latest version of itself. If all events have been processed, we will be in a stable state identical to the source. Thus, this can work for load balancing for users that have a pronounced "loading period" on their warehouse that is separate from their "reading period", which allows us time in the middle to catch up and process all events; the same is true at a table level. This can work for many traditional data warehousing use-cases, but fails for many analytics-like expectations.
We will delve further into this rubberbanding problem in a separate section later, since it is a primary problem we attempt to solve in Replv2.
Incompatibility with ACID
Personally, I think that notions of replication should be developed on top of a robust transactional system (and it is easier to develop them that way too); trying to manage replication without such concepts is what leads to problems like the ones we face in Hive, where we have to do the best we can with what we have. However, Hive's flexibility in usage is one of its major usability points, and having to solve the needs of the majority of the install base is what leads us to develop replication without assuming ACID.
That said, due to the nature of compactions, our current approach of simply copying data and re-impressing metadata does not work for ACID tables: we would also need to copy over transaction objects, translate them, and re-compact on the destination, which does not work well without some notion of distributed transactions.
Thus, it is ironic that our implementation of replication does not support ACID tables. We've considered what we would need to do to replicate ACID tables, and in most discussions, a popular notion seems to be to use streaming to send deltas over to the destination, rather than copying over the files and trying to fudge around with the transaction metadata. This, however, will require quite a bit more work, and thus is not something we're planning to address in replv2 either. It is likely to be a major focus of the next batch of work we put into replication.
Dependency on external tools to do a lot
Our current implementation assumes that we extend how EXPORT and IMPORT work and expose a Notification and ReplicationTask/Command based API that an external tool can use to implement replication on top of us. However, this means that the external tool is the one that has to manage staging directories and track what state each of our destination tables/dbs is in, and over time, there is a possibility of extensive Hive logic bleeding into it. Apache Falcon has a tool called HiveDR which has implemented these interfaces, and they've expressed a desire that Hive take on more of the management aspects for a cleaner interface.
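To illustrate how much of this bookkeeping lands on the external tool, the following is a rough sketch of the shape of such a tool's replication loop. The interface and method names here are invented for illustration and do not correspond to the actual Notification/ReplicationTask API.

import java.util.List;

// Illustrative-only interfaces: the real API exposed by Hive differs. The point
// is how much state and staging-dir management the external tool must own.
interface NotificationSource {
    List<ReplEvent> eventsSince(long lastSeenEventId);
}

interface ReplEvent {
    long id();
    List<String> sourceCommands();       // e.g. EXPORT statements to run at the source
    List<String> destinationCommands();  // e.g. IMPORT statements to run at the destination
}

interface StateStore {
    // The tool must persist, per db/table, how far replication has progressed.
    long lastProcessedEventId(String dbName);
    void markProcessed(String dbName, long eventId);
}

class ExternalReplicationTool {
    interface CommandRunner { void run(String hql); }

    void replicateOnce(String dbName, NotificationSource source, StateStore state,
                       CommandRunner srcRunner, CommandRunner dstRunner) {
        long last = state.lastProcessedEventId(dbName);
        for (ReplEvent event : source.eventsSince(last)) {
            // The tool owns staging directory management: create, reuse, clean up.
            String stagingDir = "/tmp/repl/" + dbName + "/" + event.id();
            for (String cmd : event.sourceCommands()) {
                srcRunner.run(cmd.replace("${STAGING}", stagingDir));
            }
            copyAcrossClusters(stagingDir);  // typically a DistCp between clusters
            for (String cmd : event.destinationCommands()) {
                dstRunner.run(cmd.replace("${STAGING}", stagingDir));
            }
            state.markProcessed(dbName, event.id());
        }
    }

    private void copyAcrossClusters(String stagingDir) {
        // Placeholder for the cross-cluster copy of the staging directory.
    }
}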
To this end, one of the goals of replv2 would be that we manage our own staging directories, and instead of replication tools being the ones that move data over, we step in more proactively to pull the data from the source to the destination.
Rubberbanding
Consider the following series of operations:
CREATE TABLE blah (a int) PARTITIONED BY (p string);
INSERT INTO TABLE blah PARTITION (p="a") VALUES (5);
INSERT INTO TABLE blah PARTITION (p="b") VALUES (10);
INSERT INTO TABLE blah PARTITION (p="a") VALUES (15);
Now, for each operation that occurs, a monotonically increasing state-id is provided by DbNotificationListener, giving us the ability to order these events by when they occurred. For the sake of simplicity, let's say the four operations above occurred at states 10, 20, 30 and 40 respectively, in that order.
Now, if another thread were running "SELECT * FROM blah;" concurrently, then depending on when the SELECT command ran, it would return differing results:
- If it ran before 10, then the table does not yet exist, and will return an error.
- If it ran between 10 & 20, then the table exists, but has no contents, and thus, it will return an empty set.
- If it ran between 20 & 30, then it will return { (a,5) }
- If it ran between 30 & 40, then it will return { (a,5) , (b,10) } , i.e. 2 rows.
- If it ran after 40, then it will return { (a,5) , (b,10) , (a,15) }.
Now, the problem with the state-transfer approach of the current replication is that, if we replicate this sequence of events from the source to a destination warehouse, it is entirely likely that the very first time EXPORT runs on the table is quite a while after the events have occurred, say at around event 500. At that point, if it exports the state of partition (p="a"), it will capture all the changes that have occurred to that partition up to that point.
Let us denote the processing of an event E, to replicate it from source to destination, as PROC(E). Thus, PROC(10) denotes the processing of event 10 from source to destination.
Now, let us look at the same select * behaviour we observed on the source as it occurs on the destination.
- If the select * runs before PROC(10), then we get an error, since the table has not yet been created.
- If the select * runs between PROC(10) & PROC(20), then the table exists but has no data yet, so it will return an empty set; it is PROC(20) that impresses the state of partition p="a" onto the destination.
- If the select * runs after PROC(20), and PROC(20) occurred before source event 40, then it will return { (a,5) }.
- If the select * runs after PROC(20), and PROC(20) occurred after source event 40, then it will return { (a,5) , (a,15) }. This is because the partition state captured by PROC(20) already contains (a,15), while partition p="b" has not yet been re-impressed - that happens only at PROC(30).
We stop our examination at this point, because we have already seen one possible outcome of the select * on the destination which was impossible at the source. This is the problem introduced by state transfer that we term rubber-banding (the nomenclature comes from online games, which have to deal with individual players having different latencies, and the server having to reconcile updates in a staggered/stuttering fashion).
State transfer has a few good things going for it, such as being resilient and idempotent, but it introduces this problem of temporary states on the destination that never existed on the source, and this is a big no-no for load-balancing use-cases where the destination db is not simply a cold backup but a db that is actively being used for reads.
Change management
// TODO