Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents


ConsistentCut is a distributed algorithm that splits WAL on 2 global areas - Before and After. It guarantees that every transaction committed Before also will be committed Before on every other node participated in the transaction. It means that an Ignite node nodes can safely recover itself themself to the consistent Before state without any coordination with each other nodes.

The border between Before and After areas consists of two WAL records - ConsistentCutStartRecord and ConsistentCutFinishRecordIt guarantees that the Before consists of:

  1. Transactions committed before ConsistentCutStartRecord and AND weren't included into ConsistentCutFinishRecord#after().
  2. Transactions committed between ConsistentCutStartRecord and ConsistentCutFinishRecord and AND were included into ConsistentCutFinishRecord#before().

...

Code Block
languagejava
titleConsistentCutRecord
/** */
public class ConsistentCutStartRecord extends WALRecord {
	/** Marker that inits Consistent Cut ID. */
	private final ConsistentCutMarkerUUID markercutId;
}


/** */
public class ConsistentCutFinishRecord extends WALRecord {
	/** Marker that inits Consistent Cut ID. */
	private final ConsistentCutMarkerUUID markercutId;

    /**
     * Collections of transactions committed BEFORE.
     */
    private final Set<GridCacheVersion> before;

     /**
     * Collections of transactions committed AFTER.
     */
    private final Set<GridCacheVersion> after;
 }

Algorithm

...

Picture bellow illustrates steps of the algorithm on single node:

