To be Reviewed By: 1 March 2022

Authors: Mario Ivanac

Status: Draft | Discussion | Active | Dropped | Superseded

Superseded by: N/A

Related: N/A

Problem

As an example, consider a replicated region configured with 3 or more servers, and a client configured with a read timeout equal to or smaller than the member timeout.

If one of the replicated servers is shut down while the client is putting data into the region, we observe data inconsistency.

As a result, part of the data is written to the server the client is connected to, but it is missing from the remaining replicated servers.


Analysis shows that as the client puts data into the connected server, that server replicates the data to the other servers. If any server in the chain of replicated servers is restarted, replication is halted until that server is declared dead.

Since the client's read-timeout is lower than the member-timeout, the client will continue with the insertion of data, and replication to the replicated servers is again halted. At some moment the restarted member is declared dead, and we now have the halted events still to be applied on the replicated servers, in parallel with new events from the client. If a new event from the client is replicated to the other servers before the halted events, then at the moment the halted events reach the replicated servers, those servers assume they are duplicate events (the sequence ID of the received event is compared to the last stored sequence ID) and do not store the data.
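
To make the duplicate check concrete, here is a minimal sketch (in Java) of the kind of comparison described above. The class and field names are simplified assumptions, not the actual Geode code; the real check lives in DistributedRegion.hasSeenEvent() / DistributedEventTracker.hasSeenEvent(), as discussed in the comments below.

    // Illustrative sketch only: simplified names, not the actual Geode implementation.
    import java.util.HashMap;
    import java.util.Map;

    class SequenceIdTrackerSketch {
      // last sequence ID applied per originating client thread
      private final Map<Long, Long> lastSeqIdByThread = new HashMap<>();

      // Returns true if the event should be treated as a duplicate and discarded.
      synchronized boolean hasSeenEvent(long threadId, long sequenceId) {
        Long lastSeen = lastSeqIdByThread.get(threadId);
        if (lastSeen != null && sequenceId <= lastSeen) {
          // A halted (older) event arriving after a newer event from the same
          // client thread is discarded here, even though its key was never stored.
          return true;
        }
        lastSeqIdByThread.put(threadId, sequenceId);
        return false;
      }
    }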


A distributed test that reproduces the problem can be found in the linked PR.

Anti-Goals

Solution

When replicating to other servers, the current logic first creates connections toward all servers and only then replicates the data over those connections. Connection creation continues until all connections are created or the destination server(s) are declared dead.

The new proposal is that, when replicating data, we first try to create connections to all destinations in a single attempt and replicate the data over every connection that was created. After this step is completed, we keep trying to create the remaining connections to the unreachable destinations until the connections are established or the destination is declared dead; once connections are established, the data is replicated over them.
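
A rough sketch of the proposed two-phase distribution is shown below. All types and helpers (Destination, Connection, Message, tryConnectOnce, connectUntilEstablishedOrMemberDeparts, replicate) are hypothetical placeholders for the real connection and distribution logic, not actual Geode APIs.

    // Sketch of the proposed behavior; types and helpers are hypothetical placeholders.
    import java.util.ArrayList;
    import java.util.List;

    class TwoPhaseSenderSketch {
      void sendToMany(List<Destination> destinations, Message msg) {
        List<Connection> ready = new ArrayList<>();
        List<Destination> unreachable = new ArrayList<>();

        // Phase 1: one connection attempt per destination, without blocking on failures.
        for (Destination d : destinations) {
          Connection c = tryConnectOnce(d);
          if (c != null) {
            ready.add(c);
          } else {
            unreachable.add(d);
          }
        }
        replicate(ready, msg); // reachable members receive the data immediately

        // Phase 2: keep trying the remaining destinations until they connect or are declared dead.
        List<Connection> late = new ArrayList<>();
        for (Destination d : unreachable) {
          Connection c = connectUntilEstablishedOrMemberDeparts(d); // may block up to member-timeout
          if (c != null) {
            late.add(c);
          }
        }
        replicate(late, msg);
      }

      // Placeholders standing in for the real connection and distribution code.
      Connection tryConnectOnce(Destination d) { return null; }
      Connection connectUntilEstablishedOrMemberDeparts(Destination d) { return null; }
      void replicate(List<Connection> connections, Message msg) { }
    }

    class Destination {}
    class Connection {}
    class Message {}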

Linked PR: https://github.com/apache/geode/pull/7381


Another solution that was explored and discarded:

Add logic to store the last N (configurable) "key - sequence ID" pairs instead of only the last sequence ID.

The user will configure the size of the last-stored "key - sequence ID" map.

Default value is 0: feature disabled.

A value greater than 0 activates the feature (calculated according to the formula 3 x member-timeout / read-timeout).
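
For illustration only, here is a minimal sketch of this discarded idea, assuming a bounded java.util.LinkedHashMap keyed by region key. The class and method names are hypothetical and do not reflect the actual change in the PR below.

    // Sketch only: a bounded "key -> last sequence ID" map, as in the discarded proposal.
    import java.util.LinkedHashMap;
    import java.util.Map;

    class RecentEventsSketch {
      private final int maxEntries; // configured size; 0 means the feature is disabled
      private final Map<Object, Long> recentSeqIds;

      RecentEventsSketch(int maxEntries) {
        this.maxEntries = maxEntries;
        // Insertion-ordered map that evicts its oldest entry once the cap is exceeded.
        this.recentSeqIds = new LinkedHashMap<Object, Long>(16, 0.75f, false) {
          @Override
          protected boolean removeEldestEntry(Map.Entry<Object, Long> eldest) {
            return size() > RecentEventsSketch.this.maxEntries;
          }
        };
      }

      // Duplicate only if this exact key was already applied with an equal or newer sequence ID.
      // With maxEntries == 0 the caller would fall back to the existing last-sequence-ID check.
      synchronized boolean isDuplicate(Object key, long sequenceId) {
        if (maxEntries == 0) {
          return false;
        }
        Long seen = recentSeqIds.get(key);
        return seen != null && sequenceId <= seen;
      }

      synchronized void record(Object key, long sequenceId) {
        if (maxEntries > 0) {
          recentSeqIds.put(key, sequenceId);
        }
      }
    }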

PR with proposed solution: https://github.com/apache/geode/pull/7214


Changes and Additions to Public Interfaces

None

Performance Impact

None

Backwards Compatibility and Upgrade Path

No upgrade or backwards compatibility issues.

Prior Art

What would be the alternatives to the proposed solution? What would happen if we don’t solve the problem? Why should this proposal be preferred?

FAQ

Answers to questions you’ve commonly been asked after requesting comments for this proposal.

Errata

What are minor adjustments that had to be made to the proposal since it was approved?


36 Comments

  1. It is not clear to me from your description what the cause of the inconsistency is. Could you add a detailed list of the order of operations and events that cause the inconsistency? What role does the client timeout play? Does it cause the client not to retry the operation? The client sends the operation to one server and then that server is responsible for sending to all the other servers. Is the initial server that received the op from the client the one that needs to go down? Also, does it need to go down after it has sent the op to at least one other server but before it sends it to all of them?

    You also talked about a server being restarted. Did it have the data persisted in it? It is not clear to me from your description why the restarted server would ignore data stored in the online servers. One of the things the servers do when they see another server hosting their region depart the member view is send out a reconciliation message to the survivors to see if the server that died sent anything to them that it was not able to send to everyone. This should prevent data inconsistency in what is stored on the servers.

    One possible performance impact of your solution is that it adds a LinkedHashMap to a class that can have many instances. It looks like the current solution does this even when the feature is disabled. If we end up 

  2. Mario Ivanac I am assuming that once the proposal is ready for review, you are going to send an email to the geode dev list, right? Just checking. Please ignore if you have already sent an email to the dev list.

  3. As Darrel mentioned, geode has a mechanism to reconcile data in case of member departure.

    In your case, are there any changes to the default behavior, e.g. turning off concurrency checks?

    It would be nice to have a geode ticket created with a reproducible test attached to it, since you are able to demonstrate the issue.


  4. Hi,

    the ticket is already reported in Jira.

    Also, a test is provided in the attached PR.


    BR,

    Mario

  5. After reading through the JIRA and RFC, I'm not clear on the exact scenario that causes the data inconsistency.  If you could clarify (as suggested by Darrel):

    1. The sequence of things that happen
    2. Where the implementation gap is that allows the inconsistency
    3. Why the proposed solution fills that gap
    4. Any downsides to the proposed solution (e.g. memory consumption, additional GC load)

    Thanks,

    Anthony

  6. What was the client retries value?  Did the client thread doing the update receive an exception? If so, what happened after the client application received the exception? 

    I'm not sure what "client will continue with insertion of data" means... What should happen is that the client logic will retry the operation N times. If the update is still failing, then an exception will be thrown to the application thread. If the application continues with further updates (thus increasing the sequence number on the thread), it is definitely possible for a distribution operation to be rejected. However, that seems like the correct/desired behavior...? Perhaps I'm misunderstanding the scenario.

    It's important for each client thread to maintain causal ordering (i.e. each action on a given thread is strictly ordered). 

  7. If we have a client implementation that ignores the exception (as in the reproduction test) and continues putting data, then the server connected to the client will have all the data, and the other servers will be inconsistent.

    That is the reason we propose this improvement: data consistency should not depend on the client implementation, or on the configuration of read-timeout and member-timeout.

  8. When a client does not receive a positive acknowledgement of an update, there are at least 3 states to consider:

    1. The cluster never processed the update message at all (failure during send)
    2. The server connected to the client received the update but failed to fully replicate the update before the client gave up
    3. The update was fully replicated into the cluster but the client never received the acknowledgement

    I think the client has to be aware of these potential failure conditions. Deciding how to respond is application-specific IMO.

  9. Here is the current reconcile mechanism:

    • Assume there are 3 servers (server1, server2, server3) with replicated region r1
    • Client connects to server1, sends the put request
    • Put happens on server1, it replicates the data to server2 but fails to replicate on server3 (as server1 gets killed)
    • Now server2 and server3 see that server1 has departed. They both initiate data reconciliation between them (doing a delta GII) - this causes the data to be consistent between server2 and server3 (note this does not generate any callback events)
    • On the other hand, as the client does not get a response from server1 for the put request, it retries the operation on another server (think about the other case, where the replication did not happen on either server2 or server3); if the data is present it is ignored/elided, if not it is applied; in both cases an event is generated and sent to the interested clients and the wan gateway.

    Where in this process are you seeing the issue? Are you giving sufficient time to see that the data is reconciled and events are propagated?

    If the client is never able to put on any server, it will (I guess) throw a NoServerAvailable exception to the application.


    • Assume there are 3 servers (server1, server2, server3) with replicated region r1
    • Client connects to server1, sends the put request
    • Put happens on server1, it tries to replicate data to server2 and server3 (as server2 gets killed)
    • As server2 is down, server1 will wait until server2 sends a response or is declared dead (member-timeout), and will not replicate the data to server3
    1. If server2 and server3 both host region r1, then when the put is done on server1 it will send the replicated data to both server2 and server3 before waiting for any replies. You are talking about a REPLICATE region, correct? Or is it a PARTITIONED region with redundancy?

  10. Yes, I am talking about a REPLICATE region, and server1 tries to send the replicated data to both server2 and server3, but the internal implementation in MsgStreamer performs sequential operations per connection.

    1. Can you share details on the part of MsgStreamer that does this?

  11. If I remember correctly, the replication should be parallel across all the replicated nodes, not sequential...

    I do see what's happening... Down in the msg-stream it's still sequential...

    But if there is a failure writing to one stream, it will catch the exception and continue to write to the others.

    There are two implementations of BaseMsgStreamer.writeMessage():

    1. MsgStreamer 
    2. MsgStreamerList

    Take a look at the MsgStreamerList.writeMessage() implementation. In the case of replication to multiple nodes, it should go through the MsgStreamerList implementation.
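
    For illustration, here is a simplified sketch of the catch-and-continue shape described in this comment; the types and the failure reporting are assumptions and this is not the actual MsgStreamerList source.

      interface StreamerSketch {
        // Writes the message and returns descriptions of the connections that failed.
        java.util.List<String> writeMessage();
      }

      class StreamerListSketch implements StreamerSketch {
        private final java.util.List<StreamerSketch> streamers;

        StreamerListSketch(java.util.List<StreamerSketch> streamers) {
          this.streamers = streamers;
        }

        @Override
        public java.util.List<String> writeMessage() {
          java.util.List<String> failures = new java.util.ArrayList<>();
          for (StreamerSketch s : streamers) {
            try {
              failures.addAll(s.writeMessage()); // write this stream's connections
            } catch (RuntimeException e) {
              // A failure on one stream is recorded; the remaining streams are still written.
              failures.add("write failed: " + e.getMessage());
            }
          }
          return failures;
        }
      }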


  12. After deeper analysis, it turns out everything is halted prior to MsgStreamer. When a message is sent to multiple destinations, the first step is to create connections toward all destinations. If any destination is not available (server down), we retry creating the connection, or wait for notification that the destination has been removed. For details, see DirectChannel.sendToMany().


  13. Mario Ivanac 

    Are you saying that when server1 tries to connect to server2 (say), which is down, it just waits and no response is sent back, and it never replicates/sends the data to server3? And so replication is incomplete...

    I feel this is a bug that needs to be addressed. It should try to send the data to the other members that are still available.


  14. But once we get notified that server2 has departed (member timeout plays a role here), then we will proceed and replicate to server3. A period of time does exist in which server1 has already updated its cache but has not yet distributed that update to the other servers. If, during the wait for server2's departure, the client times out, then the operation is still in progress on the servers. You could ask server1 for its state and it would have the put, but server3 would not. But eventually, once the op has completed, both server1 and server3 will have the update. Is this all that is going on here, or have you found that server3 will never receive the put?

  15. According to the implementation, generally when we are sending a message we first create all connections and then send the message. If in the first step any destination is not responding and is not yet declared dead, we will wait.

  16. Yes, we will wait. But why is that a problem? In your original description you said we would never send it to server3. But I think after we have waited for server2 and been notified of its departure, we will then send it to server3. Is this a problem?

  17. As I said, we will eventually send this event to server3.

    But since the client continues inserting data, at some moment (after server2 is declared dead) new data will be replicated to server3 (and stored) before the retry is triggered for the previously hanging events.

    Now when the old hanging events are resent, server3 will check whether this is a duplicate event (by comparing its sequence ID with the last stored sequence ID).

    If the received sequence ID is not greater than the stored ID, we declare the event a duplicate and discard it. (This is the problem.)

  18. You say "when the old hanging events are resent" but it sounds like it is not a "resend" but just that it is eventually sent. Correct? It was delayed because it was waiting for server2 to leave or be connected. If a newer value for the key sneaks in during this window, don't we want it to "win"? It would be stored on both server1 and server3, and then the original put is finally distributed to server3. Don't we want that put not to happen, since it is older? You say we discard it and that is a problem, but it seems like if we applied it, that would be the problem, because the newer value has been stored on server1 and server3, and if we then change just server3 to the older value our cache would be inconsistent.

    Do you have concurrency checks enabled on this replicate region? If you do that is supposed to give us eventual consistency. When you say "we declare event duplicate" I think that is a different part of the code than the concurrency checks. Do you know what part of the code you are seeing declare the event duplicate? The concurrency checks happen down in AbstractRegionEntry.checkForConflict.

    FYI: This issue would not occur on a partitioned region because the in-progress op on the server would keep the entry locked on the primary. If the client times out the put and tries another on the same key, it will actually be stuck behind the old one that is still in progress. But with a replicate region the lock is released before doing distribution, to prevent a distributed deadlock, since concurrent puts can originate from multiple servers for the same key.

  19. Yes, this is the eventual sending of the value to server3. And we get a new key-value pair, not a new value for the same key. That is the reason I think this logic should be improved.

    The part of the code that checks this is:

    DistributedRegion.hasSeenEvent(), which calls 

    DistributedEventTracker.hasSeenEvent()




  20. I think I am beginning to understand. It seems like the problem is that the client times out an operation (that continues to be executed on the server) and we have logic that assumes subsequent ops done by that client (thread?) will happen after the op that was timed out. I wonder if the client, when it times out an op, should get a brand new identity for subsequent ops it does. I think when it times out an op it has to give up that identity and establish a new one with the server, as if the ops after the timed-out one came from a new client thread. And we can't guarantee that ops done after one that timed out will be done on the server after the timed-out op is done. We could do something on the server side (like what happens on a partitioned region) that causes an op that comes in from that client to wait for the previous op to complete on the server. But then I'm not sure what the purpose of the client timeout is.

  21. The other thing I think we have discovered is a performance issue on the server side. If it has to send a message out to others and one of them is going down, then it may wait for it to be down (which can take a while due to member-timeout) before sending the message to the live members. Instead, it should immediately send the message to the connected members and then wait for the other members to connect or leave the view. If we did this, I think we would have ended up sending it to server3 before the client timed out and would not have had an issue.

    Also it seems like if the client retried the operation then the servers would have been consistent.


    1. Darrel Schneider I think what you are saying here: "Instead, it should immediately send the message to the connected members and then wait for the other members to connect or leave the view. If we did this, I think we would have ended up sending it to server3 before the client timed out and would not have had an issue." would be the way to fix this issue. Am I right? Is it possible?

      1. Yes, Alberto, you correctly understand my comment. If this change to how messages are sent is made, then I don't think this issue would have been seen. It would still be possible in theory if a client had a very small timeout value. I think this improvement could be done, but it would not be trivial.


        1. Thanks for the confirmation.

  22. From the comments I assume that the main reason for the data inconsistency is a faulty implementation in the geode client (handling of the readTimeout exception).

    Due to that, no changes should be made in the geode server implementation, and we should rely on a correct implementation in geode clients.

  23. Hi, I just wanted to clarify: should we continue with this RFC, or should I close it, since the main reason for this problem is the faulty client implementation (though not everyone is aware of it)?

    1. I think your current solution should be closed. I agree that we don't want to have the extra LinkedHashMap on the server side. But a server-side change we could make is that if a member is in the view but we don't have a connection to it, we go ahead and distribute our message to the other connected members and then wait for connections or for the member to leave.

      It would also be good to have discussions with the community on what clients should do when they timeout an operation.

  24. The proposed solution is closed. Regarding the current implementation of sending a message to multiple destinations: first we create all connections, and then we continue with sending the message. I am not sure if this is intended.

  25. Mario Ivanac 

    The RFC is about addressing the inconsistent replication behavior when the client timeout is less than the member timeout. The RFC, with respect to the problem statement, is still correct.

    The initially proposed solution and the other alternative solution are the options discussed as part of the problem statement.

    We can state that for this RFC (problem) we are going forward with the new proposed solution and continue with the RFC.

    Are you planning to continue to work on this?

  26. I will continue with the new proposal. I have concerns regarding it, since the impacted procedure is used generically when sending messages, so this will impact general geode behavior (not just the reported problem).

  27. Since there are no new comments, I will continue with the current proposal.