
ConsistentCut is a distributed algorithm that splits the WAL into two global areas, Before and After. It guarantees that every transaction committed Before is also committed Before on every other node that participated in the transaction. This means Ignite nodes can safely recover themselves to the consistent Before state without any coordination with each other.

...

The picture below illustrates the steps of the algorithm on a single node:

[Image: steps of the ConsistentCut algorithm on a single node]

  1. Initial state:
    1. No concurrent ConsistentCut process is running.
    2. lastFinishedCutId holds the previous ConsistentCutId, or null.
  2. User starts a command for creating a new incremental snapshot:
    1. An Ignite node inits a DistributedProcess with the discovery message SnapshotOperationRequest that holds the new ConsistentCutId (the goal is to notify every node in the cluster about the running incremental snapshot).
    2. DistributedProcess stores the topology version topVer on which ConsistentCut started.
  3. Creation of a consistent cut can be started by either of two events (whichever happens first):
    1. Receiving SnapshotOperationRequest#ConsistentCutId by DiscoverySPI (by the DistributedProcess).
    2. Receiving ConsistentCutAwareMessage#ConsistentCutId by CommunicationSPI (by transaction messages - Prepare, Finish).
  4. On receiving the ConsistentCutId, a node starts the local ConsistentCut:
    1. There are 2 roles that a node might play:
      1. ROLE#1 - wraps outgoing messages - for all Ignite nodes: client, baseline, non-baseline server nodes.
      2. ROLE#2 - prepares data to be written to WAL - for baseline nodes only.
    2. Before start, check:
      1. Whether ConsistentCut has already started (ConsistentCut != null) or finished (lastFinishedCutId == id) for this id; skip if it has.
      2. In case ConsistentCut is inited by CommunicationSPI, compare ConsistentCutAwareMessage#topVer with the local node order:
        1. The local node order equals the new topVer at the moment the node joined the cluster.
        2. If the order is higher than the ConsistentCut topVer, the node joined after ConsistentCut started. Skip starting ConsistentCut on this node.
    3. ROLE#1:
      1. Creates a new ConsistentCut future.
        1. If the node is non-baseline (client, non-baseline servers), completes it right after creation and notifies the node-initiator that the local procedure has finished (by the DistributedProcess protocol).
      2. While ConsistentCut != null, wraps outgoing messages into ConsistentCutAwareMessage. It contains:
        1. ConsistentCutId (to start ConsistentCut on the remote node, if not started yet).
        2. An additional field txCutId. It is originally set on the node that commits first:
          1. For 2PC it is the originating node.
          2. For 1PC it is a backup node.
        3. If txCutId is null, the transaction started committing Before ConsistentCut started, otherwise After.
      3. On receiving a ConsistentCutAwareMessage that makes the transaction committed (FinishRequest for 2PC, PrepareResponse for 1PC), sets tx#txCutId = message#txCutId.
    4. ROLE#2 - for baseline nodes only:
      1. In the message thread, atomically inits ConsistentCut:
        1. Creates a new ConsistentCut future.
        2. Creates an empty collection removedActiveTxs (unlike IgniteTxManager#activeTx, this collection doesn't remove transactions).
      2. Starts wrapping outgoing messages into ConsistentCutAwareMessage (contains ConsistentCutId).
      3. In the background thread:
        1. Writes a ConsistentCutStartRecord to WAL with the received ConsistentCutId.
        2. Creates a (weakly-consistent) copy of IgniteTxManager#activeTx. Sets listeners on those tx#finishFuture.
          1. As an optimization, it is safe to exclude transactions with tx#status == ACTIVE. It's guaranteed that such transactions belong to the After side.
        3. Creates a copy of removedActiveTxs (contains transactions that might already have been cleaned from IgniteTxManager#activeTx). Sets listeners on those tx#finishFuture.
        4. Sets removedActiveTxs to null.
          1. Transactions concurrently added to removedActiveTxs don't matter: they just don't land in the "before" or "after" set and will be excluded from recovery.
      4. In transaction threads, fills removedActiveTxs if ConsistentCut != null and removedActiveTxs != null:
        1. Every transaction is added to removedActiveTxs right before it is removed from IgniteTxManager#activeTx.
      5. For every listening transaction, the callback is called when the transaction finishes:
        1. If the transaction state is UNKNOWN or the status is RECOVERY_FINISH, completes ConsistentCut with an exception.
        2. If the transaction is mapped to a higher topology version than the ConsistentCut topVer (topology changed after ConsistentCut started), puts it into after.
        3. If tx#txCutId equals the local one, puts the transaction into after, otherwise puts it into before.
      6. After every listening transaction has finished:
        1. Writes a ConsistentCutFinishRecord to WAL with the collections (before, after).
        2. Completes the ConsistentCut future.
      7. Note that it continues to wrap messages even after the local ConsistentCut finishes.
      8. Notifies the node-initiator that the local procedure has finished (by the DistributedProcess protocol).
  5. After all nodes have finished ConsistentCut, on every node:
    1. lastFinishedCutId is updated with the current id.
    2. The ConsistentCut future becomes null.
    3. Signing of outgoing transaction messages stops.
  6. The node-initiator checks that every node completed correctly.
    1. If any node completed exceptionally, it completes the Incremental Snapshot with an exception.
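
The per-transaction callback described for ROLE#2 can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not Ignite code: `FinishedTx` and `CutClassifier` are hypothetical names, a `String` stands in for the transaction version, and the two inconsistency cases are reduced to boolean flags.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

/** Hypothetical, simplified view of a finished transaction (not the IgniteInternalTx API). */
final class FinishedTx {
    final String xid;             // stands in for the transaction's GridCacheVersion
    final long topVer;            // topology version the transaction was mapped to
    final UUID txCutId;           // null if the commit started before any running cut
    final boolean unknownState;   // transaction finished in UNKNOWN state
    final boolean recoveryFinish; // transaction was finished by the recovery protocol

    FinishedTx(String xid, long topVer, UUID txCutId, boolean unknownState, boolean recoveryFinish) {
        this.xid = xid;
        this.topVer = topVer;
        this.txCutId = txCutId;
        this.unknownState = unknownState;
        this.recoveryFinish = recoveryFinish;
    }
}

/** Collects the before/after sets of one local cut. Not thread-safe; illustration only. */
final class CutClassifier {
    final UUID cutId;
    final long cutTopVer;
    final Set<String> before = new HashSet<>();
    final Set<String> after = new HashSet<>();

    CutClassifier(UUID cutId, long cutTopVer) {
        this.cutId = cutId;
        this.cutTopVer = cutTopVer;
    }

    /** Callback for a listened transaction; throws if the cut cannot be consistent. */
    void onTxFinished(FinishedTx tx) {
        if (tx.unknownState || tx.recoveryFinish)
            throw new IllegalStateException("Cut is inconsistent because of transaction " + tx.xid);

        // Mapped on a newer topology, or marked with this cut's ID: After side.
        if (tx.topVer > cutTopVer || cutId.equals(tx.txCutId))
            after.add(tx.xid);
        else
            before.add(tx.xid);
    }
}
```

In a real node the callbacks run concurrently in several threads, so the two sets would need to be thread-safe; the sketch leaves that out to keep the classification rule visible.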

    Consistent and inconsistent Cuts

    ...

    1. Any error that appears while processing the local Cut.
    2. A transaction is recovered with the transaction recovery protocol (tx.finalizationStatus == RECOVERY_FINISH).
    3. A transaction finished in the UNKNOWN state.
    4. Topology change.

    Wrapping messages

    ...

    The Ignite transaction protocol includes multiple messages, but only some of them are meaningful for the algorithm: those that change the state of transactions (PREPARED, COMMITTED):

    ...

    Those messages are wrapped in a ConsistentCutAwareMessage that is prepared right before the message is sent to another node. They use the current ConsistentCutId. Some messages must additionally be combined with another ConsistentCutId so that it can be checked on the primary/backup node:

    1. GridNearTxFinishRequest / GridDhtTxFinishRequest
    2. GridNearTxPrepareResponse / GridDhtTxPrepareResponse (for 1PC algorithm).

    Those messages are filled with txCutId, which is set right before the transaction starts committing on the first committing node. The current ConsistentCutId is used for this. If the current ConsistentCutId is not null, the transaction starts committing after ConsistentCut started, which means the transaction belongs to the After side.
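
A minimal sketch of how txCutId could be assigned and propagated, assuming a node keeps the ID of the running cut in a single field. `SketchTx`, `SketchNode` and their members are hypothetical names used only for illustration; they are not Ignite classes.

```java
import java.util.UUID;

/** Hypothetical transaction holder; only the field relevant to the cut is modelled. */
final class SketchTx {
    UUID txCutId; // stays null if the commit started Before the cut
}

/** Hypothetical node-local state: ID of the running cut, or null when none is running. */
final class SketchNode {
    volatile UUID runningCutId;

    /** First committing node: remember the running cut ID right before the commit starts. */
    void beforeCommitOnFirstNode(SketchTx tx) {
        tx.txCutId = runningCutId;
    }

    /** Other nodes: copy the marker carried by the finish (2PC) or prepare response (1PC) message. */
    void onCommitMessage(SketchTx tx, UUID msgTxCutId) {
        tx.txCutId = msgTxCutId;
    }
}
```

The point of the design is that the side of a transaction is decided once, on the node that commits first, and every other participant only copies that decision.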


    Code Block
    languagejava
    titleConsistentCutAwareMessage
    class ConsistentCutAwareMessage {
        /** Original transaction message. */
        Message msg;

        /** Consistent Cut ID. */
        UUID cutId;

        /** Consistent Cut ID after which transaction committed. */
        @Nullable UUID txCutId;

        /** Cluster topology version on which Consistent Cut started. */
        long topVer;
    }
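
The wrapping step could look roughly like the sketch below. It assumes the running cut is published as one immutable object so that the cut ID and topVer are always read together. All `Sketch*` names are hypothetical stand-ins and do not exist in Ignite.

```java
import java.util.UUID;

/** Hypothetical stand-in for an Ignite transaction message. */
interface SketchMessage { }

/** Hypothetical plain transaction message. */
final class SketchFinishRequest implements SketchMessage { }

/** Simplified wrapper carrying the same fields as the class above. */
final class SketchCutAwareMessage implements SketchMessage {
    final SketchMessage msg;
    final UUID cutId;
    final UUID txCutId; // may be null
    final long topVer;

    SketchCutAwareMessage(SketchMessage msg, UUID cutId, UUID txCutId, long topVer) {
        this.msg = msg;
        this.cutId = cutId;
        this.txCutId = txCutId;
        this.topVer = topVer;
    }
}

/** Immutable description of the running cut. */
final class SketchCutInfo {
    final UUID cutId;
    final long topVer;

    SketchCutInfo(UUID cutId, long topVer) {
        this.cutId = cutId;
        this.topVer = topVer;
    }
}

final class SketchWrapper {
    /** Running cut, or null when no cut is running. */
    volatile SketchCutInfo cut;

    /** Wraps an outgoing message only while a cut is running. */
    SketchMessage wrap(SketchMessage msg, UUID txCutId) {
        SketchCutInfo c = cut; // single volatile read

        return c == null ? msg : new SketchCutAwareMessage(msg, c.cutId, txCutId, c.topVer);
    }

    /** Returns the original message on the receiving side. */
    static SketchMessage unwrap(SketchMessage msg) {
        return msg instanceof SketchCutAwareMessage ? ((SketchCutAwareMessage)msg).msg : msg;
    }
}
```

A receiver would inspect `cutId` and `topVer` of the wrapper before unwrapping, since that is how a cut gets started on a node that has not yet seen the discovery message.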

    ...


    Transaction

    A new field is added to IgniteInternalTx:

    Code Block
    languagejava
    titleIgniteInternalTx
    interface IgniteInternalTx {
        /**
         * @param id ID of {@link ConsistentCut} AFTER which this transaction was committed, {@code null} if the
         *           transaction committed BEFORE.
         */
        public void cutId(@Nullable UUID id);
    }

    Consistent Cut Classes

    Code Block
    languagejava
    titleConsistentCutManager
    // Manages everything related to Consistent Cut. It is the entry point for transaction threads to check for a running consistent cut.
    class ConsistentCutManager extends GridCacheSharedManagerAdapter {
        // Current Consistent Cut. All transaction threads wrap outgoing messages if this field is not null.
        volatile @Nullable ConsistentCut cut;

        // Entry point for handling a newly received Consistent Cut ID.
        void handleConsistentCutId(UUID id);
    }
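
The checks performed before a local cut starts can be sketched as below. This is a simplified model, not the ConsistentCutManager implementation: `CutStartCheck`, its fields and the extra `cutTopVer` parameter are assumptions made for the example.

```java
import java.util.UUID;

/** Hypothetical model of the "before start" checks for a received cut ID. */
final class CutStartCheck {
    private UUID runningCutId;      // non-null while the local cut is running
    private UUID lastFinishedCutId; // ID of the last cut finished cluster-wide, or null
    private final long localNodeOrder; // topology version at which this node joined

    CutStartCheck(long localNodeOrder) {
        this.localNodeOrder = localNodeOrder;
    }

    /** Returns true if a local cut was started for the given ID, false if the ID was skipped. */
    synchronized boolean handleConsistentCutId(UUID id, long cutTopVer) {
        // Already started, or already finished for this ID.
        if (runningCutId != null || id.equals(lastFinishedCutId))
            return false;

        // The node joined after the cut started.
        if (localNodeOrder > cutTopVer)
            return false;

        runningCutId = id;

        return true;
    }

    /** Called after all nodes have finished the cut. */
    synchronized void onClusterWideFinish() {
        lastFinishedCutId = runningCutId;
        runningCutId = null;
    }
}
```

The lock makes the check-and-start step atomic, which matters because the same ID can arrive almost simultaneously through discovery and through a transaction message.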


    Code Block
    languagejava
    titleConsistentCut
    class ConsistentCut extends GridFutureAdapter<WALPointer> {
        Set<GridCacheVersion> beforeCut;

        Set<GridCacheVersion> afterCut;

        Set<IgniteInternalFuture<IgniteInternalTx>> removedActive;
    }
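
Completing the cut once every listened transaction has finished can be illustrated with plain JDK futures. The sketch swaps Ignite's GridFutureAdapter for `java.util.concurrent.CompletableFuture`, and `CutCompletion` and `whenAllFinished` are hypothetical names; the `Runnable` stands in for writing the ConsistentCutFinishRecord.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper: completes the cut after all listened transaction futures complete. */
final class CutCompletion {
    /**
     * Returns a future that completes after every transaction future has completed
     * and the finish record action has run. If any transaction future fails,
     * the action is skipped and the returned future completes exceptionally.
     */
    static CompletableFuture<Void> whenAllFinished(List<CompletableFuture<?>> txFuts, Runnable writeFinishRecord) {
        return CompletableFuture
            .allOf(txFuts.toArray(new CompletableFuture<?>[0]))
            .thenRun(writeFinishRecord);
    }
}
```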
    1. ON DISTRIBUTED SNAPSHOTS, Ten H. LAI and Tao H. YANG, 29 May 1987