...

The algorithm's purpose is to identify the wrong order of messages by signing every message with the sender's current color (red or white). Every node then knows which side of the cut a transaction message belongs to. 

Messages:

  1. sent - a set of messages that changed the state of the sender.
  2. received - a set of messages that changed the state of the receiver.

Algorithm's steps:

  1. Initially all processes are white. The sent and received collections are empty, and LocalState is empty.
  2. After the system has worked for some time, every node might have:
    1. Possibly empty sent and received collections.
    2. Possibly non-empty LocalState <-> sent + received. The state matches the events that changed it.
  3. An arbitrary process can start a snapshot (furthermore, multiple processes may start it simultaneously):
    1. The node colors itself red.
    2. It commits a LocalState.
    3. It commits sent and received as collections for every IN and OUT channel. New ones are created for the next LocalState.
    4. It prepares a marker message: it is red and has sent as its payload. The goal of the marker is to guarantee the order of messages (receivedij must be a subset of sentji).
  4. Every ordinary message between distributed processes is marked with the marker message. If there is no upcoming message to a node, the marker is sent on its own as an ordinary message.
  5. On receiving an ordinary message, a process has to check the marker first, before applying the message.
  6. If the received color differs from the local color, the node has to trigger the local snapshot procedure.
  7. Handle sent from the received marker:
    1. The node calculates ChannelState for the channel on which it received the message: sent - received, where sent is extracted from the marker and received is calculated locally since the local snapshot.
  8. Once marker messages have been received from all IN channels, the node prepares a snapshot:
    1. Local snapshot of node i: Ni = LocalStatei + Σ ChannelStateij (sent - received)
  9. Every such local snapshot is a unit of the global snapshot.
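
The ChannelState calculation from step 7 can be illustrated with a minimal sketch. It assumes that messages are identified by plain strings; the class and method names are hypothetical and are not part of the proposal.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper (illustration only): in-transit messages of one channel. */
final class ChannelStateSketch {
    /**
     * @param sentByRemote Message IDs the remote node reports as sent (payload of its marker).
     * @param receivedLocally Message IDs the local node has received on the same channel.
     * @return ChannelState of the channel, that is sent - received.
     */
    static Set<String> channelState(Set<String> sentByRemote, Set<String> receivedLocally) {
        // Copy first, so that the committed sent collection stays untouched.
        Set<String> inTransit = new HashSet<>(sentByRemote);

        inTransit.removeAll(receivedLocally);

        return inTransit;
    }
}
```

With sent = {m1, m2, m3} and received = {m1}, the sketch returns {m2, m3}: the messages that were still in the channel when the cut was taken.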

Map the algorithm to Ignite

Create a recovery point (Consistent Cut)

For the Ignite implementation, it is proposed to use a single node to coordinate the algorithm. The user starts a command for creating a new ConsistentCut on a single node, and this node becomes responsible for coordinating the Consistent Cut iteration:

  1. Every node uses its local ConsistentCutVersion to sign transaction messages (see "Signing messages" below).
  2. A new global ConsistentCut is started by a user command. The initiating node:
    1. Initializes a new version by incrementing the local ConsistentCutVersion.
    2. Sends ConsistentCutStartRequest with the new version to the baseline nodes in the cluster (piggybacked on DistributedProcess).
  3. Every node can receive the new version in two ways: with the ConsistentCutStartRequest or with a transaction message signed with the new version.
  4. On receiving the new version, a node initializes its local ConsistentCut:
    1. upgrades the local version
    2. prepares the ConsistentCut future
    3. collects active transactions and decides which side of the ConsistentCut they belong to
    4. writes the start mark into the WAL (see below).
  5. The node that commits a transaction first signs the Finish message with its local ConsistentCutVersion (txCutVer):
    1. Near (the node that originated the transaction) for 2PC.
    2. Backup (or primary if backups=0) for 1PC.
  6. For the collected active transactions, nodes check the order of events relative to the ConsistentCut by comparing txCutVer with the local ConsistentCutVersion:
    1. if the local version is greater than txCutVer, the transaction belongs to the BEFORE side
    2. if the local version is equal to txCutVer, the transaction belongs to the AFTER side
    3. Note that the local version is never less than txCutVer, because a node must process a ConsistentCutVersion before applying a message signed with this version.
  7. After every collected transaction has finished, the node writes a FinishRecord with transaction info into the WAL.
  8. The node notifies the initiator node that the local procedure has finished (with the DistributedProcess protocol).
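
The comparison in step 6 can be sketched as follows. This is only an illustration under the assumption that versions are plain long counters; `CutSideSketch` and `sideOf` are hypothetical names, not Ignite API.

```java
/** Hypothetical illustration of step 6: which side of the cut a transaction belongs to. */
final class CutSideSketch {
    enum Side { BEFORE, AFTER }

    /**
     * @param localCutVer Local ConsistentCutVersion of the node that checks the transaction.
     * @param txCutVer Version the Finish message was signed with.
     */
    static Side sideOf(long localCutVer, long txCutVer) {
        // A node processes a version before applying a message signed with it,
        // so the local version can never be behind txCutVer.
        if (localCutVer < txCutVer)
            throw new IllegalStateException("Local version " + localCutVer + " is behind txCutVer " + txCutVer);

        return localCutVer > txCutVer ? Side.BEFORE : Side.AFTER;
    }
}
```

For example, a node with local version 5 places a transaction signed with version 4 on the BEFORE side and a transaction signed with version 5 on the AFTER side.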

Consistent and inconsistent Cuts

A Consistent Cut, in terms of the Ignite implementation, is a cut that finished correctly on all baseline nodes: both ConsistentCutStartRecord and ConsistentCutFinishRecord are written.

An "inconsistent" Cut is a cut for which one or more baseline nodes haven't written ConsistentCutFinishRecord. This is possible in the following cases:

  1. an error occurred while processing the local Cut.
  2. a transaction is recovered with the transaction recovery protocol (txCutVer is unknown).
  3. a transaction finished in the UNKNOWN state.
  4. the baseline topology changed: Ignite nodes finish the local Cuts running at that moment, making them inconsistent.
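
The definition above can be expressed as a small check. The sketch assumes that node IDs are strings and that the sets of nodes that wrote each record are already known; how they are collected is out of scope, and all names are hypothetical.

```java
import java.util.Set;

/** Hypothetical illustration: a cut is consistent only if every baseline node wrote both records. */
final class CutConsistencySketch {
    /**
     * @param baselineNodes IDs of all baseline nodes.
     * @param wroteStart IDs of nodes that wrote ConsistentCutStartRecord.
     * @param wroteFinish IDs of nodes that wrote ConsistentCutFinishRecord.
     */
    static boolean isConsistent(Set<String> baselineNodes, Set<String> wroteStart, Set<String> wroteFinish) {
        return wroteStart.containsAll(baselineNodes) && wroteFinish.containsAll(baselineNodes);
    }
}
```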

ConsistentCutVersion

Every Ignite node tracks the current ConsistentCutVersion:

Code Block
languagejava
titleConsistentCutVersion
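/** Version of a Consistent Cut, tracked by every node. */
class ConsistentCutVersion {
	/** Monotonically increasing counter. */
	long version;
}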

`version` is a simple counter. It is guaranteed to increase monotonically because it is incremented via discovery communication.

ConsistentCutVersion initialization

ConsistentCutVersion is initialized as follows.

When the server topology changes: 

  1. On start, a server node checks the local MetaStorage for the latest stored ConsistentCutVersion; the default is 0.
  2. This version is packed into JoiningNodeDiscoveryData and sent to the coordinator.
  3. Every server node, on receiving TcpDiscoveryNodeAddedMessage:
    1. finishes the local ConsistentCut.
    2. checks the received ConsistentCutVersion and, if it is greater than the local one, updates the local version to the received version.

A client node just sets its version to 0 on start-up. The version is automatically updated to the cluster version when the node receives the next GridNearTxPrepareResponse message.
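
The "update only if the received version is greater" rule can be sketched with an atomic counter. This is an assumption about one possible way to keep the local version monotonic; `LocalCutVersionSketch` is a hypothetical name and not an Ignite class.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical illustration: the local version only ever moves forward. */
final class LocalCutVersionSketch {
    /** Starts from 0, the default for a node without a stored version. */
    private final AtomicLong ver = new AtomicLong(0);

    /**
     * @param receivedVer Version received from another node.
     * @return {@code true} if the local version was raised to the received one.
     */
    boolean onReceived(long receivedVer) {
        // Atomically keeps the maximum of the local and the received version.
        long prev = ver.getAndAccumulate(receivedVer, Math::max);

        return receivedVer > prev;
    }

    long current() {
        return ver.get();
    }
}
```

A stale version (lower than or equal to the local one) leaves the counter unchanged, so messages that arrive out of order cannot move the version backwards.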

...

  1. :
    1. Note that the snapshot consists of committed LocalStates and messages between nodes.
    2. The committed sent and received collections are cleaned.

Map the algorithm to Ignite

Ignite components

On receiving a message with a new CutVersion, a node sets it and commits LocalState and ChannelState to identify the wrong order of events.

  1. LocalState maps to the local WAL (all committed transactions are part of LocalState).
  2. Channel:
    1. We can piggyback on the Ignite transaction protocol messages (Prepare, Finish) sent with CommunicationSpi.
    2. If there is no transaction for a channel, we can rely on DiscoverySpi to start the local snapshot on non-participating nodes.
  3. ChannelState maps to `IgniteTxManager#activeTransactions`:
    1. the sent collection matches committed transactions for which the local node is near: it sends FinishMessages to other nodes.
    2. the received collection matches committed transactions for which the local node isn't near: it receives FinishMessages from other nodes. 
  4. `IgniteTxManager#activeTransactions` doesn't track:
    1. committing transactions (COMMITTING+): they are removed from this collection before committing starts.
      1. track them additionally: add a transaction to a separate collection before it starts committing, and remove it after it is committed.

Naive algorithm

  1. Initial state:
    1. Ignite nodes are started from a snapshot or another consistent state (after a graceful cluster stop / deactivation).
    2. var color = WHITE .
    3. two empty collections for sent and received - Map<UUID, Collection<GridCacheVersion>>, where the key is a remote node ID and the value is the nearXidVersion of a committed transaction.
  2. After some time, Ignite nodes might have non-empty sent and received collections.
  3. The Ignite coordinator starts a local snapshot (all of this must be done atomically):
    1. mark the local variable with color = RED 
    2. write a snapshot record to WAL (commits LocalState).
    3. replace sent and received with new empty instances; the old collections are now immutable for the snapshot process.
    4. prepare a marker message for every node with the subset of sent transactions (send just Collection<GridCacheVersion> for every node).
  4. Send marker messages directly to all nodes in the cluster. The marker must be sent with every subsequent FinishMessage (to avoid cases when the next FinishMessage is received before the marker message).
  5. Other nodes, on receiving a marker with a new color:
    1. start a local snapshot (see the steps from point 3).
  6. For every received marker message:
    1. For the prepared received collection and the received marker's sent, calculate (sent - received). These are the transactions in the in-transit state. They are also part of the snapshot.
  7. After markers from all nodes are received:
    1. Store ChannelState into WAL: write Map<UUID, Collection<GridCacheVersion>>, where the value is (sent - received).
    2. The snapshot is finished locally.
  8. On all nodes color = RED . The next snapshot iteration will start by changing the color to WHITE . 

Disadvantages:

  1. The sent and received collections could be too big to track and to send in a single message.
  2. It requires a lock for steps 3 and 4, which affects the sending of FinishMessage and therefore UX (lower transaction performance).
  3. In the worst case it requires O(N^2) additional messages, where N is the cluster size. This is possible when there are no active transactions in the cluster (idle nodes).

Reducing the sent and received collections:

We can rely on the two-phase protocol of Ignite transactions to reduce sent and received:

  1. Transactions have an additional state (PREPARED) before sending and receiving a FinishMessage. We can therefore treat PREPARED as a promise to send/receive a FinishMessage.
  2. Then IgniteTxManager#activeTransactions  can already replace the sent and received collections:
    1. it tracks all PREPARED transactions (an analogue of sent - received).
    2. it also shrinks automatically after a transaction is committed.

Avoid locking in steps 3 and 4

  1. Moving the WAL record write out of the lock => inconsistency between fixed sent / received and committed LocalState. 
  2. Moving the fixing of sent and received out of the lock => inconsistency between fixed sent / received and committed LocalState.

To avoid misses:

  • sent and received might be wider (collect transactions that are not only PREPARED, but ACTIVE+).
  • auto-shrink must be disabled during the snapshot process.

To avoid unwanted transactions:

  • Introduce a new collection, excluded: a collection of messages that changed LocalState concurrently with the local snapshot but aren't part of the snapshot's ChannelState (they are part of the next ChannelState, after the snapshot).
  • Ignite then needs a rule to distinguish different ChannelStates (before and after the snapshot). Possible solutions:
    1. Proposed: add an additional flag to FinishMessage that shows which side of the snapshot this transaction belongs to on the near node.
    2. Compare by a fixed GridCacheVersion (fixed on the remote node during the snapshot process):
      1. but the order of transactions is not guaranteed, so the snapshot may include a transaction with a version greater than the version of a transaction after the snapshot.
      2. this can be overcome with an additional step (locally collecting transactions for additional exclusion after all transactions are committed).

Also, there is no need to send the sent collection with FinishMessage. The whole lock is then reduced to a single volatile variable:

  1. Atomically (with Compare-And-Swap) update the color variable and disable auto-shrink for active transactions.
  2. Write a LocalState to WAL.
  3. Collect all active transactions.
  4. FinishMessage is sent with 2 fields: color (snapshot state) and commit color for the specified transaction.
  5. Introduce the exclude collection, which contains transactions to exclude from the snapshot before the LocalState commit while restoring.
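
Step 1 can be sketched as below. The sketch assumes that the color and the auto-shrink flag are packed into one immutable object, so that a single Compare-And-Swap changes both; the class `SnapshotStateSketch` and its members are hypothetical and only illustrate the idea.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical holder of the color and the auto-shrink flag; not an Ignite class. */
final class SnapshotStateSketch {
    enum Color { WHITE, RED }

    /** Immutable pair, so that both values are replaced by one CAS. */
    static final class State {
        final Color color;
        final boolean autoShrink;

        State(Color color, boolean autoShrink) {
            this.color = color;
            this.autoShrink = autoShrink;
        }
    }

    private final AtomicReference<State> state = new AtomicReference<>(new State(Color.WHITE, true));

    /** @return {@code true} if this call switched the color, so the caller has to run the local snapshot. */
    boolean onColor(Color received) {
        for (;;) {
            State cur = state.get();

            if (cur.color == received)
                return false; // Already switched, possibly by a concurrent thread.

            // Switch the color and disable auto-shrink in one atomic step.
            if (state.compareAndSet(cur, new State(received, false)))
                return true;
        }
    }

    State current() {
        return state.get();
    }
}
```

Only one thread observes `true` for a given color change, so writing LocalState to WAL and collecting active transactions happen once per node without holding a lock.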

Reducing additional messages

Since the only useful info is now the color field (there is no need to send the sent collection):

  1. for idle nodes it's cheaper to send a single message over discovery: complexity O(N), where N is the cluster size.
  2. there is no need to await all marker messages from other nodes. Every node can just dump active transactions.

Final algorithm

For the Ignite implementation, it is proposed to use a single node to coordinate the algorithm. The user starts a command for creating a new ConsistentCut on a single node, and this node becomes responsible for coordinating the Consistent Cut iteration:

  1. Initial state:
    1. Ignite nodes are started from a snapshot or another consistent state (after a graceful cluster stop / deactivation).
    2. var color = WHITE .
    3. An empty collection committingTxs (Set<GridCacheVersion>) whose goal is to track COMMITTING+ transactions that aren't part of IgniteTxManager#activeTx . It shrinks automatically after a transaction is committed.
  2. After some time, Ignite nodes might have non-empty committingTxs.
  3. An Ignite node initiates a global snapshot by starting DistributedProcess (over discovery IO):
    1. Atomically: set color = RED and disable auto-shrink of committingTxs.
    2. Write a snapshot record to WAL (commits LocalState).
    3. Collect active transactions: the concatenation of IgniteTxManager#activeTx and committingTxs 
    4. While receiving Finish messages from other nodes, the node fills ChannelState: the exclude and (sent - received) collections. 
    5. After all transactions have finished, it writes a WAL record with ChannelState.
  4. The new color is sent with transaction Finish messages.
  5. The committing node adds an additional color field to FinishMessage that shows whether or not to include the transaction in the snapshot. 
  6. Other nodes, on receiving a marker with a new color, start a local snapshot (see the steps from point 3).
  7. The node notifies the initiator node that the local procedure has finished (with the DistributedProcess protocol).
  8. On all nodes color = RED . The next snapshot iteration will start by changing the color to WHITE . 
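
The role of committingTxs and of the auto-shrink switch can be sketched as follows. This is a simplified illustration: transaction IDs are strings instead of GridCacheVersion, and plain `synchronized` methods are used for brevity where the proposal aims at a lock-free switch. All names are hypothetical.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical tracker of COMMITTING+ transactions; not an Ignite class. */
final class CommittingTxsSketch {
    private final Set<String> committingTxs = new HashSet<>();

    /** Transactions committed while auto-shrink was disabled. */
    private final Set<String> committedDuringSnapshot = new HashSet<>();

    private boolean autoShrink = true;

    synchronized void onCommitStart(String xid) {
        committingTxs.add(xid);
    }

    synchronized void onCommitted(String xid) {
        if (autoShrink)
            committingTxs.remove(xid);
        else
            committedDuringSnapshot.add(xid); // Kept until the snapshot finishes.
    }

    /** Disables auto-shrink and returns active transactions plus committing ones. */
    synchronized Set<String> startSnapshot(Collection<String> activeTxs) {
        autoShrink = false;

        Set<String> collected = new HashSet<>(activeTxs);
        collected.addAll(committingTxs);

        return collected;
    }

    /** Re-enables auto-shrink and drops the transactions that committed meanwhile. */
    synchronized void finishSnapshot() {
        committingTxs.removeAll(committedDuringSnapshot);
        committedDuringSnapshot.clear();
        autoShrink = true;
    }

    synchronized int size() {
        return committingTxs.size();
    }
}
```

While the snapshot is running, a committed transaction stays in the collection, so it cannot be missed by the node that is still deciding which side of the cut it belongs to.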

ConsistentCutColor initialization

ConsistentCutColor is initialized as follows.

When the server topology changes: 

  1. The color is received with GridDiscoveryData from the coordinator (for both client and server nodes).
  2. Every server node, on receiving TcpDiscoveryNodeAddedMessage:
    1. finishes the local ConsistentCut.

Order of transaction messages

...

Also, some messages must be signed with the tx CutVersion (color) so that it can be checked on the primary/backup node:

  1. GridNearTxFinishRequest / GridDhtTxFinishRequest
  2. GridNearTxPrepareResponse / GridDhtTxPrepareResponse (for 1PC algorithm).

Ignite components

On receiving a message with a new CutVersion, a node sets it and commits LocalState and ChannelState to identify the wrong order of events.

  1. LocalState maps to the local WAL (all committed transactions are part of LocalState).
  2. ChannelState maps to `IgniteTxManager#activeTransactions`. 
  3. `IgniteTxManager#activeTransactions` doesn't track committing transactions, so they are tracked additionally in ConsistentCutManager .

Unstable topology

There are some cases to handle for an unstable topology:

  1. A client or non-baseline server node leaves – no need to handle.
  2. A server node leaves:
    1. all nodes finish the locally running ConsistentCut, making it inconsistent
  3. A server node joins:
    1. all nodes finish the locally running ConsistentCut, making it inconsistent
    2. all nodes check the ConsistentCutVersion received from the new node and update their local version if needed
    3. the new node checks whether rebalance was required for recovering. If it is required, then handle it TBD

...

It guarantees that the BEFORE side consists of:
1. transactions physically committed before ConsistentCutStartRecord that weren't included into ConsistentCutFinishRecord#after();
2. transactions physically committed between ConsistentCutStartRecord and ConsistentCutFinishRecord that were included into ConsistentCutFinishRecord#before().

It guarantees that the AFTER side consists of:
1. transactions physically committed before ConsistentCutStartRecord that were included into ConsistentCutFinishRecord#after();
2. transactions physically committed after ConsistentCutStartRecord that weren't included into ConsistentCutFinishRecord#before().

It is proposed to set ConsistentCutStartRecord#ts  independently on every node, because this time is only an approximation: it depends on the time of delivering ConsistentCutVersion, the time of finishing active transactions, and the actual time on every node and client app. There is therefore no need to sync it across multiple nodes.


Code Block
languagejava
titleConsistentCutRecord
/** WAL record that marks the start of a Consistent Cut on a node. */
public class ConsistentCutStartRecord extends WALRecord {
    /** ConsistentCutVersion, counter on Ignite coordinator. */
    private final ConsistentCutVersion cutVer;

    /** Approximate timestamp of the Consistent Cut. */
    private final long ts;
}


/** WAL record that marks the finish of a Consistent Cut on a node. */
public class ConsistentCutFinishRecord extends WALRecord {
    /**
     * Collection of TXs committed BEFORE the ConsistentCut (sent - received).
     */
    private final Set<GridCacheVersion> before;

    /**
     * Collection of TXs committed AFTER the ConsistentCut (exclude).
     */
    private final Set<GridCacheVersion> after;
}
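
The BEFORE-side guarantees can be illustrated with a sketch of how a reader of the WAL might select the transactions of the BEFORE side. It assumes that transaction IDs are strings instead of GridCacheVersion and that the WAL has already been split into commits seen before ConsistentCutStartRecord and commits seen between the two records; the class and method names are hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration: selecting the BEFORE side of a cut from WAL content. */
final class CutRecoverySketch {
    /**
     * @param committedBeforeStart TXs committed before ConsistentCutStartRecord, in WAL order.
     * @param committedBetween TXs committed between the start and finish records, in WAL order.
     * @param before Content of ConsistentCutFinishRecord#before().
     * @param after Content of ConsistentCutFinishRecord#after().
     * @return TXs that belong to the BEFORE side, in WAL order.
     */
    static Set<String> beforeSide(
        List<String> committedBeforeStart,
        List<String> committedBetween,
        Set<String> before,
        Set<String> after
    ) {
        Set<String> res = new LinkedHashSet<>();

        // Committed before the start record: BEFORE unless explicitly excluded.
        for (String tx : committedBeforeStart) {
            if (!after.contains(tx))
                res.add(tx);
        }

        // Committed between the records: BEFORE only if explicitly included.
        for (String tx : committedBetween) {
            if (before.contains(tx))
                res.add(tx);
        }

        return res;
    }
}
```

For commits tx1, tx2 before the start record, tx3, tx4 between the records, before = {tx3} and after = {tx2}, the BEFORE side is {tx1, tx3}; tx2 and tx4 belong to the AFTER side.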

...