Choosing an algorithm for transactional consistency

There are several possible solutions to guarantee transactional consistency:

  1. Consistent Cut
    + Simple algorithm - requires only some read-write locks to sync threads, and doesn't affect performance much.
    + Doesn't require too much additional space - it just writes a few additional records to the WAL.
    - Recovery takes time (applying every record from the WAL to the system) that depends on how many operations need to be restored.
    - Requires additional storage to persist WAL archives.
    - Doesn't restore ATOMIC caches.
  2. Incremental physical snapshots (a collection of partition binary files changed since the previous snapshot; can be implemented as a delta or as a full copy).
    - High disk IO usage for preparing snapshots.
    - The current implementation of snapshots requires PME, which significantly affects performance.
    + Fast recovery that doesn't depend on the number of operations to restore (WAL-free).
  3. MVCC
    - Ignite failed to support MVCC; it is very hard to implement.

Of those options, Consistent Cut is preferable. It may require optimizations for recovery, but it looks like we can provide some. For example:

  1. Use WAL compaction for archived files, which excludes physical records from WAL files;
  2. Apply DataEntry records from WAL in parallel by using a striped executor (striped by cache group id and partition id), as sketched after this list;
  3. Use an index over WAL files for fast access to the written Consistent Cuts.
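
For option 2, a minimal sketch of the striping idea is shown below, assuming a plain single-threaded executor per stripe and a simple hash of the cache group and partition ids (this is not the actual Ignite striped pool API):

Code Block
languagejava
titleStripedWalApplier (sketch)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Sketch: apply WAL data entries in parallel, striped by (cacheGroupId, partitionId). */
class StripedWalApplier {
    private final ExecutorService[] stripes;

    StripedWalApplier(int stripeCnt) {
        stripes = new ExecutorService[stripeCnt];

        for (int i = 0; i < stripeCnt; i++)
            stripes[i] = Executors.newSingleThreadExecutor();
    }

    /** Entries of the same (group, partition) pair always land on the same stripe, so their order is preserved. */
    void apply(int cacheGroupId, int partitionId, Runnable applyEntry) {
        int idx = Math.floorMod(31 * cacheGroupId + partitionId, stripes.length);

        stripes[idx].submit(applyEntry);
    }
}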

Consistent Cut

The goal is to find a Point on the timeline of every node such that the set of transactions committed before (after) this Point is the same on every node in the cluster.

Consider a simple model. On the pictures below there are two lines (P and Q). They represent two different Ignite nodes. Nodes exchange transaction messages with each other. For simplicity let's consider that there is only one transaction state (COMMITTED) and only a single message related to one transaction (one node notifies another about the transaction, and the latter immediately commits it after receiving the message).

A transaction can be described as an ordered sequence of events, and the correct order of events is known on every node. There are 4 ordered events describing a transaction in this simple model: 

  1. P.tx(COMMITTED): transaction committed on node P;
  2. P.snd(m): send message msg(tx5: P → Q);
  3. Q.rcv(m): receive message msg(tx5: P → Q);
  4. Q.tx(COMMITTED): transaction committed on node Q.

The cut line crosses both lines (P and Q) at their Points. The transactions (tx1, tx2, tx3, tx4) are on the left side of the cut on both nodes. But tx5 is different, because the cut also crosses a message line: tx5 then commits on the two nodes in different states (before / after the cut). To make a cut consistent, the global order of transaction events must be kept - for any transaction, the events in the BEFORE state must form a prefix of this order (if an event is in BEFORE, all prior events are in BEFORE as well).

Consider the examples. The first picture below describes the case where the tx5 state is:

...

The global state before the cut contains the events { Q.rcv(m), Q.tx(COMMITTED) }, which aren't the leading events in the known sequence - it misses the first events { P.tx(COMMITTED), P.snd(m) }. But the order of events must match the order of the consistent states (BEFORE, AFTER), so such a cut is inconsistent.

The second picture below describes the case where the tx5 state is:

  • before cut: node P = { tx(COMMITTED), snd(m) }, node Q = { }.
  • after cut: node P = { }, node Q = { rcv(m), tx(COMMITTED) }.

The global state before the cut contains the events { P.tx(COMMITTED), P.snd(m) }, which are the leading events in the known sequence. Such a cut is consistent.


In the case of a consistent cut, every node knows which side of the cut a local transaction belongs to (BEFORE or AFTER). On the second picture, node Q after the rcv event knows whether tx5 was included into the cut on node P. If it was, node Q includes it as well, otherwise it doesn't.

Algorithm

There is a paper [1] that proposes an algorithm for implementing a distributed consistent cut. This algorithm works without a coordinator of the procedure and is suitable for processes with non-FIFO channels between them.

The algorithm's purpose is to identify the wrong order of messages by signing messages with the actual color (red or white). Then every node knows which side of the cut a message belongs to.

Messages:

  1. sent - a set of messages that changed the state of the sender.
  2. received - a set of messages that changed the state of the receiver.

Algorithm's steps (a minimal sketch in Java follows the list):

  1. Initially all processes are white, the sent and received collections are empty, LocalState is empty.
  2. After some time of system work, every node might have:
    1. sent and received collections, possibly empty.
    2. A LocalState, possibly non-empty, <-> sent + received. The state matches the events that changed it.
  3. A random process can start a snapshot (furthermore, multiple processes may start it simultaneously):
    1. The node colors itself red.
    2. It commits a LocalState.
    3. It commits sent and received as collections for every IN and OUT channel. New ones are created for the next LocalState.
    4. It prepares a marker message: it is red and has a payload of sent. The goal of the marker is to guarantee the order of messages (received_ij must be a subset of sent_ji).
  4. Mark every ordinary message between distributed processes with the marker message; if there is no upcoming message to a node, the marker is just sent as an ordinary message.
  5. On receiving an ordinary message, a process has to check the marker first, before applying the message.
  6. If the received color differs from the local color, the node has to trigger the local snapshot procedure.
  7. Handle sent from the received marker:
    1. calculate the ChannelState for the channel the message was received on: sent - received, where sent is extracted from the marker and received is calculated locally since the local snapshot.
  8. On receiving marker messages from all IN channels, the node prepares a snapshot:
    1. Local snapshot of node i: N_i = LocalState_i + Σ_j ChannelState_ij (sent - received)
  9. Every such local snapshot is a unit of the global snapshot:
    1. Note that the snapshot consists of the committed LocalStates and the messages between nodes.
    2. The committed sent and received collections are cleaned.
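
A minimal, illustrative model of this white/red marker bookkeeping is sketched below; the class shape, the message ids and the Marker type are assumptions made for the sketch, not Ignite classes:

Code Block
languagejava
titleColoredProcess (sketch)
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative model of a single process in the colored-marker algorithm from [1]. */
class ColoredProcess {
    enum Color { WHITE, RED }

    /** Marker piggybacked on every ordinary message: the sender's color and its sent set for the channel. */
    record Marker(Color color, Set<Long> sentOnChannel) {}

    private volatile Color color = Color.WHITE;

    /** Per-channel (remote process id -> message ids) collections of messages that changed local state. */
    private final Map<Integer, Set<Long>> sent = new ConcurrentHashMap<>();
    private final Map<Integer, Set<Long>> received = new ConcurrentHashMap<>();

    /** In-transit messages (sent by the remote side, not yet received locally) captured by the snapshot. */
    private final Map<Integer, Set<Long>> channelState = new ConcurrentHashMap<>();

    /** Step 4: sign every ordinary message with a marker carrying the current color and the channel's sent set. */
    Marker onSend(int toProcess, long msgId) {
        sent.computeIfAbsent(toProcess, k -> ConcurrentHashMap.newKeySet()).add(msgId);

        return new Marker(color, Set.copyOf(sent.get(toProcess)));
    }

    /** Step 3: color itself red and commit LocalState; the committed channel collections would be frozen here. */
    synchronized void startLocalSnapshot() {
        if (color == Color.RED)
            return;

        color = Color.RED;
        commitLocalState();
    }

    /** Steps 5-7: check the marker first, then apply the message. */
    void onMessage(int fromProcess, long msgId, Marker marker) {
        if (marker.color() != color)
            startLocalSnapshot();           // step 6: color changed - take a local snapshot

        // Step 7: ChannelState for this channel = sent (from the marker) - received (calculated locally).
        Set<Long> inTransit = new HashSet<>(marker.sentOnChannel());
        inTransit.removeAll(received.getOrDefault(fromProcess, Set.of()));
        channelState.put(fromProcess, inTransit);

        // The message itself changes the receiver's state.
        received.computeIfAbsent(fromProcess, k -> ConcurrentHashMap.newKeySet()).add(msgId);
    }

    private void commitLocalState() { /* persist LocalState - omitted in this sketch */ }
}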

Mapping the algorithm to Ignite

Ignite components

On receiving a message with a new CutVersion, a node sets it and commits LocalState and ChannelState in order to identify the wrong order of events:

  1. LocalState maps to the local WAL (all committed transactions are part of LocalState);
  2. Channel:
    1. We can piggyback on Ignite transaction protocol messages (Prepare, Finish) sent with CommunicationSpi.
    2. In case there is no transaction for a channel, we can rely on the DiscoverySpi to start the local snapshot on non-participating nodes.
  3. ChannelState maps to `IgniteTxManager#activeTransactions`:
    1. the sent collection matches committed transactions for which the local node is near - they send FinishMessages to other nodes.
    2. the received collection matches committed transactions for which the local node isn't near - they receive FinishMessages from other nodes.
  4. `IgniteTxManager#activeTransactions` doesn't track:
    1. committing transactions (COMMITTING+) - they are removed from this collection before they start committing.
      1. track them additionally: add a transaction to a separate collection before it starts committing and remove it after it has committed (see the sketch below).
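
A minimal sketch of the additional tracking from item 4.1.1 is shown below; the names are illustrative and a generic type stands in for GridCacheVersion:

Code Block
languagejava
titleCommittingTxTracker (sketch)
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch: track transactions in COMMITTING+ state that are no longer in the active collection. */
class CommittingTxTracker<V> {
    /** Versions (e.g. nearXidVersion) of transactions that are currently committing. */
    private final Set<V> committingTxs = ConcurrentHashMap.newKeySet();

    /** Called right before a transaction is removed from the active collection to start committing. */
    void onBeforeCommit(V nearXidVersion) {
        committingTxs.add(nearXidVersion);
    }

    /** Called after the transaction has committed, so the collection auto-shrinks. */
    void onCommitted(V nearXidVersion) {
        committingTxs.remove(nearXidVersion);
    }

    /** Immutable view used when a snapshot collects the transactions to listen to. */
    Set<V> snapshot() {
        return Set.copyOf(committingTxs);
    }
}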

Naive algorithm

  1. Initial state:
    1. Ignite nodes started from a snapshot, or another consistent state (after a graceful cluster stop / deactivation).
    2. var color = WHITE.
    3. Two empty collections for sent and received - Map<UUID, Collection<GridCacheVersion>>, where the key is a remote node ID and the value is the nearXidVersion of a committed transaction.
  2. After some time, Ignite nodes might have non-empty sent and received collections.
  3. The Ignite coordinator starts a local snapshot (all of it must be done atomically):
    1. mark the local var with color = RED.
    2. write a snapshot record to WAL (commits LocalState).
    3. replace sent and received with new empty instances; the old collections are now immutable for the snapshot process.
    4. prepare a marker message for every node with the subset of sent transactions (send just a Collection<GridCacheVersion> for every node).
  4. Send marker messages directly to all nodes in the cluster. The marker must also be sent with every next FinishMessage (to avoid cases when the next FinishMessage is received before the marker message).
  5. Other nodes, on receiving a marker with the new color:
    1. start a local snapshot (see the steps from point 3).
  6. For every received marker message:
    1. For the prepared received collection and the received marker's sent, calculate (sent - received). These are transactions that are in a transit state. They are also part of the snapshot.
  7. After markers from all nodes have been received:
    1. Store ChannelState into WAL: write Map<UUID, Collection<GridCacheVersion>>, where the value is (sent - received).
    2. The snapshot is finished locally.
  8. For all nodes color = RED. The next snapshot iteration will be started by changing the color to WHITE.

Disadvantages:

  1. The sent and received collections could be too big to track and send with a single message.
  2. It requires a lock for steps 3 and 4, which affects sending FinishMessage and so affects UX (lower performance of transactions).
  3. In the worst case it requires O(N^2) additional messages, where N is the cluster size. This is possible when there are no active transactions in the cluster (idle nodes).

Reducing the sent and received collections:

We can rely on the two-phase commit protocol of Ignite transactions to reduce sent and received:

  1. Transactions have an additional state (PREPARED) before sending and receiving a FinishMessage. So we can say that PREPARED is a promise to send/receive a FinishMessage.
  2. Then IgniteTxManager#activeTransactions can already replace the sent and received collections:
    1. it tracks all PREPARED transactions (it is an analogue of sent - received).
    2. Also, it automatically shrinks after a transaction is committed.

Avoid locking in steps 3 and 4

  1. Removing the WAL record write from the lock => inconsistency between the fixed sent / received and the committed LocalState.
  2. Removing the fixing of sent and received from the lock => inconsistency between the fixed sent / received and the committed LocalState.

To avoid misses:

  • sent and received might be wider (collect transactions that are not only PREPARED, but ACTIVE+).
  • auto-shrink must be disabled during the snapshot process.

To avoid unwanted transactions:

  • Introduce a new collection excluded - a collection of messages that changed LocalState concurrently with the local snapshot but aren't part of the snapshot's ChannelState (they are part of the next ChannelState, after the snapshot).
  • Then Ignite needs a rule to distinguish the different ChannelStates (before and after the snapshot). Possible solutions:
    1. Proposed: add an additional flag to FinishMessage that shows which side of the snapshot this transaction belongs to on the near node.
    2. compare by a fixed GridCacheVersion (fixed on the remote node during the snapshot process):
      1. but the order of transactions is not guaranteed, so the snapshot may include a transaction whose version is greater than the version of a transaction after the snapshot.
      2. it's possible to overcome this with an additional step (locally collecting transactions for additional exclusion after all transactions are committed).

Also there is no need to send the sent collection with FinishMessage. Then the whole lock is reduced to a single volatile variable (see the sketch after this list):

  1. Atomically (with Compare-And-Swap) update the color variable and disable auto-shrink for active transactions.
  2. Write a LocalState record to WAL.
  3. Collect all active transactions.
  4. FinishMessage is sent with 2 fields: color (snapshot state) and the commit color for the specified transaction.
  5. Introduce the exclude collection that contains the transactions to exclude from the snapshot before the LocalState commit while restoring.
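
A minimal sketch of step 1 is shown below, assuming the color and the auto-shrink flag are packed into a single immutable state object so that one CAS updates both (names are illustrative):

Code Block
languagejava
titleCutState (sketch)
import java.util.concurrent.atomic.AtomicReference;

/** Sketch: the lock around snapshot start reduced to a single atomically updated state. */
class CutState {
    enum Color { WHITE, RED }

    /** Color and auto-shrink flag are updated together, so readers never see them out of sync. */
    record State(Color color, boolean autoShrink) {}

    private final AtomicReference<State> state = new AtomicReference<>(new State(Color.WHITE, true));

    /** Step 1: atomically flip the color and disable auto-shrink of tracked transactions. */
    boolean tryStartSnapshot(Color newColor) {
        State cur = state.get();

        if (cur.color() == newColor)
            return false;                                    // a snapshot with this color has already started

        // The winner proceeds with steps 2-3: write LocalState to WAL, collect active transactions.
        return state.compareAndSet(cur, new State(newColor, false));
    }

    /** Re-enable auto-shrink once the snapshot's ChannelState has been written. */
    void finishSnapshot() {
        state.updateAndGet(s -> new State(s.color(), true));
    }

    State current() {
        return state.get();
    }
}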

Reducing additional messages

As now the useful info is just the color field (there is no need to send the sent collection):

  1. for idle nodes it's cheaper to send a single message over discovery - complexity O(N), where N is the cluster size.
  2. there is no need to await all marker messages from other nodes. Every node can just dump its active transactions.

Final algorithm

For the Ignite implementation it's proposed to use only a single node to coordinate the algorithm. A user starts a command for creating a new ConsistentCut on a single node, and this node becomes responsible for coordinating the Consistent Cut iteration:

  1. Initial state:
    1. Ignite nodes started from a snapshot, or another consistent state (after a graceful cluster stop / deactivation).
    2. var color = WHITE.
    3. An empty collection committingTxs (Set<GridCacheVersion>) whose goal is to track COMMITTING+ transactions that aren't part of IgniteTxManager#activeTx. It automatically shrinks after a transaction is committed.
  2. After some time, Ignite nodes might have a non-empty committingTxs.
  3. An Ignite node initiates a global snapshot by starting a DistributedProcess (by discovery IO).
  4. Every node starts a local snapshot process after receiving a mark message (whether by discovery, or by communication with a transaction message):
    1. Atomically: set color = RED and disable auto-shrink of committingTxs.
    2. Write a snapshot record to WAL (commits LocalState).
    3. Collect active transactions - the concatenation of IgniteTxManager#activeTx and committingTxs.
    4. While receiving Finish messages from other nodes, the node fills ChannelState: the exclude and (sent - received) collections.
    5. After all transactions have finished, it writes a WAL record with ChannelState.
  5. The new color is sent with transaction Finish messages.
  6. The committing node adds an additional color field to FinishMessage that shows whether to include the transaction into the snapshot or not.
  7. Other nodes, on receiving a marker with the new color, start a local snapshot (see the steps from point 4).
  8. Each node notifies the node-initiator about finishing the local procedure (with the DistributedProcess protocol).
  9. For all nodes color = RED. The next snapshot iteration will be started by changing the color to WHITE.

Using node-local GridCacheVersion as a marker

To avoid using a marker message (color field) we can try to rely on a fixed GridCacheVersion. The algorithm is as follows (a sketch of the Phase 1 filter follows the steps):

  1. Initial state:
    1. Ignite nodes started from a snapshot, or another consistent state (after a graceful cluster stop / deactivation).
  2. An Ignite node initiates a global snapshot by starting a DistributedProcess (by discovery IO).
  3. Every node (incl. client and non-baseline nodes) starts a local snapshot process after receiving a message from the DistributedProcess.
  4. Phase 1:
    1. Write a WAL record - commit LocalState.
    2. fix snpVersion = GridCacheVersion#next(topVer)
    3. Collect all active transactions originated by the near node whose nearXidVersion is less than snpVersion.
    4. Note: continue collecting transactions that are less than snpVersion and for which the local node is near (to exclude them later).
    5. after finishing all collected transactions: notify the Ignite nodes with snpVersion (with the DistributedProcess protocol).
  5. After all nodes have finished the first phase, they have received a Map<UUID, GridCacheVersion> from the other nodes.
  6. Phase 2: only server baseline nodes continue working here:
    1. Collect all active transactions, find their near node (by GridCacheVersion#nodeOrderId), filter them with the known GridCacheVersions.
    2. Await the completion of all such transactions.
    3. Write a WAL record with the received map.
  7. Phase 3:
    1. Stop collecting near transactions that are less than the local snpVersion and send them to the other nodes.
    2. On receiving such a map, write a new WAL record again that contains the additional skip collection.
  8. After finishing Phase 3, the snapshot process is finished.
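
A minimal sketch of the Phase 1 filter (step 4.3) is shown below; the version type and its comparison are simplified stand-ins for GridCacheVersion:

Code Block
languagejava
titleSnapshotVersionFilter (sketch)
import java.util.List;
import java.util.stream.Collectors;

/** Sketch of Phase 1: collect near transactions whose version precedes the fixed snapshot version. */
class SnapshotVersionFilter {
    /** Simplified stand-in for GridCacheVersion (the real comparison also involves topology and node order). */
    record TxVersion(long order) implements Comparable<TxVersion> {
        @Override public int compareTo(TxVersion o) {
            return Long.compare(order, o.order);
        }
    }

    record Tx(TxVersion nearXidVersion, boolean localNodeIsNear) {}

    /** Active transactions originated by the local (near) node with nearXidVersion < snpVersion. */
    static List<Tx> collectBeforeSnapshot(List<Tx> active, TxVersion snpVersion) {
        return active.stream()
            .filter(Tx::localNodeIsNear)
            .filter(tx -> tx.nearXidVersion().compareTo(snpVersion) < 0)
            .collect(Collectors.toList());
    }
}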

Restoring process:

  1. Find the WAL records for Phases 2 and 3 - the map of GridCacheVersions of transactions to filter, and the additional transaction xids to exclude (from Phase 3).
  2. Apply all records with the filters until the record from Phase 2.

Disadvantages:

  1. Increments of GridCacheVersion are CAS operations from different threads, but the version is assigned to a transaction in a non-atomic way. There is no guarantee that snpVersion is greater than the version of a transaction created after fixing snpVersion. Ignite should track such transactions:
    1. With fair locking while creating and assigning a version to a transaction - possible performance degradation.
    2. With an additional filter after preparing a snapshot (step 4.4).
  2. Client and non-baseline nodes have a job to do: collecting transactions, awaiting their finish, sending a response. This could be unreliable, as client nodes can be short-lived:
    1. Special cases must also be handled when a transaction is committed after the client node has gone and there is no info about its actual version.
  3. There is no safe previous record to restore from if some incremental snapshots were created. The whole history needs to be filtered.

Consistent and inconsistent Cuts

Consistent Cut, in terms of the Ignite implementation, is a cut that correctly finished on all baseline nodes - ConsistentCutStartRecord and ConsistentCutFinishRecord are written.

"Inconsistent" Cut is such a cut when one or more baseline nodes hasn't wrote ConsistentCutFinishRecord. It's possible in cases:

  1. any errors appeared during processing the local Cut.
  2. a transaction is recovered with the transaction recovery protocol (txCutVer is unknown).
  3. a transaction finished in the UNKNOWN state.
  4. the baseline topology changed; Ignite nodes finish the local Cuts running at this moment, making them inconsistent.

ConsistentCutVersion

Every Ignite node tracks the current ConsistentCutVersion:

Code Block
languagejava
titleConsistentCutVersion
class ConsistentCutVersion {
	long version;
}

`version` is a simple counter. It is guaranteed to grow monotonically, as it is incremented by discovery communication.

ConsistentCutColor initialization

ConsistentCutColor can be initialized as follows.

For a changed server topology:

  1. The color is received with GridDiscoveryData from the coordinator (for both client and server nodes).
  2. Every server node, on receiving TcpDiscoveryNodeAddedMessage:
    1. finishes the local ConsistentCut.

Order of transaction messages

In the 2PC protocol, in most cases (except some optimizations and corner cases), the sequence of events for a transaction can be described as a set of messages (PrepareRequest, PrepareResponse, FinishRequest, FinishResponse) and a set of TransactionStates. For a case with 2 nodes (P - near, Q - primary) it looks like this:

P.tx(PREPARING) → P.snd(GridNearPrepareRequest) → Q.rcv(GridNearPrepareRequest) → Q.tx(PREPARED) → Q.snd(GridNearPrepareResponse) → P.rcv(GridNearPrepareResponse) → P.tx(PREPARED) → P.tx(COMMITTED) → P.snd(GridNearFinishRequest) →  Q.rcv(GridNearFinishRequest) → Q.tx(COMMITTED)


Important steps in this sequence:

  • the near node commits before the primary node (the primary node commits before the backup node)
  • there is a FinishRequest between the actual commits.

The Consistent Cut procedure can start at any moment and cross this sequence at any link. At this moment every node should decide which side of the ConsistentCut every transaction belongs to. Some important notes:

  1. There are some issues with ACTIVE transactions:
    1. They can be long.
    2. They may become SUSPENDED and hang ConsistentCut after that.
  2. So it's better to avoid checking such transactions, and the algorithm helps with that: for every transaction in the ACTIVE state there are at least 2 messages to sync the CutVersion between nodes (PREPARE response, FINISH request) - this is enough to guarantee that every ACTIVE transaction will be on the AFTER side.
  3. We skip SUSPENDED transactions to avoid Consistent Cut hanging. The only valid way to reach the SUSPENDED state is ACTIVE → SUSPENDED, so no additional checks are needed there.
  4. For other transactions (>= PREPARING) the algorithm listens to their finish futures and checks txCutVer.

Consider some examples below (P - near, Q - primary):

...


ConsistentCut is a distributed algorithm that splits the WAL into 2 global areas - Before and After. It guarantees that every transaction committed Before will also be committed Before on every other node participating in the transaction. It means that Ignite nodes can safely recover themselves to the consistent Before state without any coordination with each other.

The border between the Before and After areas consists of two WAL records - ConsistentCutStartRecord and ConsistentCutFinishRecord. It guarantees that Before consists of (a recovery-filter sketch follows this list):

  1. Transactions committed before ConsistentCutStartRecord AND NOT included into ConsistentCutFinishRecord#after().
  2. Transactions committed between ConsistentCutStartRecord and ConsistentCutFinishRecord AND included into ConsistentCutFinishRecord#before().
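
A minimal recovery-time sketch of these two rules is shown below; String versions stand in for GridCacheVersion:

Code Block
languagejava
titleBeforeAreaFilter (sketch)
import java.util.Set;

/** Sketch of the recovery-time filter implied by the Start/Finish records. */
class BeforeAreaFilter {
    /**
     * @param committedBeforeStartRecord Whether the tx commit record precedes ConsistentCutStartRecord in WAL.
     * @param finishRecordBefore Content of ConsistentCutFinishRecord#before().
     * @param finishRecordAfter Content of ConsistentCutFinishRecord#after().
     * @param txVersion Version of the committed transaction.
     * @return {@code true} if the committed transaction belongs to the Before side of the cut.
     */
    static boolean isBefore(
        boolean committedBeforeStartRecord,
        Set<String> finishRecordBefore,
        Set<String> finishRecordAfter,
        String txVersion
    ) {
        if (committedBeforeStartRecord)
            return !finishRecordAfter.contains(txVersion);   // rule 1

        return finishRecordBefore.contains(txVersion);       // rule 2
    }
}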

On the picture below, the Before area consists of the transactions colored yellow, while After is green.


Code Block
languagejava
titleConsistentCutRecord
/** */
public class ConsistentCutStartRecord extends WALRecord {
	/** Consistent Cut ID. */
	private final UUID cutId;
}


/** */
public class ConsistentCutFinishRecord extends WALRecord {
	/** Consistent Cut ID. */
	private final UUID cutId;

    /**
     * Collections of transactions committed BEFORE.
     */
    private final Set<GridCacheVersion> before;

     /**
     * Collections of transactions committed AFTER.
     */
    private final Set<GridCacheVersion> after;
 }

Algorithm

The picture below illustrates the steps of the algorithm on a single node:


  1. Initial state:
    1. No concurrent ConsistentCut process is running.
    2. lastFinishedCutId holds the previous ConsistentCutId, or null.
  2. User starts a command for creating a new incremental snapshot:
    1. The Ignite node inits a DistributedProcess with the discovery message SnapshotOperationRequest that holds the new ConsistentCutId (the goal is to notify every node in the cluster about the running incremental snapshot).
    2. DistributedProcess stores the topology version topVer on which ConsistentCut started.
  3. The process of creating a consistent cut can be started by two events (whichever happens earlier):
    1. Receiving the SnapshotOperationRequest#ConsistentCutId by DiscoverySPI (by the DistributedProcess).
    2. Receiving the ConsistentCutAwareMessage#ConsistentCutId by CommunicationSPI (by transaction messages - Prepare, Finish).
  4. On receiving the ConsistentCutId a node starts the local ConsistentCut:
    1. There are 2 roles that a node might play:
      1. ROLE#1 - wraps outgoing messages - for all Ignite nodes: client, baseline and non-baseline server nodes.
      2. ROLE#2 - prepares data to be written to WAL - for baseline nodes only.
    2. Checks before start:
      1. Whether ConsistentCut has already started (ConsistentCut != null) or finished (lastFinishedCutId == id) for this id; skip if it has.
      2. On non-baseline nodes, in case ConsistentCut is inited by CommunicationSPI, compare the ConsistentCutAwareMessage#topVer with the local node order:
        1. The local node order equals the new topVer at the moment the node joined the cluster.
        2. If the order is higher than the ConsistentCut topVer, the node joined after ConsistentCut started. Skip starting ConsistentCut on this node.
    3. ROLE#1:
      1. creates a new ConsistentCut future.
        1. If the node is non-baseline (client, non-baseline server) - complete it right after creation and notify the node-initiator that the local procedure has finished (by the DistributedProcess protocol).
      2. While ConsistentCut != null, wraps outgoing messages into ConsistentCutAwareMessage. It contains:
        1. ConsistentCutId (to start ConsistentCut on the remote node, if not yet started).
        2. An additional field txCutId on some messages. It is originally set on the node that commits first:
          1. For 2PC it is the originating node.
          2. For 1PC it is a backup node.
        3. If txCutId equals null then the transaction started committing Before Consistent Cut started, otherwise After.
      3. On receiving a ConsistentCutAwareMessage that makes a transaction committed (FinishRequest for 2PC, PrepareResponse for 1PC), sets tx#txCutId = message#txCutId.
    4. ROLE#2 - for baseline nodes only:
      1. In the message thread, atomically inits ConsistentCut:
        1. creates a new ConsistentCut future.
        2. creates an empty collection removedActiveTxs (this collection doesn't remove transactions, unlike IgniteTxManager#activeTx).
      2. In the background thread:
        1. Writes a ConsistentCutStartRecord to WAL with the received ConsistentCutId.
        2. Creates a copy (weakly-consistent) of IgniteTxManager#activeTx. Sets listeners on those tx#finishFuture.
          1. As an optimization it's safe to exclude transactions with tx#status == ACTIVE. It's guaranteed that such transactions belong to the After side.
        3. Creates a copy of removedActiveTxs (contains transactions that might have been cleaned from IgniteTxManager#activeTx). Sets listeners on those tx#finishFuture.
        4. Sets removedActiveTxs to null. We don't care about txs concurrently added to removedActiveTxs; they just don't land into the "before" or "after" set and will be excluded from recovery.
      3. In transaction threads, fills removedActiveTxs while ConsistentCut != null and removedActiveTxs != null:
        1. Every transaction is added into removedActiveTxs right before it is removed from IgniteTxManager#activeTx.
      4. For every listening transaction, a callback is called when the transaction finishes:
        1. If the transaction state is UNKNOWN or its status is RECOVERY_FINISH, complete ConsistentCut with an exception.
        2. If the transaction is mapped to a higher topology version than the ConsistentCut topVer, put it into after.
        3. If tx#txCutId equals the local one, put the transaction into after, otherwise put it into before.
      5. After every listening transaction has finished:
        1. Writes a ConsistentCutFinishRecord into WAL with the collections (before, after).
        2. Completes the ConsistentCut future.
      6. Notifies the node-initiator that the local procedure has finished (by the DistributedProcess protocol).
  5. After all nodes have finished ConsistentCut, on every node:
    1. Update lastFinishedCutId with the current id.
    2. The ConsistentCut future becomes null.
    3. Stop signing outgoing transaction messages.
  6. The node-initiator checks that every node completed correctly.
    1. If any node completed exceptionally - complete the Incremental Snapshot with an exception.

Consistent and inconsistent Cuts

Consistent Cut is a cut that correctly finished on all baseline nodes - ConsistentCutStartRecord and ConsistentCutFinishRecord are written.

"Inconsistent" Cut is such a cut when one or more baseline nodes hasn't wrote ConsistentCutFinishRecord . It's possible in cases:

  1. any errors appeared during processing the local Cut.
  2. a transaction is recovered with the transaction recovery protocol (tx.finalizationStatus == RECOVERY_FINISH).
  3. a transaction finished in the UNKNOWN state.
  4. topology change.

Wrapping messages

The Ignite transaction protocol includes multiple messages, but only some of them are meaningful for the algorithm - those that change the state of transactions (PREPARED, COMMITTED):

  1. GridNearTxPrepareRequest / GridDhtTxPrepareRequest
  2. GridNearTxPrepareResponse / GridDhtTxPrepareResponse
  3. GridNearTxFinishRequest / GridDhtTxFinishRequest

Those messages are wrapped into a ConsistentCutAwareMessage, which is prepared right before sending the message to another node, using the current ConsistentCutId. Also some messages need to be combined with an additional ConsistentCutId to check it on the primary/backup node:

  1. GridNearTxFinishRequest / GridDhtTxFinishRequest
  2. GridNearTxPrepareResponse / GridDhtTxPrepareResponse (for 1PC algorithm).

Those messages are filled with txCutId, which is prepared right before the transaction starts committing on the first committing node, using the current ConsistentCutId. If the current ConsistentCutId is not null, then the transaction started committing after ConsistentCut started, which means this transaction belongs to the After side. A usage sketch of this wrapping rule follows the code block below.


Code Block
languagejava
titleConsistentCutAwareMessage
class ConsistentCutAwareMessage {
	/** Original transaction message. */
	Message msg;

	/** Consistent Cut ID. */
	UUID cutId;

 	/** Consistent Cut ID after which transaction committed. */
    @Nullable UUID txCutId;

	/** Cluster topology version on which Consistent Cut started. */
	long topVer;
}
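
Below is the usage sketch mentioned above; the nested record is a self-contained stand-in for ConsistentCutAwareMessage and Object replaces Ignite's Message type:

Code Block
languagejava
titleMessageWrapper (sketch)
import java.util.UUID;

/** Sketch: outgoing transaction messages are wrapped while a Consistent Cut is running. */
class MessageWrapper {
    /** Minimal stand-in for ConsistentCutAwareMessage above. */
    record CutAwareMessage(Object msg, UUID cutId, UUID txCutId, long topVer) {}

    /** Currently running Consistent Cut ID, or null if no cut is in progress. */
    volatile UUID cutId;

    /** Topology version on which the current cut started. */
    volatile long topVer;

    /** txCutId is set only by the node that commits first (originating node for 2PC, backup for 1PC). */
    Object wrap(Object msg, UUID txCutId) {
        UUID id = cutId;

        if (id == null)
            return msg;                               // no cut is running - send the original message

        // cutId lets the remote node start its local cut if it has not yet;
        // txCutId == null means the transaction started committing Before the cut.
        return new CutAwareMessage(msg, id, txCutId, topVer);
    }
}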


Transaction

A new field is added to IgniteInternalTx:

Code Block
languagejava
titleIgniteInternalTx
class IgniteInternalTx {
    /**
     * @param id ID of {@link ConsistentCut} AFTER which this transaction was committed, {@code null} if the
     *           transaction was committed BEFORE.
     */
    public void cutId(@Nullable UUID id);
}

Consistent Cut Classes

Code Block
languagejava
titleConsistentCutManager
// The class is responsible for managing everything related to Consistent Cut. It's the entrypoint for transaction threads to check the running consistent cut.
class ConsistentCutManager extends GridCacheSharedManagerAdapter {
    // Current Consistent Cut. All transaction threads wrap outgoing messages while this field is not null.
    volatile @Nullable ConsistentCut cut;

    // Entrypoint for handling a received new Consistent Cut ID.
    void handleConsistentCutId(UUID id);
}


Code Block
languagejava
titleConsistentCut
class ConsistentCut extends GridFutureAdapter<WALPointer> {          
	Set<GridCacheVersion> beforeCut;

    Set<GridCacheVersion> afterCut;

    Set<IgniteInternalFuture<IgniteInternalTx>> removedActive;
}
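
A simplified sketch of what handleConsistentCutId might do, following the algorithm above, is shown below (single-threaded view; the real entrypoint initializes the cut atomically in the message thread):

Code Block
languagejava
titleConsistentCutManager entrypoint (sketch)
import java.util.UUID;

/** Sketch of the ConsistentCutManager entrypoint, following the algorithm above. */
class ConsistentCutManagerSketch {
    /** Current cut; transaction threads wrap outgoing messages while this field is not null. */
    volatile ConsistentCutSketch cut;

    /** Id of the previously finished cut, or null. */
    volatile UUID lastFinishedCutId;

    void handleConsistentCutId(UUID id) {
        // Skip if this cut has already started or already finished on the local node.
        if (cut != null || id.equals(lastFinishedCutId))
            return;

        cut = new ConsistentCutSketch(id);

        // ROLE#2 (baseline nodes): write ConsistentCutStartRecord, copy active transactions,
        // listen to their finish futures, then write ConsistentCutFinishRecord(before, after).
        cut.startLocalCut();
    }

    /** Called once the local cut future completes. */
    void onLocalCutFinished(UUID id) {
        lastFinishedCutId = id;
        cut = null;                   // stop wrapping outgoing transaction messages
    }

    /** Placeholder for the ConsistentCut future shown above. */
    static class ConsistentCutSketch {
        final UUID id;

        ConsistentCutSketch(UUID id) {
            this.id = id;
        }

        void startLocalCut() {
            // write the start record, collect transactions, write the finish record - omitted in this sketch
        }
    }
}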

...

  1. P: the cut happens before rcv(GridNearPrepareResponse), so the tx isn't PREPARED yet. Then P signs FinishRequest with the new CutVersion.
  2. Q collects this tx at the moment of the cut (as locally it's PREPARED) and waits for FinishRequest. On receiving it, Q checks the message: local CutVer == received txCutVer, so the tx belongs AFTER.


The pictures below describe a slightly more complicated example: P commits the tx AFTER the cut, Q commits the tx BEFORE. But the Consistent Cut algorithm doesn't actually allow such a case, because this cut is inconsistent. The global state of the tx includes the events { Q.snd(GridNearPrepareResponse), Q.rcv(GridNearFinishRequest), Q.tx(COMMITTED) }, but this state misses some middle events { P.rcv(GridNearPrepareResponse), P.tx(COMMITTED), P.snd(GridNearFinishRequest) }.

So this cut is inconsistent. To avoid such cases, FinishRequest is signed with latestCutVer, and node Q must validate it and trigger Consistent Cut before applying the message. After that this case equals the case from the previous pictures.

  1. P: the cut happens before P.rcv(GridNearPrepareResponse), so the tx isn't PREPARED yet. Then P signs FinishRequest with the new CutVersion.
  2. Q: on receiving FinishRequest it triggers Consistent Cut, collects the PREPARED tx at this moment, and after that checks the version from FinishRequest.
  3. The case equals the one on the second picture above. Globally the tx is AFTER the cut.


One-Phase commit handling

The sequence of events for 1PC differs:

P.tx(PREPARING) → P.snd(GridNearPrepareRequest) → Q.rcv(GridNearPrepareRequest) → Q.tx(PREPARED) → Q.tx(COMMITTED) → Q.snd(GridNearPrepareResponse) → P.rcv(GridNearPrepareResponse) → P.tx(PREPARED) → P.tx(COMMITTED)

Important points in this sequence:

  • the backup node commits before the primary node
  • there is a GridNearPrepareResponse between the actual commits

Then for 1PC the backup (or the primary, if backups=0) is responsible for signing the tx with the CutVersion, not the near node. The CutVersion propagates between nodes in reverse order: near ← primary ← backup.
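
A small sketch of this rule is shown below; the role names and the helper are illustrative, not Ignite API:

Code Block
languagejava
titleCutVersionAssigner (sketch)
/** Sketch: which node assigns the cut version (signs the transaction) when it starts committing. */
class CutVersionAssigner {
    enum Role { NEAR, PRIMARY, BACKUP }

    /**
     * For 2PC the near (originating) node commits first and signs the transaction;
     * for 1PC the backup (or the primary, if there are no backups) commits first and signs it.
     */
    static boolean assignsCutVersion(Role role, boolean onePhaseCommit, boolean hasBackups) {
        if (!onePhaseCommit)
            return role == Role.NEAR;

        return hasBackups ? role == Role.BACKUP : role == Role.PRIMARY;
    }
}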

Signing messages

The Ignite transaction protocol includes multiple messages, but only some of them are meaningful for the algorithm - those that change the state of transactions (PREPARED, COMMITTED):

  1. GridNearTxPrepareRequest / GridDhtTxPrepareRequest
  2. GridNearTxPrepareResponse / GridDhtTxPrepareResponse
  3. GridNearTxFinishRequest / GridDhtTxFinishRequest

Also some messages need to be signed with the tx color to check it on the primary/backup node:

  1. GridNearTxFinishRequest / GridDhtTxFinishRequest
  2. GridNearTxPrepareResponse / GridDhtTxPrepareResponse (for 1PC algorithm).

Unstable topology

There are some cases to handle for unstable topology:

  1. A client or non-baseline server node leaves - no need to handle.
  2. A server node leaves:
    1. all nodes finish the locally running ConsistentCut, making it inconsistent.
  3. A server node joins:
    1. all nodes finish the locally running ConsistentCut, making it inconsistent.
    2. the new node checks whether rebalance was required for recovering. If it is required, then handling it is TBD.

TBD: which way to use to avoid inconsistency between data and WAL after rebalance. There are options:

...

  1. During restore, read this record and repeat the historical rebalance at this point; after the rebalance, resume recovery with the existing WALs.
  2. In case the record contains a full rebalance - stop recovering with WAL and fall back to full rebalance.
    ? Is it possible to rebalance only specific cache groups and continue WAL recovery for the others.
    - For historical rebalance during recovery, separate logic is needed for extracting records from WAL archives of other nodes.

WAL records

There are 2 records: `ConsistentCutStartRecord` for the Start event and `ConsistentCutFinishRecord` for the Finish event.

  • ConsistentCutStartRecord: the record is written to WAL at the moment the Consistent Cut starts on the local node. It helps to limit the amount of active transactions to check. But there is no strict guarantee that all transactions belonging to the BEFORE side are physically committed before ConsistentCutStartRecord, and vice versa. This is the reason for having ConsistentCutFinishRecord.
  • ConsistentCutFinishRecord: this record is written to WAL after the Consistent Cut has stopped analyzing transactions and storing them in a particular bucket (BEFORE or AFTER).

...

Code Block
languagejava
titleConsistentCutRecord
/** */
public class ConsistentCutStartRecord extends WALRecord {
	/** Timestamp of starting global Consistent Cut. */
	private final long ts;
}


/** */
public class ConsistentCutFinishRecord extends WALRecord {
    /**
     * Collections of TXs committed BEFORE the ConsistentCut (sent - received).
     */
    private final Set<GridCacheVersion> before;

     /**
     * Collections of TXs committed AFTER the ConsistentCut (exclude).
     */
    private final Set<GridCacheVersion> after;
 }

  1. Ten H. Lai and Tao H. Yang, "On Distributed Snapshots", 29 May 1987.