Definitions
ConsistentCut is a distributed snapshot algorithm defined in paper [1]. It uses the following definitions, described here in terms of Ignite:
- LocalState - set of committed transactions prior to a moment.
- In Ignite LocalState is committed by writing ConsistentCutStartRecord into WAL.
- Message - a transaction message. It is sent after LocalState changes on the sending node, and it changes LocalState on the receiving node once delivered.
- In Ignite such messages are FinishRequest for 2PC, PrepareResponse for 1PC.
- Channel - a unidirectional connection between two Ignite nodes over which messages are sent (Communication SPI).
- ChannelState - for a single channel, the set of messages that were sent (changed LocalState of the sending node) but not received yet (haven't changed LocalState of the receiving node).
- In Ignite the incoming ChannelState can be thought of as the active transactions in PREPARING+ state (the Message might have been sent by other nodes, but isn't received yet).
- GlobalSnapshot - for every node, the set consisting of its LocalState and the ChannelState of every incoming channel.
- In Ignite the snapshot is represented by 2 WAL records (ConsistentCutStartRecord commits the LocalState, ConsistentCutFinishRecord describes the ChannelState).
- Marker - a mark that piggybacks on the Message and notifies a node about a running snapshot.
- In Ignite ConsistentCutMarkerMessage is used.
In terms of Ignite, there are additional definitions:
- Consistent Cut - a successful attempt to create a GlobalSnapshot.
- Inconsistent Cut - a failed attempt to create a GlobalSnapshot, due to the inability to correctly describe a ChannelState.
Algorithm
...
algorithm that splits WAL into 2 global areas - Before and After. It guarantees that every transaction committed Before will also be committed Before on every other node that participated in the transaction. It means that Ignite nodes can safely recover themselves to the consistent Before state without any coordination with each other.
The border between the Before and After areas consists of two WAL records - ConsistentCutStartRecord and ConsistentCutFinishRecord. It guarantees that Before consists of:
- Transactions committed before ConsistentCutStartRecord AND not included in ConsistentCutFinishRecord#after().
- Transactions committed between ConsistentCutStartRecord and ConsistentCutFinishRecord AND included in ConsistentCutFinishRecord#before().
In the picture below, the Before area consists of the transactions colored yellow, while After is green.
Image Added
Code Block |
---|
language | java |
---|
title | ConsistentCutRecord |
---|
|
/** WAL record written when a Consistent Cut starts on the local node. */
public class ConsistentCutStartRecord extends WALRecord {
    /** Consistent Cut ID. */
    private final UUID cutId;
}

/** WAL record written after the local Consistent Cut has stopped analyzing transactions. */
public class ConsistentCutFinishRecord extends WALRecord {
    /** Consistent Cut ID. */
    private final UUID cutId;

    /** Collection of transactions committed BEFORE. */
    private final Set<GridCacheVersion> before;

    /** Collection of transactions committed AFTER. */
    private final Set<GridCacheVersion> after;
}
|
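The rule for the Before area can be sketched as a small predicate. This is a minimal illustration under stated assumptions, not Ignite code: transaction IDs are plain strings instead of GridCacheVersion, and the class `BeforeAreaSketch`, the enum `CommitPosition` and the method `isBefore` are hypothetical names introduced only for this example.

```java
import java.util.Set;

/** Hypothetical sketch: decides whether a committed transaction belongs to the Before area. */
final class BeforeAreaSketch {
    /** Position of a transaction's commit in WAL relative to the two cut records. */
    enum CommitPosition { BEFORE_START, BETWEEN_START_AND_FINISH, AFTER_FINISH }

    /**
     * @param tx Transaction ID (a String stands in for GridCacheVersion).
     * @param pos Where the commit was found in WAL.
     * @param before Content of ConsistentCutFinishRecord#before().
     * @param after Content of ConsistentCutFinishRecord#after().
     * @return True if the transaction belongs to the Before area.
     */
    static boolean isBefore(String tx, CommitPosition pos, Set<String> before, Set<String> after) {
        switch (pos) {
            case BEFORE_START:
                // Committed before the start record, unless explicitly listed as "after".
                return !after.contains(tx);

            case BETWEEN_START_AND_FINISH:
                // Committed between the records, only if explicitly listed as "before".
                return before.contains(tx);

            default:
                // Anything committed after the finish record is not part of Before.
                return false;
        }
    }
}
```

A recovery procedure could apply such a predicate to every commit it reads from WAL and skip the transactions for which it returns false.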
Algorithm
The picture below illustrates the steps of the algorithm on a single node:
Image Added
Image Added
...
- Initial state:
- No concurrent ConsistentCut process is running.
- lastFinishedCutId holds the previous ConsistentCutId, or null.
- User starts a command for creating new incremental snapshot
...
- :
- Initial state:
- The Ignite WAL is in a consistent state relative to the previous full or incremental snapshot.
- Every Ignite node has a local ConsistentCut future equal to null (node is WHITE).
- Empty collection committingTxs (Set<GridCacheVersion>) whose goal is to track COMMITTING+ transactions that aren't part of IgniteTxManager#activeTx.
- Ignite node initiates a GlobalSnapshot by starting a DistributedProcess (by discovery IO):
- creates a new ConsistentCutMarker.
- prepares a marker message that contains the marker and transmits this message to other nodes.
- Ignite node initiates a DistributedProcess with the discovery message SnapshotOperationRequest that holds the new ConsistentCutId (the goal is to notify every node in the cluster about the running incremental snapshot).
- DistributedProcess stores the topology version topVer on which ConsistentCut started.
- The process of creating a consistent cut can be started by either of two events (whichever happens earlier):
- Receiving the SnapshotOperationRequest#ConsistentCutId by DiscoverySPI (by the DistributedProcess).
- Receiving the ConsistentCutAwareMessage#ConsistentCutId by CommunicationSPI (by transaction messages - Prepare, Finish).
- On receiving the ConsistentCutId, the node starts a local ConsistentCut:
- There are 2 roles that a node might play:
- ROLE#1 - wraps outgoing messages - for all Ignite nodes: client, baseline, non-baseline server nodes.
- ROLE#2 - prepares data to be written to WAL - for baseline nodes only.
- Before start, check:
- Whether ConsistentCut has already started (ConsistentCut != null) or finished (lastFinishedCutId == id) for this id; skip if it has.
- On non-baseline nodes, in case ConsistentCut is initiated by CommunicationSPI, compare the ConsistentCutAwareMessage#topVer with the local node order:
- The local node order equals the new topVer at the moment when the node joined the cluster.
- If the order is higher than the ConsistentCut topVer, the node joined after ConsistentCut started. Skip starting ConsistentCut on this node.
- ROLE#1:
- creates a new ConsistentCut future.
- If the node is non-baseline (client, non-baseline servers), completes it right after creation and notifies the node-initiator that the local procedure has finished (by DistributedProcess protocol).
- While ConsistentCut != null, wraps outgoing messages into ConsistentCutAwareMessage. It contains:
- ConsistentCutId (to start ConsistentCut on the remote node, if not yet started).
- An additional field txCutId. It is originally set on the node that commits first:
- For 2PC it is the originating node.
- For 1PC it is a backup node.
- If txCutId equals null, the transaction started committing Before the Consistent Cut started, otherwise After.
- On receiving a ConsistentCutAwareMessage that makes the transaction committed (FinishRequest for 2PC, PrepareResponse for 1PC), sets tx#txCutId = message#txCutId.
- Every node starts a local snapshot process after receiving the marker message (whether by discovery, or by communication with a transaction message):
- Atomically: creates a new ConsistentCut future (node becomes RED), creates committingTxs, starts signing outgoing messages with the ConsistentCutMarker.
- Writes a snapshot record to WAL with the received ConsistentCutMarker (commits LocalState).
- Collects active transactions - the concatenation of IgniteTxManager#activeTx and committingTxs.
- Prepares 2 empty collections - before and after cut (they describe ChannelState).
- While the global Consistent Cut is running, every node signs outgoing transaction messages:
- Prepare messages are signed with the ConsistentCutMarker (to trigger ConsistentCut on the remote node, if not yet started).
- Finish messages are signed with the ConsistentCutMarker (to trigger...) and the transaction ConsistentCutMarker (to notify nodes which side of the cut this transaction belongs to).
- Finish messages are signed on the node that commits first (near node for 2PC, backup or primary for 1PC).
- For every collected active transaction, the node waits for the Finish message to extract the ConsistentCutMarker and fills the before and after collections:
- if the received marker is null or differs from the local one, the transaction is on the before side
- if the received color equals the local one, the transaction is on the after side
- ROLE#2 - for baseline nodes only:
- In the message thread, atomically initiates ConsistentCut:
- creates a new ConsistentCut future.
- creates an empty collection removedActiveTxs (this collection doesn't remove transactions, unlike IgniteTxManager#activeTx).
- In the background thread:
- Writes a ConsistentCutStartRecord to WAL with the received ConsistentCutId.
- Creates a (weakly consistent) copy of IgniteTxManager#activeTx. Sets listeners on those tx#finishFuture.
- As an optimization, it is safe to exclude transactions with tx#status == ACTIVE. It is guaranteed that such transactions belong to the After side.
- Creates a copy of removedActiveTxs (contains transactions that might have been cleaned from IgniteTxManager#activeTx). Sets listeners on those tx#finishFuture.
- Sets removedActiveTxs to null. Transactions concurrently added to removedActiveTxs don't matter: they don't land in the "before" or "after" set and will be excluded from recovery.
- In transaction threads, fills removedActiveTxs if ConsistentCut != null and removedActiveTxs != null:
- Every transaction is added to removedActiveTxs right before it is removed from IgniteTxManager#activeTx.
- For every transaction being listened to, the callback is called when the transaction finishes:
- If the transaction state is UNKNOWN or the status is RECOVERY_FINISH, complete ConsistentCut with an exception.
- If the transaction is mapped to a higher topology version than the ConsistentCut topVer, put it into after.
- If tx#txCutId equals the local one, put the transaction into after, otherwise put it into before.
- After every transaction being listened to has finished:
- Writes a ConsistentCutFinishRecord into WAL with the collections (before, after) that describe the ChannelState.
- Stops filling committingTxs.
- Completes the ConsistentCut future.
- Notifies the node-initiator that the local procedure has finished (by DistributedProcess protocol).
- After all nodes have finished ConsistentCut, on every node:
- Updates lastFinishedCutId with the current id.
- ConsistentCut future becomes null (node is WHITE again).
- Stops signing outgoing transaction messages.
- Node initiator checks that every node completed correctly.
- If any node completed exceptionally, completes the Incremental Snapshot with an exception.
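Two of the local decisions described in the steps above, the "before start" check and the callback that sorts a finished transaction into before or after, can be sketched as follows. This is a simplified illustration, not the Ignite implementation: the class `LocalCutSketch`, its fields and its method signatures are hypothetical, transaction IDs are plain strings, and threading is ignored.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

/** Hypothetical sketch of two local decisions of the algorithm; not Ignite code. */
final class LocalCutSketch {
    /** ID of the running cut, null if none (stands in for "ConsistentCut != null"). */
    UUID runningCutId;

    /** ID of the last finished cut, or null. */
    UUID lastFinishedCutId;

    /** Topology version on which the running cut started. */
    long cutTopVer;

    final Set<String> before = new HashSet<>();
    final Set<String> after = new HashSet<>();

    /** "Before start" check: returns true if a local cut should start for the given ID. */
    boolean shouldStart(UUID id, boolean baseline, boolean byCommunication, long nodeOrder, long msgTopVer) {
        if (runningCutId != null || id.equals(lastFinishedCutId))
            return false; // Already started or already finished.

        // A non-baseline node that joined after the cut started skips it.
        if (!baseline && byCommunication && nodeOrder > msgTopVer)
            return false;

        return true;
    }

    /** Callback for a finished transaction; expects a running cut (runningCutId != null). */
    void onTxFinished(String tx, boolean unknownOrRecovered, long txTopVer, UUID txCutId) {
        if (unknownOrRecovered)
            throw new IllegalStateException("Cut is inconsistent because of transaction " + tx);

        // A null txCutId never equals the running cut ID, so such a transaction lands in "before".
        if (txTopVer > cutTopVer || runningCutId.equals(txCutId))
            after.add(tx);
        else
            before.add(tx);
    }
}
```

In this sketch an exception stands in for completing the ConsistentCut future exceptionally, which is what makes the cut inconsistent.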
Consistent and inconsistent Cuts
Consistent Cut, in terms of the Ignite implementation, is a cut that correctly finished on all baseline nodes - ConsistentCutStartRecord and ConsistentCutFinishRecord are written.
"Inconsistent" Cut is a cut where one or more baseline nodes haven't written ConsistentCutFinishRecord. It's possible in the following cases:
- any error appeared while processing the local Cut.
- a transaction is recovered with the transaction recovery protocol (tx.finalizationStatus == RECOVERY_FINISH).
- a transaction finished in UNKNOWN state.
- the baseline topology changed: Ignite nodes finish the local Cuts running at this moment, making them inconsistent.
ConsistentCutMarker
Every Ignite node tracks the current ConsistentCutMarker:
Code Block |
---|
language | java |
---|
title | ConsistentCutVersion |
---|
|
class ConsistentCutMarker {
UUID id;
} |
`id` is just a unique ConsistentCut ID (assigned on the node initiator).
Wrapping
...
messages
The Ignite transaction protocol includes multiple messages, but only some of them are meaningful for the algorithm: those that change the state of transactions (PREPARED, COMMITTED):
- GridNearTxPrepareRequest / GridDhtTxPrepareRequest
- GridNearTxPrepareResponse / GridDhtTxPrepareResponse
- GridNearTxFinishRequest / GridDhtTxFinishRequest
Those messages are wrapped in ConsistentCutAwareMessage, which is prepared right before the message is sent to another node. They use the current ConsistentCutId.
Some messages must also be combined with an additional ConsistentCutId (the tx color) so that it can be checked on the primary/backup node:
- GridNearTxFinishRequest / GridDhtTxFinishRequest
- GridNearTxPrepareResponse / GridDhtTxPrepareResponse (for 1PC algorithm).
WAL records
There are 2 records: `ConsistentCutStartRecord` for Start event and `ConsistentCutFinishRecord` for Finish event.
- ConsistentCutStartRecord: written to WAL at the moment when the Consistent Cut starts on a local node. It helps to limit the number of active transactions to check. But there is no strict guarantee that all transactions belonging to the BEFORE side are physically committed before ConsistentCutStartRecord, and vice versa. This is the reason for having ConsistentCutFinishRecord.
- ConsistentCutFinishRecord: written to WAL after the Consistent Cut has stopped analyzing transactions and storing them in a particular bucket (BEFORE or AFTER).
...
Those messages are filled with txCutId, which is prepared right before the transaction starts committing on the first committing node. The current ConsistentCutId is used for this. If the current ConsistentCutId is not null, the transaction starts committing after ConsistentCut started, which means that this transaction belongs to the After side.
Code Block |
---|
language | java |
---|
title | ConsistentCutAwareMessage |
---|
|
class ConsistentCutAwareMessage {
    /** Original transaction message. */
    Message msg;

    /** Consistent Cut ID. */
    UUID cutId;

    /** Consistent Cut ID after which transaction committed. */
    @Nullable UUID txCutId;

    /** Cluster topology version on which Consistent Cut started. */
    long topVer;
} |
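How txCutId is fixed at commit start and then travels inside the wrapper can be sketched as below. This is a minimal sketch under assumptions, not Ignite code: `WrapSketch`, `AwareMessage`, `Tx`, `onCommitStart` and `wrap` are hypothetical names, the payload is a plain Object, and the flag `makesTxCommitted` stands for "FinishRequest for 2PC, PrepareResponse for 1PC".

```java
import java.util.UUID;

/** Hypothetical sketch of wrapping outgoing transaction messages; not Ignite code. */
final class WrapSketch {
    /** Simplified stand-in for ConsistentCutAwareMessage (payload is a plain Object). */
    static final class AwareMessage {
        final Object msg;
        final UUID cutId;
        final UUID txCutId;
        final long topVer;

        AwareMessage(Object msg, UUID cutId, UUID txCutId, long topVer) {
            this.msg = msg;
            this.cutId = cutId;
            this.txCutId = txCutId;
            this.topVer = topVer;
        }
    }

    /** Simplified transaction that keeps only the field relevant to the cut. */
    static final class Tx {
        UUID txCutId;
    }

    /** Called on the first committing node right before the transaction starts committing. */
    static void onCommitStart(Tx tx, UUID currentCutId) {
        // null means the commit started before the cut, so the transaction is on the Before side.
        tx.txCutId = currentCutId;
    }

    /** Wraps the message while a cut is running; only commit-making messages carry txCutId. */
    static Object wrap(Object msg, Tx tx, UUID currentCutId, long topVer, boolean makesTxCommitted) {
        if (currentCutId == null)
            return msg; // No running cut: the message is sent as is.

        return new AwareMessage(msg, currentCutId, makesTxCommitted ? tx.txCutId : null, topVer);
    }
}
```

A transaction that started committing before the cut keeps txCutId == null even when its Finish message is sent after the cut has started, which is what lets the receiving node place it on the Before side.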
Transaction
A new field is added to IgniteInternalTx:
Code Block |
---|
language | java |
---|
title | IgniteInternalTx |
---|
|
class IgniteInternalTx {
    /**
     * @param id ID of {@link ConsistentCut} AFTER which this transaction was committed, {@code null} if transaction
     * committed BEFORE.
     */
    public void cutId(@Nullable UUID id);
}
|
Unstable topology
There are some cases to handle for unstable topology:
- Client or non-baseline server node leaves – no need to handle.
- Server node leaves:
- all nodes finish the locally running ConsistentCut, making it inconsistent
- Server node joins:
- all nodes finish the locally running ConsistentCut, making it inconsistent
- the new node checks whether rebalance was required for recovery. If it is required, then handle it TBD
TBD: Which approach to use to avoid inconsistency between data and WAL after rebalance. There are options:
...
- During restore, read this record and repeat the historical rebalance at this point; after the rebalance, resume recovery with the existing WALs.
- In case the Record contains a full rebalance - stop recovering with WAL and fall back to full rebalance.
? Is it possible to rebalance only specific cache groups, and continue WAL recovery for others.
- For historical rebalance during recovery, separate logic is needed for extracting records from WAL archives of other nodes.
Choosing algorithm for transactional consistency
There are some possible solutions to guarantee transactional consistency:
- Consistent Cut
+ Simple algorithm - requires only some read-write locks to sync threads, and doesn't affect performance much.
+ Doesn't require much additional space - it just writes a few additional messages to WAL.
- Requires time for recovery (applying every message from WAL to the system), which depends on how many operations need to be restored.
- Requires additional storage to persist WAL archives.
- Doesn't restore ATOMIC caches.
- Incremental physical snapshots (collection of partition binary files changed since the previous snapshot, can be implemented as delta or as full copy).
- High disk IO usage for preparing snapshots.
- Current implementation of snapshots requires PME, which affects performance much.
+ Fast recovery that doesn't depend on the number of operations to restore (WAL-free).
- MVCC
- Ignite failed to support MVCC, very hard to implement.
Of those options, Consistent Cut is preferable. It may require optimizations for recovery, but it looks like we can provide some. For example, some options:
...
public void cutId(@Nullable UUID id);
} |
Consistent Cut Classes
Code Block |
---|
language | java |
---|
title | ConsistentCutManager |
---|
|
// Responsible for managing everything related to Consistent Cut. It's an entry point for transaction threads to check for a running Consistent Cut.
class ConsistentCutManager extends GridCacheSharedManagerAdapter {
    // Current Consistent Cut. All transaction threads wrap outgoing messages if this field is not null.
    volatile @Nullable ConsistentCut cut;

    // Entry point for handling a newly received Consistent Cut ID.
    void handleConsistentCutId(UUID id);
} |
Code Block |
---|
language | java |
---|
title | ConsistentCut |
---|
|
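class ConsistentCut extends GridFutureAdapter<WALPointer> {
    // Transactions committed BEFORE the cut.
    Set<GridCacheVersion> beforeCut;
    // Transactions committed AFTER the cut.
    Set<GridCacheVersion> afterCut;
    // Transactions removed from IgniteTxManager#activeTx (removedActiveTxs in the algorithm description).
    Set<IgniteInternalFuture<IgniteInternalTx>> removedActive;
} |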
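The role of the removedActive collection from the algorithm description can be illustrated with a simplified sketch. It is not the Ignite implementation: the class `RemovedActiveSketch` and its methods are hypothetical, transactions are plain strings instead of futures, and the collections are simplified stand-ins for IgniteTxManager#activeTx and removedActiveTxs.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of tracking transactions removed from the active set; not Ignite code. */
final class RemovedActiveSketch {
    /** Stands in for IgniteTxManager#activeTx. */
    final Set<String> activeTx = ConcurrentHashMap.newKeySet();

    /** Non-null only between the cut initialization and the collection of transactions. */
    volatile Set<String> removedActiveTxs;

    /** Message thread: called when the local cut is initialized. */
    void startTracking() {
        removedActiveTxs = ConcurrentHashMap.newKeySet();
    }

    /** Transaction thread: remembers the transaction right before it leaves the active set. */
    void removeFromActive(String tx) {
        Set<String> removed = removedActiveTxs;

        if (removed != null)
            removed.add(tx);

        activeTx.remove(tx);
    }

    /** Background thread: collects the transactions to listen to, then stops tracking. */
    Set<String> collect() {
        // Weakly consistent copy of the active set.
        Set<String> res = new HashSet<>(activeTx);

        // Transactions that might have left the active set before the copy was taken.
        Set<String> removed = removedActiveTxs;

        if (removed != null)
            res.addAll(removed);

        removedActiveTxs = null;

        return res;
    }
}
```

Because a transaction is recorded before it is removed from the active set, a transaction that finishes between the cut initialization and the copy still ends up in the collected set.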
Links
- ON DISTRIBUTED SNAPSHOTS, Ten H. LAI and Tao H. YANG, 29 May 1987