Replicate region data to a remote WAN site

To be Reviewed By: May 7th, 2021

Authors: Alberto Gomez (alberto.gomez@est.tech)

Status: Draft | Discussion | Active | Dropped | Superseded

Superseded by: N/A

Related: N/A

Problem

There are several situations that require synchronizing the data between two Geode sites connected via WAN:

  • A Geode system is to be extended with a replica over a WAN by adding a new Geode site. In this case, the new site should load the region data of the first Geode site.
  • Two Geode sites have become out of sync due to a long-lasting network split. In this case, it might have been necessary to stop replication during the network split in order not to fill up the memory and disk dedicated to queuing replication events. When the WAN connection between the Geode sites is restored, the sites need to be synchronized so that the data is consistent. A possible approach is to pick one of the sites as the master and load the data of the regions of that site into the other site.
  • Recovery after a failure in one site of a WAN-replicated deployment that requires reinstalling the site and resynchronizing its data.

Geode offers several mechanisms to replicate the data of data regions from one WAN site to another for the above types of situations.

These mechanisms, nevertheless, suffer from shortcomings that make them far from ideal for the situations described.

  • Backup/restore:
    - It only works for persistent regions.
    - It does not support the restoration of a backup taken from a system in a newer version of Geode into a system in an older version.
    - It is not intended to restore a backup from one site into another, but rather to restore a previous backup of a site into that same site.
    - It cannot guarantee consistency of data across regions.
    - If the data in the region was populated using transactions, there would be no guarantee that those transactions are honored in the region unless there was no ongoing traffic when the backup was taken (transaction events are not written atomically to disk).
    - The whole process to recover WAN replication between the sites after the restore, without stopping the service, is cumbersome. It requires a procedure to clean the replication queues in the master site, to restart replication right before the backup is taken, and to have enough memory/disk resources available to hold replication events while the remote site is not yet restored.
  • Import/export:
    - As it is based on the Snapshot service, it does not guarantee consistency of the data unless it is run when there is no traffic.
    - It cannot guarantee consistency of snapshots across regions.
    - It suffers the same problem as backup and restore with respect to consistency of transactional data.
    - It suffers the same problems as backup and restore with respect to the process to recover the WAN replication without stopping the service.
  • Gemtouch:
    - Gemtouch is an open-source tool that is not part of Geode and is no longer maintained.
    - As this tool replicates the entries of a region by generating a get and a put for every entry, it causes undesired modification of the region entries (at least of their timestamps).
    - Also, even if the get and put of every entry are done inside a transaction, the process is subject to race conditions if it is run while non-transactional traffic is being sent to the Geode cluster. Such race conditions could cause traffic writes to be overwritten by Gemtouch writes if the former are not done inside transactions.

This RFC tries to solve the specific problem of putting into service (either back into service or for the first time) a Geode site that is to be part of a WAN-replicated Geode system: the site needs to load the region data from the other Geode sites already running while, at the same time, receiving the new events generated via WAN replication in the source site.

The solution should be suitable for any WAN replication topology.

Anti-Goals

This RFC does not try to solve the problem of synchronizing on the fly two WAN-replicated sites that may have diverged. The use case aimed at consists of having one or more WAN sites online with data and clients connected to them, and another site with no clients connected that needs to get the data from the other sites.

Solution

To overcome the disadvantages of the mechanisms enumerated in the Problem section and to offer a more integrated solution in Geode, this RFC proposes to implement a command that replicates the data of a region from one site to another. The command would operate in a similar way to the Gemtouch tool but without the inconveniences shown above.

This gfsh command would allow a region to be replicated from one site into another by specifying the region name and the senderId to be used to replicate the events.

The command would make use of an internal function that would run on every member of the Geode cluster hosting the region. For each entry in the region, this function would create an event equivalent to the one created when the entry was created/updated (with the same timestamp, key and value) and pass it to the gateway sender to replicate it.

The way to generate the event would be different for partitioned regions than for replicated regions but once the event is correctly created, a call to the distribute method of the AbstractGatewaySender should do the work needed to replicate the region event. Also, in the case of replicated regions, events should only be generated on the member that hosts the primary gateway sender.
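As a rough illustration of this iterate-create-distribute loop, the sketch below models the function at a very high level. The ReplicationEvent, EventDistributor and LocalEntries names are hypothetical stand-ins introduced only for this sketch; the real implementation would build EntryEventImpl instances and hand them to the gateway sender's distribute method, and the details would differ between partitioned and replicated regions.

// Hypothetical sketch only: stand-ins for Geode internals used to show the shape
// of the per-entry replication loop run by the internal function on each member.
public class ReplicateRegionFunctionSketch {

  // Stand-in for the event handed to the gateway sender (same key, value and
  // version timestamp as the original create/update of the entry).
  public static class ReplicationEvent {
    final Object key;
    final Object value;
    final long versionTimestamp;

    ReplicationEvent(Object key, Object value, long versionTimestamp) {
      this.key = key;
      this.value = value;
      this.versionTimestamp = versionTimestamp;
    }
  }

  // Stand-in for "pass the event to the gateway sender"; the real code would
  // call the sender's distribute method with a properly built EntryEventImpl.
  public interface EventDistributor {
    void distribute(ReplicationEvent event);
  }

  // Stand-in for reading the locally hosted entries together with their
  // original version timestamps.
  public interface LocalEntries extends Iterable<ReplicationEvent> {}

  public int replicate(LocalEntries entries, EventDistributor sender) {
    int replicated = 0;
    for (ReplicationEvent event : entries) {
      sender.distribute(event); // re-emit the entry with its original timestamp
      replicated++;
    }
    return replicated;
  }
}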

The command/function could accept as a parameter a maximum replication rate (events/sec) in order to limit the resources consumed while there is traffic running.
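A minimal sketch of how such a maximum rate could be enforced before distributing each event is shown below (illustration only; the class and names are not part of Geode):

// Simple fixed-rate throttle: at most maxEventsPerSecond events are distributed.
class ReplicationRateLimiter {
  private final long minNanosBetweenEvents;
  private long nextAllowedNanos = System.nanoTime();

  ReplicationRateLimiter(int maxEventsPerSecond) {
    this.minNanosBetweenEvents = 1_000_000_000L / maxEventsPerSecond;
  }

  // Called before distributing each event; sleeps if we are ahead of the rate.
  void acquire() throws InterruptedException {
    long now = System.nanoTime();
    if (now < nextAllowedNanos) {
      Thread.sleep((nextAllowedNanos - now) / 1_000_000L);
    }
    nextAllowedNanos = Math.max(now, nextAllowedNanos) + minNanosBetweenEvents;
  }
}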

It would be desirable to provide another command to stop an ongoing replication started by the replicate-region command.

The command will assume that the destination site already has the regions to be replicated created and that it has gateway receivers to get the data from the source site.

The command will also assume that during the replication of the data there will be no clients connected to the destination site until the copy has finished. If, for any reason, the replication fails or does not finish, the command will have to be run again. The command should provide information about the result of the execution.

Events replicated by this command must not be notified to the clients in the remote site and should not be sent on to other sites from the remote site.

Changes and Additions to Public Interfaces

A new gfsh command is proposed to execute the replication of the region data.
The command would accept as input the region name, the senderId to be used and the maximum replication rate:

gfsh> replicate-region --region-name=<region name> --senderId=<sender-id> [--max-rate=<rate in events/sec>]


The command to stop an ongoing execution of the above command could be implemented either as another command or by reusing the above one with a parameter indicating that intention (--cancel option):

gfsh> replicate-region --region-name=<region name> --senderId=<sender-id> --cancel
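
For example, to copy a hypothetical region named orders through a gateway sender named sender-to-site2 at a maximum of 1000 events/sec, and later to cancel that copy (region and sender names are purely illustrative):

gfsh> replicate-region --region-name=orders --senderId=sender-to-site2 --max-rate=1000
gfsh> replicate-region --region-name=orders --senderId=sender-to-site2 --cancel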

Performance Impact

If the replicate-region command is run while the servers are handling traffic, it is expected to add some performance penalty to that traffic due to the resources consumed by the command.

Backwards Compatibility and Upgrade Path

No impacts foreseen

Prior Art

What would be the alternatives to the proposed solution? What would happen if we don’t solve the problem? Why should this proposal be preferred?

FAQ

Answers to questions you’ve commonly been asked after requesting comments for this proposal.

Errata

  • Instead of putting events in the sender's queue, the command will put events directly into batches and pass them to the gateway sender to be sent to the remote site.
  • The name of the command has been changed to: wan-copy region

21 Comments

  1. I've written a few functions like this in the past. I attached one that I just verified works in the simple case (where there is no activity in either site).

    A lot of the code in this function should be moved internally (to PartitionedRegion, BucketRegion, EntryEventImpl, etc.), but you get the idea.

    How do you plan to deal with duplicates, missing events, ordering issues, concurrent modifications, etc?

    1. I cannot see the function attached. Did you forget to do it?

      The plan is that this command/function is used when there is traffic in the Geode cluster.

      I foresee there could be an issue with conflation. If a put for a key is done very close in time to when the function creates an event for the same key, the newer value may be overwritten by the old value depending on the order in which both events reach the sender queue. I think this could also happen today, without the proposed function, with two almost-concurrent events for the same key, although this function could increase the probability of it occurring. For this case, we could modify the conflation code to prevent this situation by checking the timestamp of events before conflating (see the sketch below).
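
      A rough sketch of the kind of timestamp check meant here (QueuedEvent is a hypothetical stand-in for the queued gateway sender event type; the real change would live in the sender queue's conflation code):

      // Hypothetical sketch: when conflating two queued events for the same key,
      // keep whichever one carries the newer version timestamp, so a late
      // replication event cannot overwrite a newer client put.
      interface QueuedEvent {
        Object getKey();
        long getVersionTimeStamp();
      }

      final class TimestampAwareConflation {
        static QueuedEvent pickSurvivor(QueuedEvent alreadyQueued, QueuedEvent incoming) {
          return incoming.getVersionTimeStamp() >= alreadyQueued.getVersionTimeStamp()
              ? incoming : alreadyQueued;
        }
      }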

      Any other things we should take into account? I have not foreseen issues with duplicates and missing events so any extra information that I have not considered would be very welcome.

  2. Here is a test that shows what I mean by duplicates:

    1. Start servers in site 1
    2. Do puts in site 1 (which adds events to the sender queue for site 2)
    3. Run the function to initialize remote region (which adds events on the same entries to the sender queue for site 2)
    4. Start servers in site 2

    For 5 entries in site 1, a CacheListener in site 2 receives 10 events (5 creates, 5 updates):

    TestCacheListener processing event=EntryEventImpl[op=CREATE;region=/data;key=1;version={v1; rv1; ds=2; time=1619471769598};id=EventID[id=25 bytes;threadID=0x10001|1;sequenceID=1;bucketId=1];tailKey=114]
    TestCacheListener processing event=EntryEventImpl[op=CREATE;region=/data;key=4;version={v1; rv1; ds=2; time=1619471769614};id=EventID[id=25 bytes;threadID=0x10004|1;sequenceID=4;bucketId=4];tailKey=117]
    TestCacheListener processing event=EntryEventImpl[op=UPDATE;region=/data;key=4;version={v2; rv2; ds=2; time=1619471769614};id=EventID[id=22 bytes;threadID=0x10004|10;sequenceID=2;bucketId=4];tailKey=343]
    TestCacheListener processing event=EntryEventImpl[op=UPDATE;region=/data;key=1;version={v2; rv2; ds=2; time=1619471769598};id=EventID[id=22 bytes;threadID=0x10001|10;sequenceID=3;bucketId=1];tailKey=340]
    TestCacheListener processing event=EntryEventImpl[op=CREATE;region=/data;key=3;version={v1; rv1; ds=2; time=1619471769609};id=EventID[id=25 bytes;threadID=0x10003|1;sequenceID=3;bucketId=3];tailKey=116]
    TestCacheListener processing event=EntryEventImpl[op=UPDATE;region=/data;key=3;version={v2; rv2; ds=2; time=1619471769609};id=EventID[id=22 bytes;threadID=0x10003|10;sequenceID=0;bucketId=3];tailKey=342]
    TestCacheListener processing event=EntryEventImpl[op=CREATE;region=/data;key=0;version={v1; rv1; ds=2; time=1619471769582};id=EventID[id=25 bytes;threadID=0x10000|1;sequenceID=0;bucketId=0];tailKey=113]
    TestCacheListener processing event=EntryEventImpl[op=UPDATE;region=/data;key=0;version={v2; rv2; ds=2; time=1619471769582};id=EventID[id=22 bytes;threadID=0x10000|10;sequenceID=4;bucketId=0];tailKey=339]
    TestCacheListener processing event=EntryEventImpl[op=CREATE;region=/data;key=2;version={v1; rv1; ds=2; time=1619471769603};id=EventID[id=25 bytes;threadID=0x10002|1;sequenceID=2;bucketId=2];tailKey=115]
    TestCacheListener processing event=EntryEventImpl[op=UPDATE;region=/data;key=2;version={v2; rv2; ds=2; time=1619471769603};id=EventID[id=22 bytes;threadID=0x10002|10;sequenceID=1;bucketId=2];tailKey=341]


    Here is another test that shows duplicates. In this case, a ConcurrentCacheModificationException is thrown. I think the exception is handled in this case, but I'm not 100% sure.

    1. Start servers in sites 1 and 2
    2. Do puts in site 1 forever (which are sent to site 2)
    3. Run function to initialize remote region

    In some cases, site 2 receives an older event after a newer event:

    time=1619475101629 - ok
    time=1619475101631 - ok
    time=1619475101629 - conflict
    time=1619475101633 - ok

    The logging below shows these events:

    ServerConnection on port 5142 Thread 3: LocalRegion.basicBridgePut success=true; event=EntryEventImpl[op=UPDATE;region=/__PR/_B__data_75;key=75;callbacksInvoked;version={v2274; rv2274; ds=2; time=1619475101629};id=EventID[id=25 bytes;threadID=0x1004b|1;sequenceID=222440;bucketId=75];tailKey=257037]; isConcurrencyConflict=false
    ServerConnection on port 5142 Thread 3: LocalRegion.basicBridgePut success=true; event=EntryEventImpl[op=UPDATE;region=/__PR/_B__data_75;key=75;callbacksInvoked;version={v2275; rv2275; ds=2; time=1619475101631};id=EventID[id=25 bytes;threadID=0x1004b|1;sequenceID=222501;bucketId=75];tailKey=257150]; isConcurrencyConflict=false
    ServerConnection on port 5142 Thread 3: AbstractRegionEntry.processGatewayTag about to throw CCME event=EntryEventImpl[op=UPDATE;region=/__PR/_B__data_75;key=75;version={ds=2; time=1619475101629};id=EventID[id=23 bytes;threadID=0x1004b|16;sequenceID=18;bucketId=75];tailKey=257376]; stampTime=1619475101631; tagTime=1619475101629; stampDsid=2; tagDsid=2
    ServerConnection on port 5142 Thread 3: LocalRegion.basicBridgePut success=false; event=EntryEventImpl[op=UPDATE;region=/__PR/_B__data_75;key=75;version={ds=2; time=1619475101629};id=EventID[id=23 bytes;threadID=0x1004b|16;sequenceID=18;bucketId=75];isInConflict;tailKey=257376]; isConcurrencyConflict=true
    ServerConnection on port 5142 Thread 3: LocalRegion.basicBridgePut success=true; event=EntryEventImpl[op=UPDATE;region=/__PR/_B__data_75;key=75;callbacksInvoked;version={v2276; rv2276; ds=2; time=1619475101633};id=EventID[id=25 bytes;threadID=0x1004b|1;sequenceID=222562;bucketId=75];tailKey=257489]; isConcurrencyConflict=false
    1. The situation presented in the first example is to be expected with high probability when traffic is being received in the site where the function is to be run. Nevertheless, I do not think any additional measure is required if what we are after is the consistency of the region data on both WAN sites. Even if there are events duplicated, the resulting data in the region hosted in the remote site will be the same as in the region in the source site.

      Regarding the situation in the second example, it would be much less probable. Anyway, I do not think that any additional measure is required because the conflicting event will not be applied in the remote site and the region data will be consistent between the two WAN sites.

      Nevertheless, if there are other things to take into consideration that I may have missed, please, let me know.

  3. In my ideal world we use our state flush and delta get-initial-image algorithms to transmit the optimal amount of data to the remote system.  That's a really hard problem though.

    A few brief thoughts:

    1) You may want to consider a different op type for these events.  Many users look at incoming events so ordering and correctness is important.

    2) Snapshot imports can disallow callback notifications.  That is likely needed here as well.

    3) Perhaps the `replicate-region` command could be better named, something like "synchronize data --region=XXX --gateway-sender=NNN".


    In terms of data consistency, the proposed approach seems on par with snapshot export/import.  As a built-in operation, it's easier to use but is still a pretty blunt tool.  If not enough memory is available the enqueued events could cause a server to hit eviction or critical thresholds (in some cases blocking all writes).  That's a significant disadvantage compared to the "touch" approach.

    1. Thanks for your comments.

      I think it is valuable to consider using a different op type for replication events, as well as to see whether we should disallow callback notifications in this case.

      I have proposed 'replicate-region' as the name for the command given that the goal I had in mind was to replicate the data of a region with data into a remote region without any data. The name 'synchronize data' suggests a more advanced action in which the local and remote regions already have data and synchronize with each other.

      Regarding the concerns about the situations the execution of the command could provoke (running out of memory, eviction, reaching critical thresholds, blocking of writes, ...), they are very valid. In order to reduce the probability of incurring them, several measures could be proposed:

      • State in the documentation that the command should be run at low traffic hours.
      • State in the documentation that the command should be run to replicate data over a region in a site that is empty and is not receiving traffic.
      • Have the possibility to stop the execution of the command (as suggested in the RFC) in case it is observed that a problematic situation is close to being reached.
      • Limit the rate at which replication is done by means of:
        • A maximum replication rate (in a parameter to the command as specified in the RFC).
        • A more advanced mechanism that could take into account the size of the replication queues, or the rate at which they grow, in order to limit the rate (a rough sketch of this idea follows below).
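
      As a very rough sketch of such a queue-aware limit (all names below are hypothetical; a real implementation would read the queue size from the gateway sender's statistics), the copy function could simply pause whenever the queue grows beyond a threshold:

      // Hypothetical sketch: back off while the sender queue is above a threshold.
      final class QueueAwareThrottle {
        private final java.util.function.LongSupplier queueSize; // current sender queue size
        private final long maxQueuedEvents;

        QueueAwareThrottle(java.util.function.LongSupplier queueSize, long maxQueuedEvents) {
          this.queueSize = queueSize;
          this.maxQueuedEvents = maxQueuedEvents;
        }

        // Called before distributing each event by the copy function.
        void awaitCapacity() throws InterruptedException {
          while (queueSize.getAsLong() >= maxQueuedEvents) {
            Thread.sleep(100); // let ongoing traffic drain the queue first
          }
        }
      }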

      I do not see why this approach would be a significant disadvantage compared to the "touch" approach. In terms of the problematic situations it could provoke, I think they are on par, although the RFC's approach would be less resource intensive, as it saves the "region put" part of the "touch" by putting the event directly in the replication queue.

      1. I like "copy region" or something like that as a name. I agree synchronize might sound like it will delete entries in the remote site that are missing in the sending site or something like that.

        I think our commands follow a convention of a space after the initial verb, e.g. "backup disk-store" instead of "backup-disk-store", so even if we want to stick with replicate it might want to be "replicate region". But I think I mentioned earlier that "replicate region" might be confused with a region of type "REPLICATE"?

        1. I will take it into account. Thanks

  4. Alberto Gomez, I think this topic needs deeper discussion.

    I think that the feature is nice but there are WAY more issues we have to contend with and there seems to be very little in terms of viable building blocks that we can use within the system.

    Anthony Baker has already indicated that the approach is very blunt. Whilst it makes it "nicer" from a user's perspective, the side-effects could be MASSIVE. Not only can you take down the sending cluster, due to the accessing (touching) of every entry in the region, but also the receiving cluster, as it is not guaranteed that every node in the receiving cluster has a gateway-receiver, which means that we could overwhelm the receivers, which are now tasked with distributing the data across the rest of the nodes.

    And then we have not even spoken about bi-directional "hot" clusters, where both sites can make changes to the same entries. How does one determine what data the opposing site(s) is missing, or even what entries are different? How do we even determine if the sites need to be "synced"? What happens if the sync-sites command is issued? Do we send all data, or is there some mechanism that helps us determine that the data is in sync?

    Also, how do we not overwhelm the existing gateway-senders? Is this command done after hours or do we foresee that this command can be used in peak processing times?

    Whilst I LOVE the fact that we want to solve this issue, I think there need to be much deeper discussions held, e.g. are the existing WAN queues suited for this task? Should we possibly be thinking about some indicator (Hazelcast, for instance, uses Merkle trees) that helps the system determine what data is missing on both sides? What happens if we find entries that are different on both sides? Who wins? Who determines what the correct answer is, as both are valid and the source of truth?

    I think we should describe what we need to change to make this feature complete and then work backwards to determine what is an MVP and what can be added after the MVP has gone live.

    1. Thanks for your comments.

      As I commented in my response to Anthony Baker the scope of the command I have proposed is probably less ambitious than what you are thinking of.

      The way it is proposed could solve an important limitation Geode has now for the use case presented but it should only be used under the conditions stated in the documentation and there should be mechanisms available to monitor the system and to stop the execution of the command in case a problematic situation is close to be reached.

      Nevertheless, I am very open to discuss all the alternatives. I have proposed to discuss this topic in the next monthly synchronous community meeting on Wednesday, May 5th.

      1. One other thing to consider is replicating tombstone events. Tombstones are used to indicate that an entry has been deleted. So when you iterate through the region keyset, be sure that the collection includes the tombstones (they aren't included in the standard APIs).

  5. Regarding throttling, the Geode mantra is "predictable low latency".  That means we will need to avoid overburdening cpu/memory with background work.  For example, if your cluster is overloaded and you want to scale up to add capacity, you don't want the data replication to the new servers to cause an even worse response time.

    For both GII and snapshots, we use flow control based on semaphores to throttle these activities.  Perhaps looking at geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/FlowController.java will help.
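
    For illustration, a window based on a semaphore, similar in spirit to that approach, could look like the sketch below (this is only a sketch with made-up names, not the actual FlowController API):

    import java.util.concurrent.Semaphore;

    // Hypothetical sketch of semaphore-based flow control: at most windowSize
    // messages are in flight; each ack from the remote side releases one permit.
    final class FlowControlSketch {
      private final Semaphore window;

      FlowControlSketch(int windowSize) {
        this.window = new Semaphore(windowSize);
      }

      void beforeSend() throws InterruptedException {
        window.acquire();   // blocks the background work when the window is full
      }

      void onAck() {
        window.release();   // an acknowledgement frees a slot in the window
      }
    }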



  6. After our discussion this morning I have a suggestion for a slightly different way to implement this replicate-region command. I ran it by Barry and he also seemed to like the basic idea.

    The idea is to have a function that directly sends gateway sender batches to the remote site, rather than putting events in the gateway sender queue.

    By not using the queue we can avoid worrying about keeping the primary and secondary state of the queue in sync. We can also avoid filling up the queues with too much data and potentially running out of memory.

    Here's what it would look like in a little more detail.

    1. replicate-region will execute a function on all of the primaries
    2. On each primary, the function will create a new connection to the gateway receiver. Maybe it gets a connection from the existing gateway sender's pool or maybe it creates its own pool
    3. As it iterates over the region it will build up batches of GatewaySenderEventImpls using the existing timestamp and version information. It will call SenderProxy.dispatchBatch_NewWAN to send the batches
    4. Periodically it will call receiveAckFromReceiver to read an ack. It could simply fail if a batch fails, or keep a window of unacknowledged events to retry.

    What do you think? This is still relying on the WAN conflict checks to make sure that if there are concurrent operations we keep the latest update to a region entry.
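
    To make the shape of that idea concrete, here is a rough sketch (the BatchDispatcher interface and the batch handling are made up for illustration; the real code would build GatewaySenderEventImpl batches and use the sender/receiver connection calls mentioned above):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Hypothetical sketch of the "dispatch batches directly, bypass the queue" idea.
    final class DirectBatchCopySketch {

      // Stand-in for dispatching a batch over the gateway connection and reading
      // the receiver's acknowledgement.
      interface BatchDispatcher {
        void dispatchBatch(List<Object> batch) throws Exception;
        void awaitAck() throws Exception;
      }

      static long copyPrimaries(Map<Object, Object> primaryEntries,
          BatchDispatcher dispatcher, int batchSize) throws Exception {
        List<Object> batch = new ArrayList<>(batchSize);
        long copied = 0;
        for (Map.Entry<Object, Object> entry : primaryEntries.entrySet()) {
          batch.add(entry); // real code: build an event keeping timestamp/version info
          if (batch.size() == batchSize) {
            dispatcher.dispatchBatch(batch);
            dispatcher.awaitAck(); // fail (or retry a window of batches) if not acked
            copied += batch.size();
            batch.clear();
          }
        }
        if (!batch.isEmpty()) {
          dispatcher.dispatchBatch(batch);
          dispatcher.awaitAck();
          copied += batch.size();
        }
        return copied;
      }
    }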

    1. We will need to define what it does if a member with primaries goes down while the copy is in progress. Will it automatically find the new primary and copy it? Or will it fail saying not all the data was copied.

      We also need to define the behavior on a persistent partitioned region which has one or more buckets offline.

      1. I would expect that if the member goes down with primaries while the copy is in progress, the command will fail, preferably reporting how many entries were replicated.

        If the partitioned region has buckets offline the command should also fail.

    2. Barry Oglesby and Dan Smith, thanks for the new idea. Anyway, I am not sure if it is better to bypass the queues and send the data to be replicated directly in batches, or to use the "official" channel with the queues. I see pros and cons with it. On the one hand, we waste fewer resources, we release the queues from the burden of the replicated entries, etc. But we lose the feedback we could get from, for example, monitoring the queues.

      I have created a draft pull request with the original idea and a commit on top of it with an implementation of your new idea that sends gateway sender batches directly without going through the queues.

      https://github.com/apache/geode/pull/6441

      It is still a prototype; it needs code cleaning, more tests and probably fixes, but we can use it to communicate on the ideas presented and the feedback given.


  7. BTW, I do think having a gfsh command or tool to copy data to a remote site is a good idea, and I appreciate that you are trying to make sure the data on the remote side ends up consistent even if there are concurrent transactions, etc.

    I like the suggestion this morning that this tool shouldn't trigger events on the remote side, or cause the remote side to propagate the events any further to other WAN sites.

    1. It does seem to me that not triggering events on the remote side will simplify things, but does it also mean that certain features can't be used on the remote side until the copy is complete? For example, if the remote side has clients using register interest or CQs, or if they are using AEQs, then none of them will be notified of the data that was copied.

      1. In order to keep things simple, I would go for the option of not allowing clients to use the remote site until the copy is complete. I will update the RFC to delimit a bit more what is to be tackled by the command.