
Definitions


Paper [1] defines ConsistentCut, a distributed snapshot algorithm. It relies on a number of definitions; let's describe them in terms of Ignite:

  1. LocalState - the set of transactions committed prior to a given moment.
    1. In Ignite, LocalState is committed by writing ConsistentCutStartRecord into WAL.
  2. Message - a transaction message; it is sent after changing LocalState on the sending node, and it changes LocalState on the receiving node after delivery.
    1. In Ignite, such messages are FinishRequest for 2PC and PrepareResponse for 1PC.
  3. Channel - a unidirectional connection between two Ignite nodes through which messages are sent (Communication SPI).
  4. ChannelState - for a single channel, the set of messages that were sent (changed the LocalState of the sending node) but not received yet (haven't changed the LocalState of the receiving node).
    1. In Ignite, we can think of the incoming ChannelState as represented by active transactions in the PREPARING+ state (the Message might have been sent by other nodes, but hasn't been received yet).
  5. GlobalSnapshot - the set consisting of (LocalState and ChannelState for every incoming channel) for every node.
    1. In Ignite, the snapshot is represented by 2 WAL records (ConsistentCutStartRecord commits the LocalState, ConsistentCutFinishRecord describes the ChannelState).
  6. Marker - a mark that piggybacks on the Message and notifies a node about a running snapshot.
    1. In Ignite, ConsistentCutMarkerMessage is used.

In terms of Ignite, there are additional definitions:

  1. Consistent Cut - a successful attempt at creating a Global Snapshot.
  2. Inconsistent Cut - a failed attempt at creating a Global Snapshot, due to the inability to correctly describe a ChannelState.

Note that Consistent Cut can't guarantee that a specific transaction running concurrently with the algorithm will land before or after the cut; it only guarantees that the set of transactions before (or after) the cut is the same on each node in the cluster.
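The guarantee above can be phrased as an invariant: every transaction is placed on the same side of the cut by all nodes that participated in it. The sketch below (hypothetical class, not Ignite code; node and transaction ids are simplified to strings) checks that invariant:

```java
import java.util.Map;
import java.util.Set;

/**
 * Illustrative invariant check: a cut is consistent iff every transaction is
 * placed on the same side (Before/After) by all nodes that participated in it.
 */
public class CutInvariantSketch {
    /**
     * @param beforeByNode     for each node, the set of tx ids it placed Before the cut.
     * @param participantsByTx for each tx, the nodes that participated in it.
     */
    public static boolean isConsistent(Map<String, Set<String>> beforeByNode,
                                       Map<String, Set<String>> participantsByTx) {
        for (Map.Entry<String, Set<String>> e : participantsByTx.entrySet()) {
            String tx = e.getKey();
            Boolean side = null;

            for (String node : e.getValue()) {
                boolean before = beforeByNode.get(node).contains(tx);

                if (side == null)
                    side = before;
                else if (side != before)
                    return false; // Nodes disagree on which side of the cut this tx is on.
            }
        }
        return true;
    }
}
```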

Algorithm

...

algorithm that splits WAL into 2 global areas - Before and After. It guarantees that every transaction committed Before will also be committed Before on every other node that participated in the transaction. It means that Ignite nodes can safely recover themselves to the consistent Before state without any coordination with each other.

The border between the Before and After areas consists of two WAL records - ConsistentCutStartRecord and ConsistentCutFinishRecord. It guarantees that Before consists of:

  1. Transactions committed before ConsistentCutStartRecord AND weren't included into ConsistentCutFinishRecord#after().
  2. Transactions committed between ConsistentCutStartRecord and ConsistentCutFinishRecord AND were included into ConsistentCutFinishRecord#before().
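The two rules above can be sketched as a single predicate. This is an illustrative sketch with hypothetical names (transaction versions are simplified to strings), not Ignite recovery code:

```java
import java.util.Set;

/**
 * Illustrative sketch: deciding whether a transaction belongs to the Before
 * side, given its position in WAL and the two sets stored in
 * ConsistentCutFinishRecord.
 */
public class BeforeSideSketch {
    /**
     * @param committedBeforeStart whether the commit record precedes ConsistentCutStartRecord in WAL.
     * @param finishBefore         contents of ConsistentCutFinishRecord#before().
     * @param finishAfter          contents of ConsistentCutFinishRecord#after().
     * @param txVer                transaction version (simplified to String here).
     */
    public static boolean isBefore(boolean committedBeforeStart,
                                   Set<String> finishBefore,
                                   Set<String> finishAfter,
                                   String txVer) {
        if (committedBeforeStart)
            // Rule 1: committed before ConsistentCutStartRecord and not listed in after().
            return !finishAfter.contains(txVer);

        // Rule 2: committed between the two records and explicitly listed in before().
        return finishBefore.contains(txVer);
    }
}
```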

In the picture below, the Before area consists of the transactions colored yellow, while After is green.

Image Added

Code Block
languagejava
titleConsistentCutRecord
/** */
public class ConsistentCutStartRecord extends WALRecord {
	/** Consistent Cut ID. */
	private final UUID cutId;
}


/** */
public class ConsistentCutFinishRecord extends WALRecord {
	/** Consistent Cut ID. */
	private final UUID cutId;

    /**
     * Collection of transactions committed BEFORE.
     */
    private final Set<GridCacheVersion> before;

    /**
     * Collection of transactions committed AFTER.
     */
    private final Set<GridCacheVersion> after;
}

Algorithm

The picture below illustrates the steps of the algorithm on a single node:

Image Added

  1. Initial state:
    1. No concurrent ConsistentCut process is running.
    2. lastFinishedCutId holds the previous ConsistentCutId, or null.
  2. User starts a command for creating a new incremental snapshot

...

  1. :
    1. Initial state:
      1. Ignite WAL is in a consistent state relative to the previous full or incremental snapshot.
      2. Every Ignite node has a local ConsistentCut future equal to null.
    2. An Ignite node inits a DistributedProcess with the discovery message SnapshotOperationRequest that holds the new ConsistentCutId (the goal is to notify every node in the cluster about the running incremental snapshot).
      1. DistributedProcess stores the topology version topVer on which ConsistentCut started.
    3. The process of creating a consistent cut can be started by two events (whichever happens earlier):
      1. Receiving SnapshotOperationRequest#ConsistentCutId by DiscoverySPI (by the DistributedProcess).
      2. Receiving ConsistentCutAwareMessage#ConsistentCutId by CommunicationSPI (by transaction messages - Prepare, Finish).
    4. On receiving the ConsistentCutId, a node starts its local ConsistentCut:
      1. There are 2 roles that a node might play:
        1. ROLE#1 - wraps outgoing messages - for all Ignite nodes: client, baseline, non-baseline server nodes.
        2. ROLE#2 - prepares data to be written to WAL - for baseline nodes only.
      2. Checks before starting:
        1. Whether ConsistentCut has already started (ConsistentCut != null) or finished (lastFinishedCutId == id) for this id; skip if it has.
        2. In case ConsistentCut is inited by CommunicationSPI, compare the ConsistentCutAwareMessage#topVer with the local node order:
          1. The local node order equals the topVer at the moment the node joined the cluster.
          2. If the order is higher than the ConsistentCut topVer, the node joined after ConsistentCut started. Skip starting ConsistentCut on this node.
      3. ROLE#1:
        1. Creates a new ConsistentCut future.
          1. If the node is non-baseline (client, non-baseline server) - completes it right after creation, and notifies the node-initiator that the local procedure has finished (by the DistributedProcess protocol).
        2. While ConsistentCut != null, wraps outgoing messages into ConsistentCutAwareMessage. The wrapper contains:
          1. ConsistentCutId (to start ConsistentCut on the remote node, if not yet started).
          2. The topology version topVer on which ConsistentCut started.
          3. An additional field txCutId on messages that make a transaction committed (FinishRequest for 2PC, PrepareResponse for 1PC). It is originally set on the node that commits first:
            1. For 2PC it is the originating (near) node.
            2. For 1PC it is the backup node.
            3. If txCutId equals null, the transaction started committing Before Consistent Cut started, otherwise After.
        3. On receiving a ConsistentCutAwareMessage that makes a transaction committed (FinishRequest for 2PC, PrepareResponse for 1PC), sets tx#txCutId = message#txCutId.
      4. ROLE#2 - for baseline nodes only:
        1. In the message thread, atomically inits ConsistentCut:
          1. creates a new ConsistentCut future.
          2. creates an empty collection removedActiveTxs (unlike IgniteTxManager#activeTx, this collection doesn't remove transactions).
        2. In the background thread:
          1. Writes a ConsistentCutStartRecord to WAL with the received ConsistentCutId.
          2. Creates a (weakly consistent) copy of IgniteTxManager#activeTx. Sets listeners on those tx#finishFuture.
            1. As an optimization, it's safe to exclude transactions with tx#status == ACTIVE. It's guaranteed that such transactions belong to the After side.
          3. Creates a copy of removedActiveTxs (it contains transactions that might already have been cleaned from IgniteTxManager#activeTx). Sets listeners on those tx#finishFuture.
          4. Sets removedActiveTxs to null. We don't care about transactions concurrently added to removedActiveTxs; they just don't land in the before or after set and will be excluded from recovery.
        3. In transaction threads, fills removedActiveTxs while ConsistentCut != null and removedActiveTxs != null:
          1. Every transaction is added into removedActiveTxs right before it is removed from IgniteTxManager#activeTx.
        4. For every listened-to transaction, a callback is invoked when the transaction finishes:
          1. If the transaction state is UNKNOWN or its status is RECOVERY_FINISH, completes ConsistentCut with an exception.
          2. If the transaction mapped to a higher topology version than the ConsistentCut topVer, puts it into after.
          3. If tx#txCutId equals the local ConsistentCutId, puts the transaction into after, otherwise into before.
        5. After every listened-to transaction has finished:
          1. Writes a ConsistentCutFinishRecord into WAL with the collections (before, after).
          2. Completes the ConsistentCut future.
          3. Notifies the node-initiator that the local procedure has finished (by the DistributedProcess protocol).
    5. After all nodes have finished ConsistentCut, every node:
      1. Updates lastFinishedCutId with the current id.
      2. Sets the ConsistentCut future to null.
      3. Stops wrapping outgoing transaction messages.
    6. The node-initiator checks that every node completed correctly:
      1. If any node completed exceptionally - completes the Incremental Snapshot with an exception.
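The per-transaction classification performed by ROLE#2 (a transaction goes After when it mapped to a newer topology version or carries the current cut's txCutId, and Before otherwise; recovered or UNKNOWN transactions fail the cut) can be sketched as follows. The class and method names are hypothetical, not actual Ignite internals:

```java
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Illustrative sketch of the ROLE#2 callback that sorts finished
 * transactions into the before/after collections.
 */
public class CutCallbackSketch {
    private final UUID cutId;
    private final long cutTopVer;

    final Set<UUID> before = ConcurrentHashMap.newKeySet();
    final Set<UUID> after = ConcurrentHashMap.newKeySet();

    /** Set when a recovered or UNKNOWN transaction makes the cut inconsistent. */
    volatile boolean failed;

    public CutCallbackSketch(UUID cutId, long cutTopVer) {
        this.cutId = cutId;
        this.cutTopVer = cutTopVer;
    }

    /** Invoked when a listened-to transaction finishes. */
    public void onTxFinished(UUID txVer, long txTopVer, UUID txCutId, boolean recoveredOrUnknown) {
        if (recoveredOrUnknown) {
            // Recovered or UNKNOWN transactions complete ConsistentCut with an exception.
            failed = true;
            return;
        }

        if (txTopVer > cutTopVer || cutId.equals(txCutId))
            after.add(txVer);   // Mapped to a newer topology, or committed after this cut.
        else
            before.add(txVer);  // txCutId is null or belongs to an older cut.
    }
}
```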

Consistent and inconsistent Cuts

Consistent Cut, in terms of the Ignite implementation, is a cut that correctly finished on all baseline nodes - both ConsistentCutStartRecord and ConsistentCutFinishRecord are written.

An "inconsistent" Cut is a cut where one or more baseline nodes haven't written ConsistentCutFinishRecord. It's possible in these cases:

  1. any error appeared during processing of the local Cut.
  2. a transaction is recovered with the transaction recovery protocol (tx.finalizationStatus == RECOVERY_FINISH).
  3. a transaction finished in the UNKNOWN state.
  4. the baseline topology changed; Ignite nodes finish the local Cuts running at that moment, making them inconsistent.

ConsistentCutMarker

Every Ignite node tracks the current ConsistentCutMarker:

Code Block
languagejava
titleConsistentCutMarker
class ConsistentCutMarker {
	UUID id;
}

`id` is just a unique ConsistentCut ID (it is assigned on the node-initiator).

Wrapping

...

messages

Ignite transaction protocol includes multiple messages. But only some of them are meaningful for the algorithm - those that change the state of transactions (PREPARED, COMMITTED):

  1. GridNearTxPrepareRequest / GridDhtTxPrepareRequest
  2. GridNearTxPrepareResponse / GridDhtTxPrepareResponse
  3. GridNearTxFinishRequest / GridDhtTxFinishRequest

Those messages are wrapped into ConsistentCutAwareMessage right before being sent to another node. The wrapper holds the current ConsistentCutId. Also, some messages have to be combined with an additional txCutId to check it on the primary/backup node:

  1. GridNearTxFinishRequest / GridDhtTxFinishRequest
  2. GridNearTxPrepareResponse / GridDhtTxPrepareResponse (for the 1PC algorithm).
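The wrapping rule can be sketched as follows. This is an illustrative sketch only: message types are represented as strings for brevity, while the real code dispatches on message classes:

```java
import java.util.Set;
import java.util.UUID;

/**
 * Illustrative sketch: wrapping an outgoing transaction message while a
 * Consistent Cut is running.
 */
public class WrapSketch {
    /** Messages that make the receiver commit, and therefore also carry txCutId. */
    private static final Set<String> COMMITTING = Set.of(
        "GridNearTxFinishRequest", "GridDhtTxFinishRequest",
        "GridNearTxPrepareResponse", "GridDhtTxPrepareResponse");

    public static String wrap(String msgType, UUID cutId, UUID txCutId, long topVer) {
        // Every wrapped message carries the running cut id and the topology version;
        // only committing messages additionally carry the transaction's txCutId.
        UUID carried = COMMITTING.contains(msgType) ? txCutId : null;

        return "ConsistentCutAwareMessage[" + msgType + ", cutId=" + cutId
            + ", txCutId=" + carried + ", topVer=" + topVer + "]";
    }
}
```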

WAL records

There are 2 records: `ConsistentCutStartRecord` for the Start event and `ConsistentCutFinishRecord` for the Finish event.

  • ConsistentCutStartRecord: the record is written to WAL at the moment the Consistent Cut starts on a local node. It helps to limit the amount of active transactions to check. But there is no strict guarantee that all transactions belonging to the BEFORE side are physically committed before ConsistentCutStartRecord, and vice versa. This is the reason for having ConsistentCutFinishRecord.
  • ConsistentCutFinishRecord: the record is written to WAL after Consistent Cut has stopped analyzing transactions and has stored each of them in a particular bucket (BEFORE or AFTER).

    ...

Those messages are filled with txCutId, which is prepared right before the transaction starts committing on the first committing node. The current ConsistentCutId is used for this setting. If the current ConsistentCutId is not null, then the transaction started committing after ConsistentCut started, which means this transaction belongs to the After side.
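This assignment rule is a one-liner in spirit: the first committing node reads the currently running cut id (or null) and stamps it onto the transaction. A minimal sketch, with hypothetical names:

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Illustrative sketch: the node that commits first records the currently
 * running cut id (or null) into the transaction as txCutId.
 */
public class TxCutIdSketch {
    /** Currently running cut id; null when no cut is active. */
    static final AtomicReference<UUID> RUNNING_CUT = new AtomicReference<>();

    /** Called right before the transaction starts committing on the first committing node. */
    static UUID assignTxCutId() {
        // null -> the commit began Before any cut; non-null -> After the cut with this id.
        return RUNNING_CUT.get();
    }
}
```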


Code Block
languagejava
titleConsistentCutAwareMessage
class ConsistentCutAwareMessage {
	/** Original transaction message. */
	Message msg;

	/** Consistent Cut ID. */
	UUID cutId;

	/** Consistent Cut ID after which the transaction committed. */
	@Nullable UUID txCutId;

	/** Cluster topology version on which Consistent Cut started. */
	long topVer;
}


Transaction

A new field is added to IgniteInternalTx:

Code Block
languagejava
titleIgniteInternalTx
class IgniteInternalTx {
    /**
     * @param id ID of {@link ConsistentCut} AFTER which this transaction was committed,
     *           {@code null} if the transaction committed BEFORE.
     */
    public void cutId(@Nullable UUID id);
}

Unstable topology

There are some cases to handle for unstable topology:

  1. Client or non-baseline server node leaves - no need to handle.
  2. Server node leaves:
    1. all nodes finish the locally running ConsistentCut, making it inconsistent.
  3. Server node joins:
    1. all nodes finish the locally running ConsistentCut, making it inconsistent.
    2. the new node checks whether rebalance was required for recovering. If it is required, then handle it TBD.

TBD: Which ways to use to avoid inconsistency between data and WAL after rebalance. There are options:

    ...


Consistent Cut Classes

Code Block
languagejava
titleConsistentCutManager
// Class is responsible for managing everything related to Consistent Cut. It's an entrypoint for transaction threads to check the running consistent cut.
class ConsistentCutManager extends GridCacheSharedManagerAdapter {
    // Current Consistent Cut. All transaction threads wrap outgoing messages if this field is not null.
    volatile @Nullable ConsistentCut cut;

    // Entrypoint for handling a received new Consistent Cut ID.
    void handleConsistentCutId(UUID id);
}
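The idempotency check that handleConsistentCutId has to perform (compare with the "Checks before starting" step of the algorithm) can be sketched as follows; this is a hypothetical simplification, not the actual manager:

```java
import java.util.UUID;

/**
 * Illustrative sketch of the idempotency check performed before starting a
 * local cut: skip if this id is already running or has already finished here.
 */
public class CutManagerSketch {
    private volatile UUID lastFinishedCutId;
    private volatile UUID runningCutId;

    /** @return true if a new local cut was started for this id. */
    public synchronized boolean handleConsistentCutId(UUID id) {
        // Skip if this cut is already running or has already finished on this node.
        if (id.equals(runningCutId) || id.equals(lastFinishedCutId))
            return false;

        runningCutId = id; // Start the local ConsistentCut (write StartRecord, etc.).
        return true;
    }

    /** Called once the local cut completes. */
    public synchronized void finish() {
        lastFinishedCutId = runningCutId;
        runningCutId = null;
    }
}
```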


Code Block
languagejava
titleConsistentCut
class ConsistentCut extends GridFutureAdapter<WALPointer> {
    Set<GridCacheVersion> beforeCut;

    Set<GridCacheVersion> afterCut;

    Set<IgniteInternalFuture<IgniteInternalTx>> removedActive;
}

  1. ON DISTRIBUTED SNAPSHOTS, Ten H. Lai and Tao H. Yang, 29 May 1987