There are several possible solutions to guarantee transactional consistency:
Of those options, Consistent Cut is preferable. It may require optimizations for recovery, but it looks like we can provide some, for example:
The goal is to find a point on the timeline on every node such that the set of transactions committed before (after) this point is the same on every node in the cluster.
Consider a simple model. In the pictures below there are two lines (P and Q) representing two different Ignite nodes. The nodes exchange transaction messages with each other. For simplicity, assume there is only one transaction state (COMMITTED) and only a single message related to a transaction (one node notifies another about the tx, and the latter commits it immediately after receiving the message).
A transaction can be described as an ordered sequence of events, and the correct order of events is known on every node. In this simple model there are 4 ordered events describing a transaction:
The cut line crosses both lines (P and Q) at points. Transactions tx1, tx2, tx3 and tx4 are on the left side of the cut on both nodes. But tx5 is different, because the cut also crosses a message line: tx5 commits on different nodes in different states (before / after the cut). To make a cut consistent, the global order of transaction events must be preserved - for any transaction, if the BEFORE state contains an event from this order, it must also contain all prior events.
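This ordering rule can be sketched as a tiny check: the BEFORE side of a cut is consistent only if it is a prefix of the known event order. This is a toy illustration of the rule, not Ignite code; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.Set;

/** Toy check of the cut-consistency rule: BEFORE must be a prefix of the known event order. */
public class CutConsistency {
    /**
     * @param order  globally known order of a transaction's events.
     * @param before events that landed on the BEFORE side of the cut.
     * @return true iff every event on the BEFORE side is preceded only by BEFORE events.
     */
    public static boolean isConsistent(List<String> order, Set<String> before) {
        boolean seenAfter = false;

        for (String ev : order) {
            if (before.contains(ev)) {
                if (seenAfter)
                    return false; // A BEFORE event follows an AFTER event: inconsistent cut.
            }
            else
                seenAfter = true;
        }

        return true;
    }
}
```

For the model above, `{ P.tx(COMMITTED), P.snd(m) }` on the BEFORE side is consistent, while `{ Q.rcv(m), Q.tx(COMMITTED) }` alone is not, since it misses the leading events.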
Consider the examples. The first picture below describes the case where the tx5 state is:
...
ConsistentCut is a distributed algorithm that splits the WAL into two global areas - Before and After. It guarantees that every transaction committed Before will also be committed Before on every other node participating in the transaction. It means that Ignite nodes can safely recover themselves to the consistent Before state without any coordination with each other.

The border between the Before and After areas consists of two WAL records - ConsistentCutStartRecord and ConsistentCutFinishRecord. It guarantees that the Before area consists of:

- transactions committed before ConsistentCutStartRecord AND not included into ConsistentCutFinishRecord#after();
- transactions committed between ConsistentCutStartRecord and ConsistentCutFinishRecord AND included into ConsistentCutFinishRecord#before().

On the picture below the Before area consists of the transactions colored yellow, while After is green.
```java
/** */
public class ConsistentCutStartRecord extends WALRecord {
    /** Consistent Cut ID. */
    private final UUID cutId;
}

/** */
public class ConsistentCutFinishRecord extends WALRecord {
    /** Consistent Cut ID. */
    private final UUID cutId;

    /** Collection of transactions committed BEFORE. */
    private final Set<GridCacheVersion> before;

    /** Collection of transactions committed AFTER. */
    private final Set<GridCacheVersion> after;
}
```
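The Before-area rule defined by the two records can be sketched as a recovery-time predicate. This is a simplified illustration, not the actual Ignite code; `UUID` stands in for `GridCacheVersion` and the class name is hypothetical.

```java
import java.util.Set;
import java.util.UUID;

/** Simplified sketch of the BEFORE-area rule defined by the two WAL records. */
public class BeforeArea {
    /**
     * @param txVer            version of the committed transaction.
     * @param committedBetween true if the commit record lies between the Start and Finish records,
     *                         false if it lies before the Start record.
     * @param before           contents of ConsistentCutFinishRecord#before().
     * @param after            contents of ConsistentCutFinishRecord#after().
     * @return true iff the transaction belongs to the Before area.
     */
    public static boolean inBeforeArea(UUID txVer, boolean committedBetween, Set<UUID> before, Set<UUID> after) {
        return committedBetween ? before.contains(txVer) : !after.contains(txVer);
    }
}
```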
The picture below illustrates the steps of the algorithm on a single node:

1. Initially, lastFinishedCutId holds the previous ConsistentCutId, or null.
2. The user command creates a SnapshotOperationRequest that holds a new ConsistentCutId (the goal is to notify every node in the cluster about the running incremental snapshot).
3. The new id reaches every node in two ways:
   - SnapshotOperationRequest#ConsistentCutId by DiscoverySPI (by the DistributedProcess);
   - ConsistentCutAwareMessage#ConsistentCutId by CommunicationSPI (by transaction messages - Prepare, Finish).
4. When a node receives a new ConsistentCutId, it starts a local ConsistentCut:
   - checks whether a cut has already started (ConsistentCut != null) or finished (lastFinishedCutId == id) for this id, and skips it if it has;
   - compares ConsistentCutAwareMessage#topVer with the local node order;
   - creates a new ConsistentCut future.
5. While ConsistentCut != null, the node wraps outgoing messages into ConsistentCutAwareMessage. It contains:
   - ConsistentCutId (to start ConsistentCut on the remote node, if not yet);
   - txCutId: if it equals null, the transaction starts committing Before the Consistent Cut started, otherwise After.
6. A node receiving a ConsistentCutAwareMessage that makes a transaction committed (FinishRequest for 2PC, PrepareResponse for 1PC) sets tx#txCutId = message#txCutId.
7. The local ConsistentCut:
   - prepares the removedActiveTxs collection (unlike IgniteTxManager#activeTx, this collection doesn't remove transactions);
   - writes ConsistentCutStartRecord to WAL with the received ConsistentCutId;
   - collects transactions from IgniteTxManager#activeTx and sets listeners on their tx#finishFuture, excluding transactions with tx#status == ACTIVE (it's guaranteed that such transactions belong to the After side);
   - collects transactions from removedActiveTxs (it contains transactions that might already be cleaned from IgniteTxManager#activeTx) and sets listeners on their tx#finishFuture;
   - sets removedActiveTxs to null. We don't care about txs concurrently added to removedActiveTxs: they just don't land into the "before" or "after" set and will be excluded from recovery.
8. While ConsistentCut != null and removedActiveTxs != null, IgniteTxManager adds a transaction to removedActiveTxs right before it is removed from IgniteTxManager#activeTx.
9. When a listened transaction finishes: if tx#txCutId equals the local cut id, the transaction is put into after, otherwise into before.
10. After all listened transactions have finished, the node writes ConsistentCutFinishRecord into WAL with the collections (before, after) and completes the ConsistentCut future.
11. After completion, lastFinishedCutId is updated and the ConsistentCut future becomes null.

Consistent Cut
The global state before the cut contains the events { Q.rcv(m), Q.tx(COMMITTED) }, which aren't the leading events in the known sequence - it misses the first events { P.tx(COMMITTED), P.snd(m) }. Since the order of events must match the order of the consistent states (BEFORE, AFTER), such a cut is inconsistent.
The second picture below describes the case where the tx5 state is:
The global state before the cut contains the events { P.tx(COMMITTED), P.snd(m) }, which are the leading events in the known sequence. Such a cut is consistent.
In case of a consistent cut, every node knows which side of the cut a local transaction belongs to (BEFORE or AFTER). In the second picture, node Q after the rcv event knows whether tx5 was included into the cut on node P. If it was, node Q includes it too; otherwise it doesn't.
There is a paper [1] that proposes an algorithm for implementing a distributed consistent cut. The algorithm works without a coordinator of the procedure and is suitable for non-FIFO channels between processes.
The algorithm's purpose is to identify the wrong order of messages by signing messages with the current color (red or white). Then every node knows which side of the cut a transaction belongs to.
The algorithm's steps:
For the Ignite implementation it's proposed to use a single node to coordinate the algorithm. The user starts a command for creating a new ConsistentCut on a single node, and this node becomes responsible for coordinating the Consistent Cut iteration:
Consistent Cut, in terms of the Ignite implementation, is a cut that correctly finished on all baseline nodes - both ConsistentCutStartRecord and ConsistentCutFinishRecord are written.
An "inconsistent" cut is a cut for which one or more baseline nodes haven't written ConsistentCutFinishRecord. It's possible in the following cases:
(tx.finalizationStatus == RECOVERY_FINISH).

Every Ignite node tracks the current ConsistentCutVersion:
```java
class ConsistentCutVersion {
    long version;
}
```
`version` is a simple counter. It is guaranteed to grow monotonically, because it is incremented by discovery communication.
ConsistentCutVersion can be initialized as follows.
For changed server topology:
- a node checks MetaStorage for the pre-stored latest ConsistentCutVersion; the default is 0;
- the version is included into JoiningNodeDiscoveryData and sent to the coordinator.
A client node on start-up just sets its version to 0. It is automatically updated with the cluster version on receiving the next GridNearTxPrepareResponse message.
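The document doesn't spell out how the stored and cluster versions are reconciled on join. One plausible reading (an assumption on my part, not stated in the source) is that a joining node adopts the maximum of its MetaStorage value and the cluster's version, which preserves monotonicity:

```java
/** ASSUMPTION: sketch of joining-node version init as max(stored, cluster); not confirmed by the source. */
public class CutVersionInit {
    /**
     * @param stored     version pre-stored in MetaStorage, or null if absent (defaults to 0).
     * @param clusterVer current cluster-wide ConsistentCutVersion.
     */
    public static long initVersion(Long stored, long clusterVer) {
        long local = (stored == null) ? 0L : stored; // MetaStorage default is 0.

        return Math.max(local, clusterVer); // Never move the counter backwards.
    }
}
```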
After restore (PITR) finishes, the version that was used for restoring is set after reading the WAL archives.
In the 2PC protocol, in most cases (except some optimizations and corner cases), the sequence of events for a transaction can be described as a set of messages (PrepareRequest, PrepareResponse, FinishRequest, FinishResponse) and a set of TransactionStates. For the case with 2 nodes (P - near, Q - primary) it looks like this:
P.tx(PREPARING) → P.snd(GridNearPrepareRequest) → Q.rcv(GridNearPrepareRequest) → Q.tx(PREPARED) → Q.snd(GridNearPrepareResponse) → P.rcv(GridNearPrepareResponse) → P.tx(PREPARED) → P.tx(COMMITTED) → P.snd(GridNearFinishRequest) → Q.rcv(GridNearFinishRequest) → Q.tx(COMMITTED)
Important steps in this sequence:
The Consistent Cut procedure can start at any moment and cross this sequence at any link. At that moment every node should decide which side of the ConsistentCut every transaction belongs to. Some important notes:
Consider some examples below (P - near, Q - primary):
...
The pictures below describe a slightly more complicated example: P commits the tx AFTER, Q commits the tx BEFORE. The Consistent Cut algorithm doesn't actually allow such a case, because this cut is inconsistent. The global state of the tx includes the events { Q.snd(GridNearPrepareResponse), Q.rcv(GridNearFinishRequest), Q.tx(COMMITTED) }, but it misses the middle events { P.rcv(GridNearPrepareResponse), P.tx(COMMITTED), P.snd(GridNearFinishRequest) }.
Thus this cut is inconsistent. To avoid such cases, FinishRequest is signed with latestCutVer, and node Q must validate it and trigger the Consistent Cut before applying the message. After that, this case is equivalent to the case from the previous pictures.
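The "validate and trigger before applying" step relies on starting a local cut at most once per id. A minimal sketch of that idempotence check, assuming hypothetical field names (runningCutId, lastFinishedCutId mirror the ids tracked in the algorithm, but this is not the Ignite code):

```java
import java.util.UUID;

/** Sketch: start a local ConsistentCut at most once per id, as on FinishRequest validation. */
public class CutIdHandler {
    private UUID runningCutId;       // Non-null while a local ConsistentCut is in progress.
    private UUID lastFinishedCutId;  // Id of the last completed cut.

    /** @return true if a new local ConsistentCut was started for this id. */
    public synchronized boolean handleConsistentCutId(UUID id) {
        if (id.equals(runningCutId) || id.equals(lastFinishedCutId))
            return false; // Already running or already finished for this id: skip.

        runningCutId = id;
        return true;
    }
}
```

With this check, a node that first learns the id from a transaction message (rather than from discovery) still starts exactly one local cut.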
The sequence of events for 1PC differs:
P.tx(PREPARING) → P.snd(GridNearPrepareRequest) → Q.rcv(GridNearPrepareRequest) → Q.tx(PREPARED) → Q.tx(COMMITTED) → Q.snd(GridNearPrepareResponse) → P.rcv(GridNearPrepareResponse) → P.tx(PREPARED) → P.tx(COMMITTED)
Important points in this sequence:
Then for 1PC, the backup (or the primary, if backups=0) is responsible for signing the tx with the CutVersion, not the near node. The CutVersion propagates between nodes in reverse order: near ← primary ← backup.
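The difference between 2PC and 1PC in who marks the transaction can be summarized in a tiny helper. This is illustrative only; the function and its string results are not Ignite APIs.

```java
/** Sketch: which node signs the transaction with the cut id, per the 2PC/1PC rules above. */
public class CutMarker {
    /**
     * @param onePhaseCommit true for the 1PC protocol, false for 2PC.
     * @param hasBackups     true if the cache has backups configured.
     */
    public static String markingNode(boolean onePhaseCommit, boolean hasBackups) {
        if (!onePhaseCommit)
            return "near"; // 2PC: the near node's FinishRequest carries txCutId.

        // 1PC: propagated in reverse order near <- primary <- backup.
        return hasBackups ? "backup" : "primary";
    }
}
```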
The Ignite transaction protocol includes multiple messages, but only some of them are meaningful for the algorithm - those that change the state of transactions (PREPARED, COMMITTED):
GridNearTxPrepareRequest / GridDhtTxPrepareRequest
GridNearTxPrepareResponse / GridDhtTxPrepareResponse
GridNearTxFinishRequest / GridDhtTxFinishRequest
These messages are wrapped into ConsistentCutAwareMessage, which is prepared right before sending the message to another node. They use the current ConsistentCutId.
Also, some messages must additionally be signed with the transaction's ConsistentCutId, so it can be checked on the primary/backup node:
GridNearTxFinishRequest / GridDhtTxFinishRequest
GridNearTxPrepareResponse / GridDhtTxPrepareResponse (for the 1PC algorithm).

On receiving a message with a new CutVersion, a node sets it and commits the LocalState and ChannelState - to identify the wrong order of events.
This logic is implemented in ConsistentCutManager.

There are some cases to handle for unstable topology:
TBD: which approaches to use to avoid inconsistency between data and WAL after rebalance. There are options:
...
There are 2 records: `ConsistentCutStartRecord` for the Start event and `ConsistentCutFinishRecord` for the Finish event.
...
These messages are filled with a txCutId that is prepared right before the transaction starts committing on the first committing node. The current ConsistentCutId is used for this setting: if the current ConsistentCutId is not null, the transaction starts committing after the ConsistentCut started, which means this transaction belongs to the After side.
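The resulting BEFORE/AFTER decision on the receiving node compares the local cut id with the txCutId carried by the message. A minimal sketch (hypothetical class, not Ignite code):

```java
import java.util.UUID;

/** Sketch of the BEFORE/AFTER decision: compare the local cut id with the tx's txCutId. */
public class SideDecision {
    /**
     * @param localCutId id of the ConsistentCut running on this node.
     * @param txCutId    cut id carried by the commit-triggering message;
     *                   null if the tx started committing before any cut.
     */
    public static String side(UUID localCutId, UUID txCutId) {
        // Equal ids: the tx started committing after this very cut; otherwise it lands BEFORE.
        return localCutId.equals(txCutId) ? "AFTER" : "BEFORE";
    }
}
```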
```java
class ConsistentCutAwareMessage {
    /** Original transaction message. */
    Message msg;

    /** Consistent Cut ID. */
    UUID cutId;

    /** Consistent Cut ID after which transaction committed. */
    @Nullable UUID txCutId;

    /** Cluster topology version on which Consistent Cut started. */
    long topVer;
}
```
A new method is added to IgniteInternalTx:
```java
class IgniteInternalTx {
    /**
     * @param id ID of {@link ConsistentCut} AFTER which this transaction was committed,
     *           {@code null} if the transaction committed BEFORE.
     */
    public void cutId(@Nullable UUID id);
}
```
```java
// Class is responsible for managing all stuff related to Consistent Cut.
// It's an entrypoint for transaction threads to check a running consistent cut.
class ConsistentCutManager extends GridCacheSharedManagerAdapter {
    // Current Consistent Cut. All transaction threads wrap outgoing messages while this field is not null.
    volatile @Nullable ConsistentCut cut;

    // Entrypoint for handling a received new Consistent Cut ID.
    void handleConsistentCutId(UUID id);
}
```
```java
class ConsistentCut extends GridFutureAdapter<WALPointer> {
    // Transactions committed BEFORE the cut.
    Set<GridCacheVersion> beforeCut;

    // Transactions committed AFTER the cut.
    Set<GridCacheVersion> afterCut;

    // Committing transactions removed from IgniteTxManager#activeTx while the cut was running.
    Set<IgniteInternalFuture<IgniteInternalTx>> removedActive;
}
```