
Issues with the current replication system:

Some of the observed issues with the current replication are as follows:

  1. Slow
  2. Requires staging dirs with full copies (4xcopy problem)
  3. Unsuitable for load-balancing use-cases
  4. Incompatibility with ACID
  5. Dependency on external tools to do a lot (staging dir mgmt, manage state info/etc)

 

We will thus first try to understand why each of these occurs and what we can do about it.

Slowness

So why is it currently slow?

The primary reason for slowness is that it depends on state transfer rather than delta-replay. This means that the amount of data being funneled across to the destination is much larger. This is especially a problem with frequent updates/inserts. (Creates cannot be delta-optimized since the original state is null, and deletes are instantaneous.)

The secondary reason is that the original implementation was built to get "correctness" working first, in terms of resilience. We planned optimizations that would drastically reduce the number of events we process, but we have not yet implemented them. The optimization would process a window of events at a time and skip an event if a later event in the window nullifies the effect of processing it (as in cases where an insert was followed by an insert, or a drop followed a create, etc.). Thus, our current implementation can be seen as a naive implementation where the window size is 1.
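The windowing idea can be sketched as follows. This is a minimal illustration rather than the planned implementation: the `Event` type, the object naming, and the `coalesce` function are hypothetical, and it assumes that processing an event under state transfer ships the latest state of the object it touched, so only the last event per object in a window matters.

```python
from typing import NamedTuple


class Event(NamedTuple):
    event_id: int   # monotonically increasing id of the notification event
    obj: str        # object touched, e.g. "blah" or "blah/p=a" (hypothetical naming)
    kind: str       # "CREATE", "INSERT" or "DROP"


def coalesce(window):
    """Return the events in `window` that still need processing.

    Assumes state transfer: processing an event ships the latest state of
    its object, so only the last event per object in the window matters.
    """
    first_kind = {}   # obj -> kind of its first event in the window
    last_event = {}   # obj -> its last event in the window
    for ev in sorted(window, key=lambda e: e.event_id):
        first_kind.setdefault(ev.obj, ev.kind)
        last_event[ev.obj] = ev

    kept = []
    for obj, ev in last_event.items():
        # Created and dropped inside the window: the destination never
        # needs to see this object at all.
        if first_kind[obj] == "CREATE" and ev.kind == "DROP":
            continue
        kept.append(ev)
    return sorted(kept, key=lambda e: e.event_id)


# Sample history (hypothetical): two inserts into p=a, plus a short-lived table.
EVENTS = [
    Event(10, "blah", "CREATE"),
    Event(20, "blah/p=a", "INSERT"),
    Event(30, "blah/p=b", "INSERT"),
    Event(40, "blah/p=a", "INSERT"),
    Event(50, "tmp", "CREATE"),
    Event(60, "tmp", "DROP"),
]
```

With these sample events, a window covering all six events leaves only events 10, 30 and 40 to process, whereas calling `coalesce` on one event at a time (window size 1) keeps all six.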

Requiring staging dirs with full copies (4xcopy problem)

Again, this problem comes down to us needing to do a state transfer, and using export and import to do it. The first copy we have is the source table. Then, we have to export that to a staging directory - this is our second. This then has to be dist-cp-ed over to the destination cluster, which then forms our third copy. Then, on import, it impresses the data on to the destination table - this is our 4th copy.

Now, 2 of these copies, the source table and the dest table, are necessary from the very nomenclature of replication - we want 2 copies. The fact that we require 2 additional copies temporarily in staging directories is the chief issue here. For clusters without much temporary overflow space, this becomes a major constraint.

Let us examine each of these copies and the design reasoning behind each. Firstly, on the source, the reason the export is done is to give a stable point that will be resilient when we're trying to distcp to destination, in case the table should be modified during the attempt to copy it over. If we do not make a copy, the critical section of the source-side work is a much longer section, and any replay/recovery semantics get much more complex.

On the destination side, optimizations are certainly possible, and in fact, are actually done, so that we don't have an extra copy here. On import, import actually moves the files to the dest table, thus, our 4xcopy problem is actually a 3x copy problem. However, we've taken to calling this the 4xcopy problem since that was the first problem we hit and then solved.

However, what this optimization does mean is that if we fail during import after moving, then the redo of that import will actually require a redo of the export side as well, and thus, is not trivially retriable. This was a conscious decision, as the likelihood of this happening is low in comparison to the other failure points we have. If we wanted to make the import resiliently retriable as well, we would have a 4x copy problem.


Unsuitable for load-balancing usecases

By forcing an "export" early, we handle DR usecases, so that even if the source hive wh should be compromised, we will not suffer unduly, and can replay our exported commands on the destination and recover. However, in doing so, we treat each table and partition as independent objects, for which the only important consideration is that we save the latest state of each, without consideration of how they got there.

Thus, any cross-table relationships can be temporarily lost (an object can exist in one table that refers to something in another table which does not have the update yet), and queries that run across tables can produce results on the destination that never existed in the source. All this makes it so that the current implementation of replication is entirely unsuitable for load balancing.

Essentially, the primary problem is that the state transfer approach means that each object is considered independent, and each object can "rubber-band" to the latest version of itself. If all events have been processed, we will be in a stable state which is identical to the source, and thus, this can work for load balancing for users that have a pronounced "loading period" on their warehouse that is separate from their "reading period", which allows us time in the middle to catch up and process all events. This is also true at a table level. This can work for many of the traditional data warehousing use cases, but fails for many analytics-like expectations.

We will delve further into this rubberbanding problem in a separate section later, since it is a primary problem we attempt to solve in Replv2.

Incompatibility with ACID

Personally, I think that notions of replication should be developed on top of a robust transactional system (and it is easier to develop that way too), and trying to manage replication without such concepts is what leads to problems like the ones we face in hive, where we have to do the best we can with what we have. Hive's flexibility in usage is one of its major usability points, and having to serve the needs of the majority of the install base is what leads us to try to develop replication without assuming ACID.

That said, due to the nature of compactions, our current approach of simply copying data and re-impressing metadata does not work for ACID tables: we would also need to copy over transaction objects, translate them, and recompact on the destination, which in turn does not work well without a notion of distributed transactions.

Thus, it is ironic that in our implementation of replication, we do not support ACID tables. We've considered what we would need to do to replicate ACID tables, and in most discussions, a popular notion seems to be one of using streaming to send deltas over to the destination, rather than to copy over the files and trying to fudge around with the transaction metadata. This, however, will require quite some more work, and thus, is not something we're planning on addressing in replv2 either. It is likely to be a major push/focus of the next batch of work we put into replication.

Dependency on external tools to do a lot

Our current implementation assumes that we extend how EXPORT and IMPORT work, and expose a Notification and ReplicationTask/Command based API that an external tool can use to implement replication on top of us. However, this means that the external tools are the ones that have to manage staging directories and, in addition, have to track what state each of our destination tables/dbs is in. Over time, there is a possibility of extensive hive logic bleeding into them. Apache Falcon has a tool called HiveDR, which has implemented these interfaces, and they've expressed a desire that hive take on some more of the management aspects for a cleaner interface.

To this end, one of the goals of replv2 would be that we manage our own staging directories, and instead of replication tools being the ones that move data over, we step in more proactively to pull the data from the source to the destination.


Rubberbanding


Consider the following series of operations:

CREATE TABLE blah (a int) PARTITIONED BY (p string);
INSERT INTO TABLE blah PARTITION (p="a") VALUES (5);
INSERT INTO TABLE blah PARTITION (p="b") VALUES (10);
INSERT INTO TABLE blah PARTITION (p="a") VALUES (15);


Now, for each operation that occurs, a monotonically increasing state-id is provided by DbNotificationListener, so that we have an ability to order those events by when they occurred. For the sake of simplicity, let's say they occurred at states 10,20,30,40 respectively, in order.

Now, if another thread were running "SELECT * from blah;", then depending on when the SELECT command ran, it would have differing results:

  1. If it ran before 10, then the table does not yet exist, and will return an error.
  2. If it ran between 10 & 20, then the table exists, but has no contents, and thus, it will return an empty set.
  3. If it ran between 20 & 30, then it will return { (a,5) }
  4. If it ran between 30 & 40, then it will return { (a,5) , (b,10) } , i.e. 2 rows.
  5. If it ran after 40, then it will return { (a,5) , (b,10) , (a,15) }.

Now, the problem with the state transfer approach of current replication is that if we replicate this sequence of events from source to a destination warehouse, it is entirely likely that the very first time EXPORT runs on the table is quite a while after the event has occurred, say at around event 500. At this point in time, if it tries to export the state of the partition (p="a"), then it will capture all the changes that have occurred till that point.

Let us denote the event of processing an event E to replicate E from source to destination as PROC(E). Thus, PROC(10) would denote the processing of event 10 from source to destination.

Now, let us look at the same select * behaviour we observed on the source as it occurs on the destination.

  1. If the select * runs before PROC(10), then we get an error, since the table has not yet been created.
  2. If the select * runs between PROC(20) & PROC(30), then the partition p="a" has been impressed on the destination, and the result depends on when PROC(20) ran:
    1. If PROC(20) occurs before 40 has occurred, then it will return { (a,5) }
    2. If PROC(20) occurs after 40 has occurred, then it will return { (a,5) , (a,15) } - This is because the state of partition p="a" is captured by PROC(20) after 40 has occurred, and thus contains (a,15), while partition p="b" has not yet been impressed on the destination, which will happen only at PROC(30).

We stop our examination at this point, because we see one possible outcome from the select * on the destination which was impossible at the source. This is the problem introduced by state-transfer that we term rubber-banding (nomenclature coming in from online games, which deal with each individual player having different latencies, and the server having to reconcile updates in a staggered/stuttering fashion).
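The example can be replayed with a small simulation. This is only a sketch under simplifying assumptions: time is measured in source event ids, rows are written as (partition, value) pairs as in the lists above, and the helper names (`source_rows`, `destination_states`) are made up for illustration.

```python
# Source history from the example: (event id, partition, inserted value).
# Event 10 is the CREATE TABLE and touches no partition.
SOURCE_EVENTS = [(10, None, None), (20, "a", 5), (30, "b", 10), (40, "a", 15)]


def source_rows(at_event, partition=None):
    """Rows visible on the source once event `at_event` has occurred.

    If `partition` is given, only rows of that partition are returned.
    """
    return frozenset(
        (p, v)
        for event_id, p, v in SOURCE_EVENTS
        if p is not None and event_id <= at_event
        and (partition is None or p == partition)
    )


def destination_states(proc_times):
    """Destination table contents after each PROC(E), under state transfer.

    `proc_times` maps an event id E to the source event id that had been
    reached when PROC(E) ran. PROC(E) copies the latest state of the
    partition touched by E at that moment, not its state as of E.
    """
    dest = {}      # partition -> rows impressed on the destination
    states = {}    # E -> destination contents right after PROC(E)
    for event_id, p, _ in SOURCE_EVENTS:
        if event_id not in proc_times:
            break
        if p is not None:
            dest[p] = source_rows(proc_times[event_id], partition=p)
        states[event_id] = frozenset().union(*dest.values())
    return states


# Every table state a reader could ever have observed on the source.
SOURCE_STATES = {source_rows(e) for e, _, _ in SOURCE_EVENTS}

prompt = destination_states({10: 10, 20: 20})     # PROC keeps up with the source
lagging = destination_states({10: 500, 20: 500})  # PROC runs around event 500
```

Here `prompt[20]` is { (a,5) }, which is one of `SOURCE_STATES`, while `lagging[20]` is { (a,5) , (a,15) }, which is not: the destination exposes a table state that never existed on the source.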

State-transfer has a few good things going for it, such as being resilient and idempotent, but it introduces this problem of temporary states that are possible which never existed in the source, and this is a big no-no for load-balancing use-cases where the destination db is not simply a cold backup but a db that is actively being used for reads.


Change management

Let us now consider a base part of a replication workflow. It would need to have the following parts:

  1. An event happens on the source that causes a change (at, say, t1)
  2. A notification event is generated for it (at, say, t2)
  3. That notification event is then processed on source to "ready" an actionable task to do on the destination to replicate this. (at, say, t3)
  4. The requisite data is copied over from the source wh to the destination warehouse
  5. The destination then performs whatever task is needed to restate the change

 

Now, so far, our primary problem seems to be that we can only capture "latest" state, and not the original state at the time the event occurred. That is to say that at the time we process the notification, we get the state of the object at that time, t3, instead of the state of the object at time t1. In the time between t1 and t3, the object may have changed substantially, and if we go ahead and take the state at t3, and then apply to destination in an idempotent fashion, always taking only updates, we get our current implementation, with the rubberbanding problem.


Fundamentally, this is the core of our problem. To not have rubberbanding, one of the following must be true of that time period between t1 & t3:

  1. No other change must have happened to the object - which means that we do the equivalent of locking the object in question from t1 to t3. Such an approach is possible if t1 & t3 occur in the same transaction interval.
  2. If changes to the object between t1 & t3 are inevitable, we must have a way of recording each state change, so that when t3 rolls around, we still have the original t1 state available somewhere.

Route (1) is how we should approach ACID tables, and should be the way hopefully all hive tables are accessed at some point in the future. The benefit of the transactional route is that we would have exactly the delta/change that we're applying, and we would save that delta to pass on to the other side.

In the meanwhile, however, we must try to solve (2) as well. To this end, our goal with replv2 is to make sure that if there is any hive access that makes any change to an object, we capture the original state. There are two aspects to the original state - the metadata and the data. The metadata is easily solvable, since t1 & t2 can be done in the context of a single hive operation, and we can impress the metadata for the notification and our change to the metadata in the same metastore transaction. This now leaves us the question of what happens with the backing filesystem data for the object.
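The metadata half of this can be illustrated with a toy store. The sketch below uses an in-memory SQLite database purely as a stand-in for the metastore's backing database; the table names (`table_meta`, `notification_log`), their columns, and the `set_location` helper are hypothetical. It shows the metadata change and its notification, which carries the original state, being written in a single transaction, so that neither can exist without the other.

```python
import sqlite3


def open_metastore():
    """Create a toy in-memory metastore (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE table_meta (name TEXT PRIMARY KEY, location TEXT)"
    )
    conn.execute(
        "CREATE TABLE notification_log ("
        " event_id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " table_name TEXT NOT NULL,"
        " old_location TEXT,"
        " new_location TEXT NOT NULL)"
    )
    return conn


def set_location(conn, name, new_location):
    """Change a table's location and log the event in one transaction.

    Returns the id of the notification event. The event records the
    original state (old_location) as of the moment of the change.
    """
    with conn:  # commits on success, rolls back if any statement raises
        row = conn.execute(
            "SELECT location FROM table_meta WHERE name = ?", (name,)
        ).fetchone()
        old_location = row[0] if row else None
        conn.execute(
            "INSERT OR REPLACE INTO table_meta (name, location) VALUES (?, ?)",
            (name, new_location),
        )
        cur = conn.execute(
            "INSERT INTO notification_log (table_name, old_location, new_location)"
            " VALUES (?, ?, ?)",
            (name, old_location, new_location),
        )
        return cur.lastrowid
```

If writing the notification fails, the metadata change is rolled back with it, so a reader of the notification log never misses a change and never sees an event for a change that did not happen.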


Now, in addition to tracking what the filesystem state was at the time we did our dump, there is one more problem we want to solve: the 4x (or 3x) copy problem. We've already solved the problem of the extra copy on the destination. Now, we need to somehow prevent the extra copy on the source as well. Essentially, to avoid making an extra copy of the entire data on the source, we need a "stable" way of determining what the FS backing state for the object was at the time the event occurred.

Both of these problems, that of the 4x/3x copy problem, and that of making sure that we know what FS state existed at t1 to prevent rubberbanding, are then solvable if we have a snapshot of the source filesystem at the time the event occurred. At first, this led us to look at HDFS snapshots as the way to solve this problem. Unfortunately, HDFS snapshots, while they would solve our problem, are, per discussion with HDFS folks, not something we can create in large numbers, and we might very well need a snapshot for every single event that comes along.

However, the idea behind the snapshot is still what we really want, and if HDFS cannot support the number of snapshots that we would create, it is possible for us to do a pseudo-snapshot, so that for all files that are backing hive objects, if we detect any hive operation would move them away or modify them, we retain the original in a separate directory, similar to how we manage Trash. This pseudo-trash like capturing behaviour is what we refer to as the "change-management" piece and is the main piece that needs to be in place to solve the rubberbanding problem as well as the 4x/3x copy problem.
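A minimal sketch of such a pseudo-snapshot, using the local filesystem in place of HDFS, might look like this. The `ChangeManagedStore` class, the `cm_root` directory, and the `<event id>/<path>` layout are all hypothetical and chosen only for illustration; they are not a description of how Hive lays things out.

```python
import shutil
import tempfile
from pathlib import Path


class ChangeManagedStore:
    """Toy warehouse that retains originals before changing them.

    All names here (cm_root, the <event id>/<path> layout) are
    hypothetical; they are not Hive's actual layout.
    """

    def __init__(self, warehouse, cm_root):
        self.warehouse = Path(warehouse)
        self.cm_root = Path(cm_root)
        self.warehouse.mkdir(parents=True, exist_ok=True)
        self.cm_root.mkdir(parents=True, exist_ok=True)

    def _retain(self, rel_path, event_id):
        """Move the current file aside, filed under the event displacing it."""
        live = self.warehouse / rel_path
        if live.exists():
            kept = self.cm_root / str(event_id) / rel_path
            kept.parent.mkdir(parents=True, exist_ok=True)
            shutil.move(str(live), str(kept))

    def write(self, rel_path, data, event_id):
        """Write new contents, retaining any previous version first."""
        self._retain(rel_path, event_id)
        live = self.warehouse / rel_path
        live.parent.mkdir(parents=True, exist_ok=True)
        live.write_bytes(data)

    def delete(self, rel_path, event_id):
        """Remove the file from the warehouse, retaining it under cm_root."""
        self._retain(rel_path, event_id)

    def read_as_of(self, rel_path, event_id):
        """Contents of the file as of `event_id` (assumes it existed then)."""
        displaced_by = sorted(
            int(d.name) for d in self.cm_root.iterdir()
            if int(d.name) > event_id and (d / rel_path).exists()
        )
        if displaced_by:
            # The earliest later event that displaced the file holds the
            # version that was live at `event_id`.
            return (self.cm_root / str(displaced_by[0]) / rel_path).read_bytes()
        return (self.warehouse / rel_path).read_bytes()


def demo():
    """Write, overwrite and delete a file, then read its older versions."""
    with tempfile.TemporaryDirectory() as tmp:
        store = ChangeManagedStore(Path(tmp) / "wh", Path(tmp) / "cm")
        store.write("blah/p=a/data", b"5\n", event_id=20)
        store.write("blah/p=a/data", b"5\n15\n", event_id=40)
        store.delete("blah/p=a/data", event_id=50)
        return (
            store.read_as_of("blah/p=a/data", 20),
            store.read_as_of("blah/p=a/data", 40),
        )
```

In `demo()`, the file's contents as of events 20 and 40 remain readable even after the file has been overwritten and then deleted, without ever exporting a full copy of the table to a staging directory.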

 