Image AddedImage Added

  1. Initial state:
    1. No concurrent ConsistentCut process is running.
    2. lastFinishedCutId holds previous ConsistentCutId, or null.
  2. User starts a command for creating new incremental snapshot:
    1. Ignite node inits DistributedProcess with special message a DistributedProcess with discovery message SnapshotOperationRequest that holds new ConsistentCutMarker ConsistentCutId (goal is to notify every node in a cluster about running incremental snapshot). 
    2. DistributedProcess store the topology version topVer on which ConsistentCut started.
  3. Process of creation of incremental snapshot consistent cut can be started by two events (what will happen earlier):
    1. Receive the ConsistentCutMarker by discoverythe SnapshotOperationRequest#ConsistentCutId by DiscoverySPI (by the DistributedProcess).
    2. Receive the ConsistentCutMarker by transaction message ( ConsistentCutAwareMessage#ConsistentCutId by CommunicationSPI (by transaction messages - Prepare, Finish).
  4. On receiving the marker, every node:  ConsistentCutId it starts local ConsistentCut: 
    1. There are 2 roles that node might play:
      1. ROLE#1 - wraps outgoing messages - for all Ignite nodes: client, baseline, non-baseline server nodes.
      2. ROLE#2 - prepares data to be written in WAL - for baseline nodes only.
    2. Before start check:
      1. Whether ConsistentCut has already started (ConsistentCut != null) or finished (lastFinishedCutId == id) for this id
    3. Checks whether ConsistentCut has already started for this marker
      1. , skip if it has.
    4. Checks local topVersion  with received in marker. Skip if it is different.
    5. In message thread atomically:
      1. creates new ConsistentCut future
      2. creates committingTx, goal is to track COMMITTING transactions, that aren't part of IgniteTxManager#activeTx
      3. starts signing outgoing messages with the ConsistentCutMarker.
      1. On non-baseline nodes In case ConsistentCut is inited by CommunicationSPI then compare the ConsistentCutAwareMessage#topVer with local node order:
        1. Local node order equals to new topVer on the moment when node joined to a cluster.
        2. If the order is higher than ConsistentCut topVer it means the node joined after ConsistentCut started. Skip start ConsistentCut on this node.
    6. ROLE#1:
      1. creates new ConsistentCut future.
        1. If node is  non-baseline (client, non-baseline servers) - complete it right after creation, and notify a node-initiator about local procedure has finished (by DistributedProcess protocol).
      2. While ConsistentCut != null wraps outgoing messages to ConsistentCutAwareMessage. It contains info:
        1. ConsistentCutId (to start ConsistentCut  on remote node, if not yet).
        2. Messages contain additional field txCutId. It is originally set on the nodes that commit first:
          1. For 2PC it is an originated node.
          2. For 1PC it is a backup node.
        3. If txCutId equals to null then transaction starts committing Before Consistent Cut started, otherwise After.
      3. On receive ConsistentCutAwareMessage that makes transaction committed (FinishRequest for 2PC, PrepareResponse for 1PC) sets tx#txCutId = message#txCutId.
    7. ROLE#2 - for baseline nodes only:
      1. In the message thread atomically inits ConsistentCut:
        1. creates new ConsistentCut future.
        2. creates empty collection removedActiveTxs (This collection doesn't remove transactions unlike IgniteTxManager#activeTx does).
      2. In the
      In
      1. background thread:
        1. Writes a ConsistentCutStartRecord  to WAL
        with the
        1. with the received
        ConsistentCutMarker 
        1. ConsistentCutId.
      2. Collects active transactions - concat of IgniteTxManager#activeTx and committingTxs .
  5. While the DistributedProcess  is alive every node signs output transaction messages:
    1. Prepare messages signed with the ConsistentCutMarker  (to trigger ConsistentCut  on remote node, if not yet).
    2. Finish messages signed with the ConsistentCutMarker  (to trigger...) and transaction ConsistentCutMarker  (to notify nodes which side of cut this transaction belongs to).
    3. Finish messages is signed with transaction ConsistentCutMarker on node that commits first.
  6. For every receiving FinishMessage it put the transaction into committingTxs.
        1. Creates a copy (weakly-consistent) of IgniteTxManager#activeTx. Set listeners on those tx#finishFuture.
          1. For optimization it's safely exclude transactions that tx#status == ACTIVE. It's guaranteed that such transactions belongs After side.
        2. Creates a copy of removedActiveTxs (contains transactions that are might be cleaned from IgniteTxManager#activeTx). Set listeners on those tx#finishFuture.
        3. Set removedActiveTxs to null. We don't care of txs concurrently added to removedActiveTxs, they just don't land into "before" or "after" set and will be excluded from recovery.
      1. In transaction threads fills removedActiveTxs if ConsistentCut != null and removedActiveTxs != null:
        1. Every transaction is added into removedActiveTxsright before it is removed from IgniteTxManager#activeTx.
      2. For every listening transaction, the callback
    For every collected active transaction, it starts listening tx#finishFuture with callback. Callback
      1. is called when transaction finished:
        1. check
      that transaction completes in consistent way (tx#state != UNKNOWN, tx.finalizationStatus != RECOVERY_FINISH). If it isn't then this cut is inconsistent. Complete ConsistentCut exceptionally.
    1. if tx#txMarker is null or differs from local, then transaction put into before
        1. If transaction state is UNKNOWN or status is RECOVERY_FINISH, then complete ConsistentCut with exception.
        2. If transaction mapped to a higher topology version than ConsistentCut topVer, then put it into after.
        3. if tx#txCutId
      if tx#txMarker
        1. equals to local, then put transaction
      put into after
        1. into after, otherwise put into before.
      1. After every
    collected
      1. listening transaction finished:
        1. Writes a
      ConsistentCutFinishRecord 
        1. ConsistentCutFinishRecord  into WAL with
      the collections 
        1. the collections ( before, after ). 
      Stops filling committingTxs and clean this collection.
        1. Completes ConsistentCut  future
      , and notifies
        1. .
      1. Notify a node-initiator about
      finishing
      1. local procedure has finished (
      with
      1. by DistributedProcess
       
      1. protocol).
  7. After all nodes finished ConsistentCut :, on every node:
    1. Updates lastFinishedCutId with the current id.
    2. every node stops signing outgoing transaction messages
    3. ConsistentCut  future becomes future becomes null.
    4. Ignite node now in the initial state againStops signing outgoing transaction messages.
  8. Node initiator checks that every node completes correctly and that topVer wasn't changed since start.
    1. If any node complete exceptionally , or topology changed - complete IS Incremental Snapshot with exception.

Consistent and inconsistent Cuts

...

  1. any errors appeared during processing local Cut.
  2. if a transaction is recovered with transaction recovery protocol (tx.finalizationStatus == RECOVERY_FINISH).
  3. if transaction finished in UNKNOWN state.
  4. topology change

ConsistentCutMarker

Every ignite nodes tracks current ConsistentCutMarker :

Code Block
languagejava
titleConsistentCutMarker
class ConsistentCutMarker {
	UUID id;

	AffinityTopologyVersion topVer;
}

id is just a unique ConsistentCut  ID (is assigned on the node initiator).

topVer is topology version on node initiator before Incremental Snapshot starts.

Signing messages

On the picture below on left side is a diagram of sending transaction messages. Before sending message it checks whether cut is running with cutMarker(). If it is then wrap message, otherwise send ordinary message (PrepareRequest in example).

...

  1. .

Wrapping messages

Ignite transaction protocol includes multiple messages. But only some of them affects meaningful (relating to the algorithm) that change state of transactions (PREPARED, COMMITTED):

...

Those messages are wrapped in MarkerMessage ConsistentCutAwareMessage  that is prepared right before sending message on other node. They used the current ConsistentCutMarker for setting the marker.

Code Block
languagejava
titleConsistentCutMarkerMessage
class MarkerMessage {
	Message msg;

	ConsistentCutMarker marker;
}

ConsistentCutId. Also some messages require to be signed with combine with additional ConsistentCutMarker ConsistentCutId to check it them on primary/backup node.:

  1. GridNearTxFinishRequest / GridDhtTxFinishRequest
  2. GridNearTxPrepareResponse / GridDhtTxPrepareResponse (for 1PC algorithm).

Those messages are wrapped in TransactionFinishMarkerMessagefilled with txCutId  that is prepared right before transaction starts committing on first committing node. They used the current ConsistentCutMarker  for setting the txMarker . txMarker  can be null, if transaction starts committing before ConsistentCut starts. ConsistentCutId for this setting. If current ConsistentCutId is not null, then transaction starts committing after ConsistentCut started and it means that this transaction belongs the After side. 


Code Block
languagejava
titleConsistentCutAwareMessage
class ConsistentCutAwareMessage {
	/** Original transaction message. */
	Message msg;

	/** Consistent Cut ID. */
	UUID cutId;

 	/** Consistent Cut ID after which transaction committed. */
    @Nullable UUID txCutId;

	/** Cluster topology version on which Consistent Cut started. */
	long topVer;
}


Transaction

A new field added to IgniteInternalTx

Code Block
languagejava
titleIgniteInternalTx
class IgniteInternalTx {     
    /**
     * @param ID of {@link ConsistentCut} AFTER which this transaction was committed, {@code null} if transaction
     *           committed BEFORE.
     */
    public void cutId(@Nullable UUID id);
}

Consistent Cut Classes

Code Block
languagejava
titleConsistentCutMarkerFinishMessageConsistentCutManager
// Class is responsible for managing all stuff related to Consistent Cut. It's an entrypoint for transaction threads to check running consistent cut.
class ConsistentCutManager extends GridCacheSharedManagerAdapter {               
    // Current Consistent Cut. All transactions threads wraps outgoing messages if this field is not null.  */
    volatile @Nullable ConsistentCut cut;
	
	// Entrypoint for handling received new Consistent Cut ID.
	void handleConsistentCutId(UUID id);
}


Code Block
languagejava
titleConsistentCut
class ConsistentCut extends GridFutureAdapter<WALPointer> {          
	Set<GridCacheVersion> beforeCut;

    Set<GridCacheVersion> afterCut;

    Set<IgniteInternalFuture<IgniteInternalTx>> removedActiveclass TransactionFinishMarkerMessage extends MarkerMessage {
    @Nullable ConsistentCutMarker txMarker;
}
  1. ON DISTRIBUTED SNAPSHOTS, Ten H. LAI and Tao H. YANG, 29 May 1987