To be Reviewed By: April 1st, 2020

Authors: Alberto Gomez (alberto.gomez@est.tech)

Status: Draft | Discussion | Active | Dropped | Superseded

Superseded by: N/A

Related: N/A

Problem

Gateway senders deliver events to gateway receivers in batches. Batches are filled with events retrieved from the queues and are sent when the maximum configured batch size is reached or when the configured batch time interval elapses. With this mechanism, events belonging to the same transaction may be sent in different batches.

If a network split occurs in a WAN deployment when the last batch received from a sender does not contain all the events of one or more transactions, the data on the receiving side will be inconsistent for as long as the network split lasts. Inconsistent data caused by applying only some of the operations of a transaction could be very problematic for clients.

Anti-Goals

This proposal does not intend to replicate transactions using gateway senders and receivers. It just aims at making sure that transaction events are delivered atomically by gateway senders to gateway receivers.

It is also assumed that the events to be delivered atomically must all pass through the same gateway sender. Therefore, deployments that require this feature should configure replication such that the regions to which the data in the transactions belong are colocated and have the same gateway senders. Additionally, if serial gateway senders are used, they must be configured with just one dispatcher thread, since the requirement could not be fulfilled if each event of a transaction were handled by a different dispatcher thread.

Solution

In order for gateway senders to deliver batches that contain all the events of the transactions they include, the following is proposed:

  • After transaction commit and before events are sent to the gateway sender, mark the last event of the transaction (a new boolean attribute “lastEventInTransaction” may be added to GatewaySenderEventImpl)
  • In the process of creating a batch, the gateway sender must keep track of all the transactions involved in the events it is putting in the batch.
  • Once a batch is due to be sent (because the maximum configured size has been reached or the batch time interval has elapsed), the gateway sender must check:
    • If, for every transaction to which events in the batch belong, the last event is present, then the batch may be sent.
    • Otherwise, two options are proposed:
      • a) Keep reading events from the queue and putting them in the batch until the above condition is fulfilled, then send the batch. The problem with this option is that batches could grow far beyond the maximum size when transactions are large and numerous.
      • b) Selectively retrieve from the queue the missing events of the transactions in the batch that are incomplete. Once these events are added to the batch, send it. This option is preferred over option a) because it avoids the problem of batches growing much bigger than the configured maximum size.
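As a rough illustration of option b), the following self-contained sketch fills a batch up to the maximum size while tracking which transactions are still incomplete, then selectively drains the queue for the missing events. The Event and BatchAssembler names are hypothetical, invented for this sketch; they are not Geode classes, and the real implementation would work against the sender's queue abstractions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical queued event: txId is null for non-transactional events;
// lastInTransaction models the proposed "lastEventInTransaction" flag.
class Event {
    final String key;
    final Integer txId;
    final boolean lastInTransaction;
    Event(String key, Integer txId, boolean lastInTransaction) {
        this.key = key;
        this.txId = txId;
        this.lastInTransaction = lastInTransaction;
    }
}

class BatchAssembler {
    // Option b): fill the batch up to batchSize, then pull from the queue
    // only the events of transactions that are still incomplete in the batch.
    static List<Event> fillBatch(Deque<Event> queue, int batchSize) {
        List<Event> batch = new ArrayList<>();
        Set<Integer> incomplete = new HashSet<>();
        while (batch.size() < batchSize && !queue.isEmpty()) {
            Event e = queue.poll();
            batch.add(e);
            if (e.txId != null) {
                if (e.lastInTransaction) incomplete.remove(e.txId);
                else incomplete.add(e.txId);
            }
        }
        // Selectively retrieve the missing events; unrelated events stay queued.
        Iterator<Event> it = queue.iterator();
        while (!incomplete.isEmpty() && it.hasNext()) {
            Event e = it.next();
            if (e.txId != null && incomplete.contains(e.txId)) {
                batch.add(e);
                it.remove();
                if (e.lastInTransaction) incomplete.remove(e.txId);
            }
        }
        return batch;
    }
}
```

Note that, as option b) implies, the resulting batch may exceed the configured size by exactly the number of events retrieved to complete the open transactions, while events of unrelated transactions remain in the queue in their original order.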

Given that this behavior requires the replication configuration described above, it is proposed to make it configurable by means of a new parameter to the "create gateway sender" command (--group-transaction-events), which is not activated by default.
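For illustration, creating a sender with the proposed option from gfsh might look as follows. The --group-transaction-events option is the new parameter proposed by this RFC; --id, --remote-distributed-system-id and --parallel are existing gateway-sender options, and the values shown are placeholders:

```shell
gfsh> create gateway-sender --id=sender1 --remote-distributed-system-id=2 \
      --parallel=true --group-transaction-events=true
```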


Other solutions explored that have been discarded are:

  • Send a new type of event inside batches (a transaction event) that contains all the events of a given transaction. This solution does not seem to fit well with the current code base, as events are triggered on a per-region basis while transactions may contain events from several regions. The code changes required appeared to be much greater than those of the preferred solution.
  • Make sure in the gateway receiver that the events of a transaction are only applied once all the batches in which they were delivered have been received. This solution seemed more complex than the preferred one and did not provide extra benefits.

Changes and Additions to Public Interfaces

A new method, setGroupTransactionEvents(), must be added to GatewaySenderFactory in order to create gateway senders with support for this new feature.

Performance Impact

The performance of gateway senders may be impacted by the extra cost of keeping track of the transactions present in each batch, and of selectively retrieving events from the queue (if that option is selected).

Backwards Compatibility and Upgrade Path

The gateway senders will behave as before if the new parameter --group-transaction-events is not used.

Versioned serialization support must be added to GatewaySenderEventImpl because of the new field.

Prior Art

As said above, with the current behavior of gateway senders, in the event of a network split, clients will have to deal with data inconsistencies caused by events not delivered atomically, for as long as the network split is in place.

FAQ

Answers to questions you’ve commonly been asked after requesting comments for this proposal.

Errata

What are minor adjustments that had to be made to the proposal since it was approved?


26 Comments

  1. For solution B, I would just be concerned if we somehow unintentionally reorder the events in the queue and transmit them out of order.  For example if we picked all of transaction B's events, those get sent out but let's say one of those events "leap frogged" another event for the same key.

    1. I had the same question. I'm not quite sure I'm understanding how option B will work, but I think we do need to maintain key ordering of events.

      It may not be that hard to do - I'm pretty sure Geode should already ensure that if transactions X and Y both touch the same key and X happens first, then all of X's events should be in the queue first. So it may be enough to just ensure that transactions go out in the order that their first event was originally enqueued.

      1. I'm just shooting in the dark here and probably am wrong, but is it possible to have non transactional operations interweave with transactional events?  I'm easily convinced that between transactions, that we are safe between this interleaving (smile)

        1. Thanks for your comments!

          If we get events reordered for the same key when retrieving an event from the queue for an incomplete transaction because another event for the same entry gets "leap-frogged", the result in the receiver would be similar to having conflation. The event that got "leap-frogged" would arrive later at the receiver and would be discarded, as the timestamp is checked against the latest event on the entry.

          The only problem I see is that the receiving side might not see all the changes that have happened in the sending side because of this reordering – i.e. we conflated the events.

          Still not sure if it is possible that events for the same key could be reordered. And if so, that must have been because events changing the same entry (either both in TX, or one in TX and the other outside) were very, very close.

          Case 1 – 2 TXs changing (among other things) same entry

          First one that commits will change the entry. The second TX will fail if it updated the common entry before TX1 did (i.e. it obtained a private view of the entry before TX1 successfully committed). A corollary of this is that TX1 will start putting its events in the queue before TX2. The question is whether this guarantees that the common entry changed by TX1 will be in the queue before the common entry changed by TX2. For example, if TX1 had many events and TX2 just one – changing the same entry as the last one in TX1. TX1 commits and starts queueing. TX2 commits and also starts queueing. Now – if Geode currently does not guarantee that the TX1 common event will be sent before the TX2 event – then there is already some possibility of reordering in the current code base, and I do not see what we could break if we pick the TX1 event before the TX2 event. Actually, in this particular case the reordering would be for the good.


          Case 2 – 1 TX and one “ordinary” change – e.g. normal put

          Here it is only the TX that can fail to commit if it obtained a private view on an entry before the put operation finished but did not commit. Put is never jeopardized. Cases:

          1. Put – then TX does put – then TX commits
          2. TX does put – then normal put – then TX fails to commit
          3. TX does put – then TX commits – then normal put
          In cases 1 and 3 – similarly to the above – I would question whether Geode currently makes sure that changes are in order. In particular for case 3 – when a huge TX commits and starts queuing its events, could the change done by the normal put be interwoven with the TX events? If yes, then again I do not see what we could break by putting the events of the transaction before the put, as Geode is not ordering correctly anyway. If not, then we would definitely not break anything.


          If your concern is about the reordering of events that *do not* touch the same entries, I do not think that is a problem – their ordering in the queue is (almost) random when they are very close in time anyway, so I do not think it would matter on the receiving side.


          Please, let me know if I am missing something here.

    2. Similar concern. Could we not just process the queue up to, but not including, the first incomplete transaction? If a completed transaction is interwoven with an incomplete transaction, then that transaction is considered incomplete too.

      Consider the following examples where * denotes the last event in the transaction.


      A1, B1, A2, B2*, A3*, C1
                          ^

      Process the queue up to and including A3 because A and B are complete and A is interwoven with B. C is not complete in this queue.


      A1, B1, A2, B2*, C1, A3*

      None of this queue would get processed because C is not complete and interwoven with A. Since A can't be sent neither can B since it is interwoven with A.
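A minimal sketch of this prefix rule (PrefixCut and sendablePrefix are made-up names for illustration, not Geode code): scan the queue while tracking which transactions are still open; the furthest point at which no transaction is open is the longest prefix that can be sent without splitting a transaction or shipping one interwoven with an incomplete one.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class PrefixCut {
    // txIds.get(i) is the transaction of event i (null = non-transactional);
    // last.get(i) is true if event i is the last event of its transaction.
    // Returns how many leading events of the queue are safe to send.
    static int sendablePrefix(List<String> txIds, List<Boolean> last) {
        Set<String> open = new HashSet<>();
        int cutoff = 0;
        for (int i = 0; i < txIds.size(); i++) {
            String tx = txIds.get(i);
            if (tx != null) {
                if (last.get(i)) open.remove(tx);
                else open.add(tx);
            }
            if (open.isEmpty()) cutoff = i + 1; // no incomplete tx up to here
        }
        return cutoff;
    }
}
```

For the first example above (A1, B1, A2, B2*, A3*, C1) the cutoff is 5, i.e. everything up to and including A3*; for the second (A1, B1, A2, B2*, C1, A3*) it is 0, matching the behavior described.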


  2. > Therefore, the deployments that require this feature should configure replication such that the regions to which the data in the transactions belong must be collocated and must have the same gateway sender.

    Just wanted to clarify - if a user misconfigures the system and tries to commit a transaction that spans multiple gateway senders with this --group-transaction-events option, what will happen? Will they get a meaningful error message back? I don't think it would be a good outcome if a misconfiguration resulted in the gateways no longer dispatching anything.

    1. What I can think of, right now, is that the events could be delivered anyway, although possibly not grouped in the same batch, and there could be a warning log reporting that a possible misconfiguration is preventing events from being grouped.

      The way to identify this could be that the last event for a transaction is not found in the queue after having tried a given number of times.

      1. Seems like we could validate that the transaction events all end up in the same queue though - at commit time we definitely know all of the regions involved in the transaction and we could check that they have the same sender(s).

        1. This sounds expensive if we do this on every transaction.... but mind you... the whole large, multi-region, multi-object transaction will take up some time anyway.... There must be a smarter way to do this... Maybe we limit it to only one GWS with transaction grouping enabled and we validate this at config time.

        2. You are right. We could check in the transaction if all events go to the same sender and if so mark the last event as "last transaction event".

          In case the transaction had events that went to senders configured with "group-transaction-events" and others without it, we could mark all the events as "last transaction event", which would amount to the current behavior: events are put in the batch without any consideration of which transaction they belong to.

          But we also need to take into consideration Udo's comment, how expensive this check would be for every transaction.

  3. Regarding backwards compatibility, I think you will need to do some work to ensure peer to peer backwards compatibility. EntryEventImpls and GatewaySenderEventImpls are sent over the wire, and GatewaySenderEventImpls are also written to disk for persistent queues. So those classes will need to be versioned to support reading from old members/persistent disk. It's not too hard to do - see Managing Backward Compatibility. GatewaySenderEventImpl has already been versioned in the past, so you should be able to follow the pattern in that class. There are already some WAN rolling upgrade tests, see the tests in geode-wan/upgradeTest.

    You are not planning on changing the format of data that is sent to the gateway receiver, is that right? In other words, an old gateway receiver will have no trouble reading the data that is sent?

    1. This was the first red flag for me reading this proposal. I was wondering if we could somehow track the final events out of band with the events.

      (spitballing below)

      Rather than adding a boolean to each event, likely padded out in memory too, could we not keep a queue of completed transactions keyed by their last event. Then as you are scanning the batch queue for WAN you could pull the first entry off the completed TX queue and walk until you find it in the batch queue. If along the way you didn't encounter any other TXs, this TX is ready to send. If you encounter a different TX, you scan the TX queue for that TX and push all the TXs onto a stack. Those are now the TXs you need to complete before the batch could be sent without having incomplete interwoven TXs. You continue this until you have popped all the TXs off the incomplete/interwoven stack. Once the stack is empty, the position in the queue is where you can process up to.

      1. There is a single caveat here... Which we haven't addressed and cannot check or enforce. That is that a single GS has to be used for all Regions that are involved in the Transaction. This can only be enforced manually.

        In addition to this, are you suggesting that all entries related to the same transaction are sent as a single batch? In addition to that, how do we handle partially processed batches on the GR side? There we also have to apply the entries in a transaction. Otherwise I fear that this solution will suffer the same problem that currently exists with a split brain. Except that it now manifests in the unlikely event that the GR node(s) crash whilst processing the batch containing the transaction entries. Or am I missing something?

        1. Thanks Udo.

          You are right, if the new parameter --group-transaction-events is used to create a gateway sender, a single sender has to be used for all regions involved in the transaction and that can only be enforced manually.
          But as commented above, if there is a misconfiguration, the events will still be delivered and there could be an error reporting the possible misconfiguration.

          The idea is to never send a batch with incomplete transactions, not to send all entries related to the same transaction as a single batch.

          This RFC does not address the problem of partially processed batches on the GR side. Split brain situations in a WAN distributed system are something you would expect to see often, and the system should be prepared to face them. That's the reason for this RFC.

          Partially applied batches could be another problem that could arise but it is much less likely to appear.



          1. I'm now not 100% certain if all entries/events that pertain to the same transaction will be sent in the same batch? That said, if the batch size is set to 100, and our transaction touched MANY more entries (let's say 1000). Will we now have 1 batch with 1000 entries, or do we still expect 10x 100?

            Do we believe that Replicate vs Partition regions to behave differently?


      2. So if I understand correctly, in order not to add a new boolean field to the event, the idea would be to keep the last event of each transaction in a parallel structure so that when the batch is created we can evaluate if the batch contains all the events for each transaction?

        1. Word of warning: we already have many data structures in WAN that have in the past led to leaks, inconsistencies, lingering events, etc.  Another data structure built in, that has to be 'in sync', should be done with care and, as you suspect, might be even more difficult to get correct.

          1. I agree, Jason. It would really complicate the implementation.

            What if the boolean field is only added to the GatewaySenderEvent? I think there is a way to pass the information to this class without using the EntryEvent. The transactionId would also need to be stored in the GatewaySenderEvent as Barry pointed out.

    2. Good point, Dan. I will update the RFC with the backward compatibility changes required by this proposal.

      I do not see it necessary to change the data format of the data sent to the gateway receiver.

  4. => Therefore, the deployments that require this feature should configure replication such that the regions to which the data in the transactions belong must be collocated and must have the same gateway sender.

    I think this requirement will be impossible if there is a mix of replicated and partitioned events in the transaction unless the application is using serial senders. In that case it will work, but if parallel senders are being used for the partitioned regions and serial senders are being used for the replicated regions, then the partitioned events will be sent to the same colocated sender, but the replicated events will be sent to a different serial sender.

    => After transaction commit and before events are sent to the gateway sender, mark last event of the transaction (a new boolean attribute “lastEventInTransaction” may be added to EntryEventImpl and GatewaySenderEventImpl)

    If an application is using transactions, then most of the events in the batch will probably be part of a transaction. If that is the case, there will probably be a lot of events in the batch that are part of different transactions that are not the last event in their transaction. I think you'll probably have to keep track of the transaction id in the GatewaySenderEventImpl along with the boolean. With just the boolean, you won't know which event belongs to which transaction.

    =>b) Go to the queue and get selectively the events for the transactions in the batch that do not have all the events.

    Because most of the events in the batch will probably be part of a transaction, you're going to have to use option b in your solution otherwise the batch will never end.

    One thing to keep in mind is that all the events for a single transaction will be in the same BucketRegionQueue, but all the events in a single batch will not necessarily be from the same BucketRegionQueue. For each event in the batch that is not the last event in the transaction, you'll have to get the correct BucketRegionQueue for that event and iterate through it to find the remaining events.

    => No changes to public interfaces are proposed.

    Does this mean there won't be a public java API for this like:

    GatewaySender sender = cache.createGatewaySenderFactory()
    .setParallel(true)
    .setGroupTransactionEvents(true)
    ...
    .create(id, 1);

    => The performance of gateway senders may be impacted

    I think the performance will definitely be impacted by this option. If it's enabled, there will have to be a post batch creation step to find incomplete transactions and go back to each BucketRegionQueue to find the missing events.

    1. Thanks a lot for your comments, Barry.


      => I think this requirement will be impossible if there is a mix of replicated and partitioned events in the transaction unless the application is using serial senders.

      You are right. The idea, which should have been reflected in the RFC, is to support it for partitioned events when parallel gateway senders are used.

      => I think you'll probably have to keep track of the transaction id in the GatewaySenderEventImpl along with the boolean. 

      Yes, that's right.

      => Because most of the events in the batch will probably be part of a transaction, you're going to have to use option b in your solution otherwise the batch will never end.

      Yes, that's a possibility and that's the reason for preferring option b)

      => you'll have to get the correct BucketRegionQueue for that event and iterate through it to find the remaining events.

      Good point. I had not thought about it.

    2. I forgot to answer to this comment:

      => No changes to public interfaces are proposed.

      You are right. The setGroupTransactionEvents(boolean isGroupTransactionEvents) method must be added.

  5. I must admit that I don't like the proposed solution based upon the following:

    • Transactions can span many objects (but we recommend a single object). If EVERY EntryEvent now contains a new boolean, then we have increased our over-the-wire format for everyone!
    • What happens if different regions, part of the same transaction, go through different gateways? (I don't know if we check for that.) In that case we now have objects for the same transaction arriving on the receiver side, without the ability to commit themselves. So we have to hold onto them (in memory) until some signal is sent to apply the objects... which might never happen due to split brain... so possibly holding onto memory forever.
    • How do we handle bucket allocations that are different between clusters. We cannot rely on the region bucket allocations to be the same between clusters. We need something to oversee that all objects within a transaction are correctly received.
    • How does this new boolean work over multiple receivers? How do we signal to each member that the objects can now be applied?
    • How do we handle transaction replication failures? I.e. think in-flight conflicts. Site A modifies objects and Site B modifies objects that are part of an uncommitted transaction? Do we rely on the replication collision detection to track that we have objects that are in bi-directional GW replication?

    I would prefer some mechanism (and yes, it will be more work) that tracks the objects (and versions) that are part of a transaction.

    Transactions aren't simple, and what you are effectively proposing is Distributed Transactions over WAN, which most likely has the same complexity as Distributed Transactions within the same cluster, BUT with the added twist that one has no knowledge of the state of the receiving cluster. I think there needs to be more thought put into this. Hazelcast is using Merkle trees to good effect in this area... this might be an approach.

    1. First of all, as Dan pointed out in a previous e-mail, "this proposal must not be seen as implementing distributed transactions at all. This is just
      trying to group transaction events together in a batch. I don't think we need to solve the whole distributed transaction problem with this proposal."

      Its objective is narrower, but we still think it addresses an important problem and can be solved with a reasonable amount of effort.

      Regarding the other concerns:

      We can think of another mechanism like the one proposed by Jake in order not to add a new field to every EntryEvent although the solution would be more complex to implement.

      The proposal is restricted to configurations where all the events of transactions are sent by the same gateway senders.

      There are no impacts on the receiving side. All the changes are done on the sending side. The receiving side should behave exactly as today.

      1. Alberto... Could I bother you to please create some pretty pictures that we could reference in our discussions around the ordering (or not) of data entry events in the GW queue and how we will decide which entries are sent and in what order? Given that we will now have multiple regions writing to the same queue...

        I'm struggling to visualize how the proposal is going to address the scenarios where many threads are writing to the same GW queue. At what point do we insert the batch of transactional events to be sent?

        How would this proposal work for Bi-directional GWs? Does it even make sense to support bi-directional GWs with this? What about GW conflicts or collisions? Previously if there was a collision it was usually centered around 1 event. With this possibly the whole transaction now needs to be investigated.

        Sorry, not trying to be difficult here.. Just wondering if we have thought about this or even considered how we address this..

        BUT, that said, WAN replicated Transactional data has not been brought up before, so I imagine this is a very small niche area.

  6. Udo, sorry that the pictures are not pretty. I used text instead with an example to try to be of help in these discussions.

    Please keep in mind that the proposal just aims at making sure that when a batch of events is to be sent, for those events that belong to transactions, all the events of every transaction are present in the batch. In other words, events for the same transaction are not split into several batches. The proposal should not affect bi-directional gateways nor how conflicts are treated. That should keep working as it is today.

    Here is an example of how the system will behave with the proposal where we can see some cases of reordering:

    Suppose we have a WAN replicated system with a gw sender and gw receiver in which the batch size is 10.

    I will show the evolution of the contents of the queue where events are being placed and of the batch as it is being filled by the gateway sender.

    The queue and the batch will have events with the following format:

    • (<letter>). Event for entry <letter> not belonging to a transaction
    • (T<number>-<letter>). Event for entry <letter> belonging to transaction <number>
    • (T<number>-<letter>)*. Same as the one above but the asterisk indicates that it is the last event in the transaction.


    1. Initial state:

    Queue: (A) (T0-X) (T0-Y) (T0-Z)* (T1-A) (T1-B) (A) (T2-A) (T1-C) (T2-B) (D) (E) (T2-C) (T2-D)* (T1-D)* ... ← (new events are fed here)
    Batch: <empty>


    2. After the gateway sender has read 10 events from the queue:

    Queue: (D) (E) (T2-C) (T2-D)* (T1-D)* ... <-

    Batch: (A) (T0-X) (T0-Y) (T0-Z)* (T1-A) (T1-B) (A) (T2-A) (T1-C) (T2-B) <-


    As the batch contains events belonging to transactions (T1, T2) whose last events are not in the batch, it is time to get the missing events from the queue, in the order in which the events of each transaction appeared in the batch: first the events from T1, then the ones from T2.


    3. After getting the missing events from T1 and T2 (this is the extra step the proposal describes):

    Queue: (D) (E) ... <-

    Batch: (A) (T0-X) (T0-Y) (T0-Z)* (T1-A) (T1-B) (A) (T2-A) (T1-C) (T2-B) (T1-D)* (T2-C) (T2-D)*

    The batch is now ready to be sent.


    Consequences:

    • The batch does not contain events for an incomplete transaction. If the batch had been sent with 10 elements (as it is currently with Geode) and there was a network split before the next batch is received, the receiving side would have inconsistent data for the time the split lasts (T1 and T2 incompletely applied).
    • The batch is 3 elements greater than the maximum size configured because it had to be completed with the events from incomplete transactions.
    • Some events were reordered:
      - (T2-C) (T2-D)* (T1-D)* were sent before D and E which according to the queue should have been sent before.
      Is this a problem? No
      If we had events from T1 and T2 before D and E and some others from T1 and T2 after, that means that T1, T2, D and E occurred very close in time. So close that it is probably impossible to know which one happened first, and Geode probably cannot assure that the order in the queue is the order in which they occurred.
      Most probably D and E were executed after T1 and T2, but as T1 and T2 had several events to take to the queue, D and E arrived in the queue before all the events of those transactions. Therefore, a reordering here would not matter, and putting the events of the transactions first is probably closer to what really happened.

      - (T1-D)* has been put before (T2-C) and (T2-D)* even though it was after them in the queue.
      Given that T1 and T2 have interleaved events in the queue, they must have happened very close in time. As the first event in the queue from both transactions is from T1 (T1-A), it is fair to assume that the order of execution was first T1 and then T2. Therefore, it would not be wrong to put all the events of T1 before those of T2 in the batch. Actually, it would be more accurate, and the new mechanism would therefore produce better behavior (not very important anyway).
      In case T2 was executed before T1, then the queue did not have a perfectly accurate ordering (because in the queue we have T1-A before T2-A), so a reordering like the one performed by the new mechanism could not be considered wrong.

      Finally, in case any reordering (or the current ordering) causes a later event for the same entry to be placed before a prior one, the receiver will handle it correctly by using the timestamps of the events. The timestamp of an incorrectly ordered event will cause the event to be discarded on the receiving side, and the end result will be equivalent to conflation.