...

In case of a consistent cut, every node knows which side of the cut each local transaction belongs to (BEFORE or AFTER). In the second picture, node Q after the rcv event knows whether tx5 was included in the cut on node P. If it was, node Q includes it as well; otherwise it doesn't.

Paper Algorithm

A paper [1] proposes an algorithm for implementing a distributed consistent cut. The algorithm works without a coordinator of the procedure and is suitable for processes connected by non-FIFO channels.

...

  1. LocalState maps to the local WAL (all committed transactions are part of LocalState);
  2. Channel:
    1. We can piggyback on Ignite transaction protocol messages (Prepare, Finish) with CommunicationSpi.
    2. In case there is no transaction for a channel, we can rely on DiscoverySpi to start the local snapshot on non-participating nodes.
  3. ChannelState maps to `IgniteTxManager#activeTransactions`:
    1. The sent collection matches committed transactions for which the local node is near - they send FinishMessages to other nodes.
    2. The received collection matches committed transactions for which the local node isn't near - they receive FinishMessages from other nodes.
  4. `IgniteTxManager#activeTransactions` doesn't track:
    1. committing transactions (COMMITTING+); they are removed from this collection before committing starts.
      1. Track them additionally: add a transaction to a separate collection before it starts committing, and remove it after it is committed (see the sketch after this list).
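
Below is a minimal sketch of the extra tracking from point 4.1. The class, the hook methods and the committingTxs collection are illustrative assumptions, not existing Ignite APIs; only GridCacheVersion is Ignite's actual transaction version class.

Code Block
languagejava
titleCommittingTxTracker
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;

/** Sketch: track COMMITTING+ transactions that have already left activeTransactions. */
class CommittingTxTracker {
    /** Near xid versions of transactions that have started committing. */
    private final Set<GridCacheVersion> committingTxs = ConcurrentHashMap.newKeySet();

    /** Call right before the transaction is removed from activeTransactions to start committing. */
    void onBeforeCommit(GridCacheVersion nearXidVer) {
        committingTxs.add(nearXidVer);
    }

    /** Call after the transaction is committed, so the collection shrinks automatically. */
    void onAfterCommit(GridCacheVersion nearXidVer) {
        committingTxs.remove(nearXidVer);
    }
}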

...

Algorithm

For the Ignite implementation it's proposed to use a single node to coordinate the algorithm. A user starts a command for creating a new ConsistentCut on a single node, and this node becomes responsible for coordinating the Consistent Cut iteration:

  1. Initial state:
    1. Ignite nodes started from a snapshot, or another consistent state (after a graceful cluster stop / deactivation).
    2. var color = WHITE.
    3. Two empty collections sent and received - Map<UUID, Collection<GridCacheVersion>>, where the key is a remote node ID and the value is the nearXidVersion of a committed transaction.
  2. After some time, Ignite nodes might have non-empty sent and received collections.
  3. The Ignite coordinator starts a local snapshot (it all must be done atomically):
    1. marks the local var with color = RED
    2. writes a snapshot record to WAL (commits LocalState).
    3. replaces sent and received with new empty instances; the old collections are now immutable for the snapshot process.
    4. prepares a marker message to every node with the subset of sent transactions (sends just a Collection<GridCacheVersion> for every node).
  4. Sends marker messages directly to all nodes in the cluster. The marker must also be sent with every subsequent FinishMessage (to avoid cases when the next FinishMessage is received before the marker message).
  5. Other nodes, on receiving a marker with a new color:
    1. start a local snapshot (see the steps from point 3).
  6. For every received marker message:
    1. For the prepared received collection and the marker's sent collection, calculate (sent - received). These are transactions in transit; they are also part of the snapshot (see the sketch after this list).
  7. After markers from all nodes are received:
    1. Store ChannelState into WAL: write Map<UUID, Collection<GridCacheVersion>>, where the value is (sent - received).
    2. The snapshot is finished locally.
  8. Now color = RED on all nodes. The next snapshot iteration will be started by changing color to WHITE.
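
To make step 6 concrete, here is a rough sketch of computing the in-transit transactions from the locally fixed received collection and the sent collection delivered in the marker message. The helper class and parameter names are illustrative.

Code Block
languagejava
titleInTransitCalculation
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;

/** Sketch: transactions in transit between two nodes become part of ChannelState. */
class InTransitCalculation {
    /**
     * @param markerSent    'sent' collection taken from the received marker message.
     * @param localReceived 'received' collection fixed locally when the snapshot started.
     * @return (sent - received): transactions sent by the remote node but not received yet.
     */
    static Set<GridCacheVersion> inTransit(Collection<GridCacheVersion> markerSent,
                                           Collection<GridCacheVersion> localReceived) {
        Set<GridCacheVersion> res = new HashSet<>(markerSent);
        res.removeAll(localReceived);
        return res;
    }
}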

Disadvantages:

  1. The sent and received collections could be too big to track and send within a single message.
  2. It requires a lock for steps 3 and 4, which delays sending FinishMessage and affects UX (lower performance of transactions).
  3. In the worst case it requires O(N^2) additional messages, where N is the cluster size. This happens when there are no active transactions in the cluster (idle nodes).

Reducing the sent and received collections:

We can rely on the two-phase commit protocol of Ignite transactions to reduce sent and received:

  1. Transactions reach an additional state (PREPARED) before sending and receiving a FinishMessage. So we can say that PREPARED is a promise to send/receive a FinishMessage.
  2. Then IgniteTxManager#activeTransactions can already replace the sent and received collections (see the sketch after this list):
    1. it tracks all PREPARED transactions (an analogue of sent - received);
    2. it automatically shrinks after a transaction is committed.
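
A sketch of this idea, assuming a small helper that filters the collection returned by IgniteTxManager#activeTransactions; the helper class itself is illustrative, while IgniteInternalTx and TransactionState are Ignite's own types.

Code Block
languagejava
titlePreparedTxView
import java.util.Collection;
import java.util.stream.Collectors;

import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.transactions.TransactionState;

/** Sketch: PREPARED transactions in activeTransactions play the role of (sent - received). */
class PreparedTxView {
    /** @return active transactions that already reached PREPARED but are not committed yet. */
    static Collection<IgniteInternalTx> preparedTxs(Collection<IgniteInternalTx> activeTxs) {
        return activeTxs.stream()
            .filter(tx -> tx.state() == TransactionState.PREPARED)
            .collect(Collectors.toList());
    }
}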

Avoid locking in steps 3 and 4

  1. Removing the WAL record write from the lock => inconsistency between the fixed sent / received collections and the committed LocalState.
  2. Removing the fixing of sent and received from the lock => inconsistency between the fixed sent / received collections and the committed LocalState.

To avoid misses:

  • sent and received might be wider - collect transactions that are not only PREPARED, but PREPARING+. Do not track ACTIVE transactions (the algorithm guarantees they will be excluded from the snapshot: ACTIVE < PREPARED, so such txs are part of neither ChannelState nor LocalState).
  • auto-shrink must be disabled during the snapshot process.

To avoid unwanted transactions:

  • Introduce a new collection excluded - a collection of transactions that changed LocalState concurrently with the local snapshot but aren't part of the snapshot's ChannelState (they are part of the next ChannelState, after the snapshot).
  • Then Ignite needs a rule to distinguish the different ChannelStates (before and after the snapshot) - add an additional flag to FinishMessage that shows which side of the snapshot this transaction belongs to on the near node.

Also, there is no need to send the sent collection with the FinishMessage. Then the whole lock is reduced to a single volatile variable:

  1. Atomically (with Compare-And-Swap) update the color variable and disable auto-shrink for active transactions.
  2. Write the LocalState to WAL.
  3. Collect all active transactions.
  4. The FinishMessage is sent with 2 fields: color (the snapshot state) and the commit color for the specified transaction (see the sketch after this list).
  5. Introduce the exclude collection that contains transactions to exclude from the snapshot before the LocalState commit while restoring.
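
A condensed sketch of the reduced synchronization. CutColorHolder and the extra FinishMessage fields are hypothetical names used only to illustrate the CAS on the color variable and the two fields piggybacked on the FinishMessage.

Code Block
languagejava
titleCutColorHolder
import java.util.concurrent.atomic.AtomicReference;

/** Sketch: the whole lock is reduced to a single atomically updated color variable. */
class CutColorHolder {
    enum Color { WHITE, RED }

    private final AtomicReference<Color> color = new AtomicReference<>(Color.WHITE);

    /** Flip the color with CAS; the winner also disables auto-shrink of active transactions. */
    boolean startSnapshot(Color newColor) {
        return color.compareAndSet(newColor == Color.RED ? Color.WHITE : Color.RED, newColor);
    }

    Color color() {
        return color.get();
    }
}

/** Sketch: two extra fields carried by the transaction FinishMessage. */
class FinishMessageCutFields {
    /** Current snapshot color on the sending node. */
    CutColorHolder.Color snapshotColor;

    /** Color the transaction was committed under on the node that commits first. */
    CutColorHolder.Color txCommitColor;
}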

Reducing additional messages

As the only useful info now is the color field (no need to send the sent collection):

  1. for idle nodes it's cheaper to send a single message over discovery - complexity O(N), where N is the cluster size;
  2. there is no need to await marker messages from all other nodes; every node can just dump its active transactions.

Final algorithm

For the Ignite implementation it's proposed to use a single node to coordinate the algorithm. A user starts a command for creating a new ConsistentCut on a single node, and this node becomes responsible for coordinating the Consistent Cut iteration:

  1. Initial state:
    1. Ignite nodes started from a snapshot, or another consistent state (after a graceful cluster stop / deactivation); WAL is in a consistent state relative to the previous full or incremental snapshot.
    2. Every Ignite node has a local ConsistentCut future equal to null (node is WHITE).
    3. An empty collection committingTxs (Set<GridCacheVersion>) whose goal is to track COMMITTING+ transactions that aren't part of IgniteTxManager#activeTx. It automatically shrinks after a transaction is committed.
  2. An Ignite node initiates a global snapshot by starting DistributedProcess (by discovery IO):
    1. creates a new ConsistentCutMarker.
    2. prepares a marker message that contains the marker and transmits this message to other nodes.
  3. Every node starts a local snapshot process after receiving the marker message (whether by discovery, or by communication with a transaction message):
    1. Atomically: creates a new ConsistentCut future (node becomes RED), creates committingTxs, starts signing outgoing messages with the ConsistentCutMarker.
    2. Writes a snapshot record to WAL with the received ConsistentCutMarker (commits LocalState).
    3. Collects active transactions - the concatenation of IgniteTxManager#activeTx and committingTxs.
    4. Prepares 2 empty collections - before [sent - received] and after [exclude] the cut.
  4. While the global Consistent Cut is running, every node signs outgoing transaction messages:
    1. Prepare messages are signed with the ConsistentCutMarker (to trigger ConsistentCut on the remote node, if not yet started).
    2. Finish messages are signed with the ConsistentCutMarker (to trigger it as well) and the transaction ConsistentCutMarker (to notify nodes which side of the cut this transaction belongs to). The transaction marker is set on the node that commits first (near node for 2PC, backup or primary for 1PC).
  5. For every collected active transaction, the node waits for the Finish message, extracts the transaction ConsistentCutMarker and fills the before / after collections (see the sketch after this list):
    1. if the received marker is null or differs from the local one, the transaction is on the before side;
    2. if the received marker equals the local one, the transaction is on the after side.
  6. After all transactions have finished:
    1. Writes a WAL record with ChannelState (before, after).
    2. Stops filling committingTxs.
    3. Completes the ConsistentCut future and notifies the node-initiator about finishing the local procedure (with the DistributedProcess protocol).
  7. After all nodes have finished ConsistentCut, every node stops signing outgoing transaction messages - the ConsistentCut future becomes null again (node is WHITE).
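
A simplified sketch of the per-node state and the before/after classification from steps 3-5. The class and method names are assumptions made for illustration; a plain UUID stands in for the ConsistentCutMarker defined further below, and only GridCacheVersion is Ignite's actual type.

Code Block
languagejava
titleLocalConsistentCut
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;

/** Sketch: per-node state of a running Consistent Cut and classification of finished txs. */
class LocalConsistentCut {
    /** Marker of the currently running cut; null means the node is WHITE. */
    private final UUID marker;

    /** Transactions committed BEFORE the cut (go to ConsistentCutFinishRecord#before). */
    final Set<GridCacheVersion> before = ConcurrentHashMap.newKeySet();

    /** Transactions committed AFTER the cut (go to ConsistentCutFinishRecord#after). */
    final Set<GridCacheVersion> after = ConcurrentHashMap.newKeySet();

    LocalConsistentCut(UUID marker) {
        this.marker = marker;
    }

    /** Step 5: classify a collected transaction by the marker extracted from its Finish message. */
    void onTxFinished(GridCacheVersion nearXidVer, UUID txMarker) {
        if (txMarker == null || !txMarker.equals(marker))
            before.add(nearXidVer);   // committed before the cut on the node that committed first
        else
            after.add(nearXidVer);    // committed after the cut
    }
}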

Consistent and inconsistent Cuts

...

  1. any errors appear while processing the local Cut.
  2. a transaction is recovered with the transaction recovery protocol (tx.finalizationStatus == RECOVERY_FINISH).
  3. a transaction finished in the UNKNOWN state.
  4. the baseline topology changes: Ignite nodes finish the local Cuts running at that moment, making them inconsistent.

...

ConsistentCutMarker

Every Ignite node tracks the current ConsistentCutMarker:

Code Block
languagejava
titleConsistentCutMarker
class ConsistentCutMarker {
	UUID id;
}

`id` is just a unique ConsistentCut ID (it is assigned on the initiator node).

ConsistentCutColor initialization

ConsistentCutColor can be initialized with:

For changed server topology: 

...

...


Order of transaction messages

...

  1. There are some issues with ACTIVE transactions:
    1. They can be long-running.
    2. They may become SUSPENDED and hang the Consistent Cut after that (the only valid way to reach the SUSPENDED state is ACTIVE → SUSPENDED).
  2. So it's better to avoid checking such transactions, and the algorithm allows it: for every transaction in the ACTIVE state there are at least 2 messages that sync the cut marker between nodes (PREPARE response, FINISH request) - that is enough to guarantee that every ACTIVE transaction ends up on the AFTER side. SUSPENDED transactions are skipped for the same reason, so no additional checks are needed for them.
  3. For other transactions (>= PREPARING) the algorithm listens to their finish futures and checks the transaction cut marker (txCutVer); see the sketch below.
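
A sketch of the resulting per-transaction policy, assuming Ignite's internal IgniteInternalTx and TransactionState types; the helper class itself is illustrative.

Code Block
languagejava
titleTxCheckPolicy
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.transactions.TransactionState;

/** Sketch: which collected transactions the Consistent Cut awaits via their finish futures. */
class TxCheckPolicy {
    /** ACTIVE and SUSPENDED transactions are skipped: the marker exchange guarantees they land on the AFTER side. */
    static boolean shouldAwait(IgniteInternalTx tx) {
        TransactionState st = tx.state();

        return st != TransactionState.ACTIVE && st != TransactionState.SUSPENDED;
    }
}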

Consider some examples below (P - near, Q - primary):

...

Code Block
languagejava
titleConsistentCutRecord
/** */
public class ConsistentCutStartRecord extends WALRecord {
	/** Marker that inits Consistent Cut. */
	private final ConsistentCutMarker marker;
}


/** */
public class ConsistentCutFinishRecord extends WALRecord {
    /**
     * Collections of TXs committed BEFORE the ConsistentCut (sent - received).
     */
    private final Set<GridCacheVersion> before;

     /**
     * Collections of TXs committed AFTER the ConsistentCut (exclude).
     */
    private final Set<GridCacheVersion> after;
 }

...