
Choosing algorithm for transactional consistency

There are some possible solutions to guarantee transactional consistency:

  1. Consistent Cut
    + Simple algorithm - requires only some read-write locks to sync threads, and doesn't affect performance much.
    + Doesn't require too much additional space - it just writes a few additional messages to WAL.
    - Requires recovery time (applying every record from WAL to the system) that depends on how many operations need to be restored.
    - Requires additional storage to persist WAL archives.
    - Doesn't restore ATOMIC caches.
  2. Incremental physical snapshots (collection of partition binary files changed since previous snapshot, can be implemented as delta or as full copy).
    - High disk IO usage for preparing snapshots.
    - Current implementation of snapshots requires PME, which significantly affects performance.
    + Fast recovery that doesn't depend on the number of operations to restore (WAL-free).
  3. MVCC
    - Ignite failed to support MVCC; it is very hard to implement.

From those options Consistent Cut is preferable. It may require optimizations for recovery, but it looks like we can provide some. For example:

  1. Use WAL compaction for archived files, which excludes physical records from WAL files;
  2. Apply DataEntry records from WAL in parallel using a striped executor keyed by (cache group id, partition id) - see the sketch below;
  3. Use an index over WAL files for fast access to written Consistent Cuts.
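
To illustrate option 2, here is a minimal, hypothetical sketch (not the actual Ignite recovery code): DataEntry records restored from WAL are dispatched to stripes keyed by (cache group id, partition id), so entries of the same partition are applied sequentially in WAL order, while different partitions proceed in parallel. The DataEntry record and its applyClo closure are illustration-only stand-ins.

Code Block
languagejava
titleParallel WAL apply (sketch)
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StripedWalApplier {
    /** Illustration-only stand-in for a logical DataEntry restored from WAL. */
    public record DataEntry(int cacheGrpId, int partId, Runnable applyClo) {}

    private final ExecutorService[] stripes;

    public StripedWalApplier(int stripeCnt) {
        stripes = new ExecutorService[stripeCnt];

        for (int i = 0; i < stripeCnt; i++)
            stripes[i] = Executors.newSingleThreadExecutor();
    }

    /** Applies entries in parallel, preserving per-partition order (same partition -> same stripe). */
    public void apply(List<DataEntry> entries) {
        for (DataEntry e : entries) {
            int idx = Math.floorMod(31 * e.cacheGrpId() + e.partId(), stripes.length);

            stripes[idx].submit(e.applyClo());
        }
    }
}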

Consistent Cut

The goal is to find a point on the timeline of every node such that the set of transactions committed before (after) this point is the same on every node in the cluster.

Consider a simple model. In the pictures below there are two lines (P and Q) that represent two different Ignite nodes. Nodes exchange transaction messages with each other. For simplicity, let's assume there is only one transaction state (COMMITTED) and only a single message related to a transaction (one node notifies another about the transaction, and the latter immediately commits it after receiving the message).

A transaction can be described as an ordered sequence of events, and the correct order of events is known on every node. There are 4 ordered events describing a transaction in this simple model:

  1. P.tx(COMMITTED): transaction committed on node P;
  2. P.snd(m): send message msg(tx5: P → Q);
  3. Q.rcv(m): receive message msg(tx5: P → Q);
  4. Q.tx(COMMITTED): transaction committed on node Q.

The cut line crosses both node lines (P and Q) at their Points. Transactions (tx1, tx2, tx3, tx4) are on the left side of the cut on both nodes. But tx5 is different, because the cut also crosses a message line, so tx5 commits on different nodes in different states (before / after the cut). To make a cut consistent, the global order of transaction events must be kept: for any transaction, if the AFTER state contains some events from this order, then the BEFORE state must contain all prior events.
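
The rule can be expressed with a small self-contained check (illustration only, not Ignite code): a cut is consistent for a transaction iff the events that fell on the BEFORE side form a prefix of the transaction's known event order.

Code Block
languagejava
titleCut consistency check (sketch)
import java.util.List;
import java.util.Set;

public class CutConsistency {
    /**
     * @param order  Known ordered events, e.g. [P.tx(COMMITTED), P.snd(m), Q.rcv(m), Q.tx(COMMITTED)].
     * @param before Events that fell on the BEFORE side of the cut.
     */
    public static boolean isConsistent(List<String> order, Set<String> before) {
        boolean afterStarted = false;

        for (String evt : order) {
            if (!before.contains(evt))
                afterStarted = true;          // First event on the AFTER side.
            else if (afterStarted)
                return false;                 // A BEFORE event follows an AFTER event - inconsistent.
        }

        return true;
    }

    public static void main(String[] args) {
        List<String> order = List.of("P.tx(COMMITTED)", "P.snd(m)", "Q.rcv(m)", "Q.tx(COMMITTED)");

        // tx5 committed on Q before the cut but on P after it - inconsistent.
        System.out.println(isConsistent(order, Set.of("Q.rcv(m)", "Q.tx(COMMITTED)")));   // false

        // P's events are before the cut, Q's after it - consistent.
        System.out.println(isConsistent(order, Set.of("P.tx(COMMITTED)", "P.snd(m)")));   // true
    }
}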

Consider some examples. The first picture below describes the case where tx5 state is:

...

Definitions

Paper [1] defines a distributed snapshot algorithm. It uses some definitions; let's describe them in terms of Ignite:

  1. LocalState - the set of transactions committed prior to a moment in time.
    1. In Ignite, LocalState is committed by writing ConsistentCutStartRecord into WAL.
  2. Message - a transaction message; it is sent after changing the LocalState on the sending node, and it changes the LocalState on the receiving node after delivery.
    1. In Ignite such messages are FinishRequest for 2PC and PrepareResponse for 1PC.
  3. Channel - a unidirectional connection between two Ignite nodes over which messages are sent (Communication SPI).
  4. ChannelState - for a single channel, the set of messages that were sent (changed the LocalState of the sending node) but not received yet (haven't changed the LocalState of the receiving node).
    1. In Ignite we can think of the incoming ChannelState as represented by active transactions in PREPARING+ state (the Message might have been sent by other nodes, but isn't received yet).
  5. GlobalSnapshot - a set consisting of (LocalState and ChannelState for every incoming channel) for every node.
    1. In Ignite the snapshot is represented by 2 WAL records (ConsistentCutStartRecord commits the LocalState, ConsistentCutFinishRecord describes the ChannelState).
  6. Marker - a mark that piggybacks on the Message and notifies a node about the running snapshot.
    1. In Ignite, ConsistentCutMarkerMessage is used.

In terms of Ignite, there are additional definitions:

  1. Consistent Cut - a successful attempt at creating a Global Snapshot.
  2. Inconsistent Cut - a failed attempt at creating a Global Snapshot, due to the inability to correctly describe a ChannelState.

The global state before the cut contains events { Q.rcv(m), Q.tx(COMMITTED) } that aren't the leading events in the known sequence - it misses the first events { P.tx(COMMITTED), P.snd(m) }. But the order of events must match the order of consistent states (BEFORE, AFTER), so such a cut is inconsistent.

The second picture below describes the case where tx5 state is:

  • before cut: node P = { tx(COMMITTED), snd(m) }, node Q = { }.
  • after cut: node P = { }, node Q = { rcv(m), tx(COMMITTED) }.

The global state before the cut contains events { P.tx(COMMITTED), P.snd(m) }, which are the leading events in the known sequence. Such a cut is consistent.


In the case of a consistent cut, every node knows which side of the cut a local transaction belongs to (BEFORE or AFTER). On the second picture, node Q after event rcv knows whether tx5 was included into the cut on node P. If it was, node Q includes it as well; otherwise it doesn't.

Map the algorithm to Ignite

Ignite components

On receiving a message with a new CutVersion, a node sets it and commits the LocalState and ChannelState in order to identify a wrong order of the events:

  1. LocalState maps to the local WAL (all committed transactions are part of the LocalState);
  2. Channel:
    1. We can piggyback on Ignite transaction protocol messages (Prepare, Finish) via CommunicationSpi.
    2. In case there is no transaction for a channel, we can rely on DiscoverySpi to start the local snapshot on non-participating nodes.
  3. ChannelState maps to `IgniteTxManager#activeTransactions`:
    1. the sent collection matches committed transactions for which the local node is near - they send FinishMessages to other nodes.
    2. the received collection matches committed transactions for which the local node isn't near - they receive FinishMessages from other nodes.
  4. `IgniteTxManager#activeTransactions` doesn't track committing transactions (COMMITTING+) - they are removed from this collection before they start committing. We track them additionally: a transaction is added to a separate collection before it starts committing and removed after it has committed (see the sketch after this list).
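
A minimal sketch of that extra bookkeeping, with hypothetical names (CommittingTxTracker is not an Ignite class, and plain Strings stand in for GridCacheVersion): transactions are registered in a separate concurrent set before they start committing and removed after commit, but only while a Consistent Cut is running.

Code Block
languagejava
titleTracking COMMITTING+ transactions (sketch)
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class CommittingTxTracker {
    /** Null while no Consistent Cut is running. */
    private volatile Set<String> committingTxs;

    /** Called when a local Consistent Cut starts. */
    public void onCutStarted() {
        committingTxs = ConcurrentHashMap.newKeySet();
    }

    /** Called right before a transaction is removed from activeTransactions() to be committed. */
    public void beforeCommit(String nearXidVer) {
        Set<String> set = committingTxs;

        if (set != null)
            set.add(nearXidVer);
    }

    /** Called after the transaction has been committed. */
    public void afterCommit(String nearXidVer) {
        Set<String> set = committingTxs;

        if (set != null)
            set.remove(nearXidVer);
    }

    /** Called when the local Consistent Cut finishes: stop filling the collection. */
    public Set<String> onCutFinished() {
        Set<String> set = committingTxs;
        committingTxs = null;

        return set;
    }
}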

Algorithm

For the Ignite implementation it's proposed to use only a single node to coordinate the algorithm. A user starts a command for creating a new ConsistentCut incremental snapshot on a single node, and this node becomes responsible for coordinating the Consistent Cut iteration:

  1. Initial state:
    1. Ignite WAL is in a consistent state relative to the previous full or incremental snapshot.
    2. Every Ignite node has a local ConsistentCut future equal to null (node is WHITE).
    3. An empty collection committingTxs (Set<GridCacheVersion>) whose goal is to track COMMITTING+ transactions that aren't part of IgniteTxManager#activeTx.
  2. An Ignite node initiates a GlobalSnapshot by starting a DistributedProcess (by discovery IO):
    1. creates a new ConsistentCutMarker.
    2. prepares a marker message that contains the marker and transmits this message to other nodes.
  3. Every node starts a local snapshot process after receiving the marker message (whether by discovery, or by communication within a transaction message) - steps 3-5 are sketched after this list:
    1. Atomically: creates a new ConsistentCut future (node becomes RED), creates committingTxs, starts signing outgoing messages with the ConsistentCutMarker.
    2. Writes a snapshot record to WAL with the received ConsistentCutMarker (commits the LocalState).
    3. Collects active transactions - the concatenation of IgniteTxManager#activeTx and committingTxs.
    4. Prepares 2 empty collections - before [sent - received] and after [received, but not sent] the cut (they describe the ChannelState).
  4. While the global Consistent Cut is running, every node signs outgoing transaction messages:
    1. Prepare messages are signed with the ConsistentCutMarker (to trigger ConsistentCut on the remote node, if not yet started).
    2. Finish messages are signed with the ConsistentCutMarker (to trigger ConsistentCut, if not yet started) and with the transaction ConsistentCutMarker (to notify nodes which side of the cut this transaction belongs to).
    3. Finish messages are signed on the node that commits first (near node for 2PC, backup or primary for 1PC).
  5. For every collected active transaction, the node waits for the Finish message to extract the ConsistentCutMarker and fills the before and after collections:
    1. if the received marker is null or differs from the local one, then the transaction is on the BEFORE side;
    2. if the received marker equals the local one, then the transaction is on the AFTER side.
  6. After all transactions have finished:
    1. Writes a finish WAL record with the ChannelState (before, after).
    2. Stops filling committingTxs.
    3. Completes the ConsistentCut future and notifies the node-initiator about finishing the local procedure (with the DistributedProcess protocol).
  7. After all nodes have finished ConsistentCut, every node stops signing outgoing transaction messages - the ConsistentCut future becomes null (node is WHITE again).
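
Below is a rough sketch of steps 3-5 under stated assumptions: the Marker record, the String transaction versions and the placeholder methods are illustration-only stand-ins for the real Ignite classes, and only the control flow of the local procedure is shown.

Code Block
languagejava
titleLocal snapshot start (sketch)
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class ConsistentCutProcessor {
    record Marker(UUID id) {}

    private final AtomicReference<CompletableFuture<Void>> cutFut = new AtomicReference<>();
    private volatile Marker marker;

    /** Invoked for every received marker (piggybacked on a transaction message, or via DistributedProcess). */
    public void onMarkerReceived(Marker rcvd) {
        CompletableFuture<Void> fut = new CompletableFuture<>();

        if (!cutFut.compareAndSet(null, fut))
            return;                                   // A cut already runs locally (node is RED).

        marker = rcvd;                                // Outgoing messages are signed from now on.

        walLog("ConsistentCutStartRecord", rcvd);     // Commits the LocalState.

        Set<String> check = new HashSet<>(activeTransactions());
        check.addAll(committingTransactions());       // Concatenation of activeTx and committingTxs.

        Set<String> before = new HashSet<>();         // Sent, but not received (ChannelState).
        Set<String> after = new HashSet<>();          // Received, but not sent.

        waitForFinishMessages(check, before, after, fut);
    }

    // Placeholders for the real Ignite machinery, out of scope for this sketch.
    private void walLog(String rec, Marker m) {}
    private Set<String> activeTransactions() { return Set.of(); }
    private Set<String> committingTransactions() { return Set.of(); }
    private void waitForFinishMessages(Set<String> txs, Set<String> before, Set<String> after,
        CompletableFuture<Void> fut) { fut.complete(null); }
}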

...

`id` is just a unique ConsistentCut ID (assigned on the node-initiator).

...


Signing messages

In the 2PC protocol, in most cases (except for some optimizations and corner cases) the sequence of events for a transaction can be described as a set of messages (PrepareRequest, PrepareResponse, FinishRequest, FinishResponse) and a set of TransactionStates. For the case with 2 nodes (P - near, Q - primary) it looks like this:

P.tx(PREPARING) → P.snd(GridNearPrepareRequest) → Q.rcv(GridNearPrepareRequest) → Q.tx(PREPARED) → Q.snd(GridNearPrepareResponse) → P.rcv(GridNearPrepareResponse) → P.tx(PREPARED) → P.tx(COMMITTED) → P.snd(GridNearFinishRequest) →  Q.rcv(GridNearFinishRequest) → Q.tx(COMMITTED)


Important steps in this sequence:

  • near node commits before primary node (primary node commits before backup node)
  • there is a FinishRequest between the actual commits.

The Consistent Cut procedure can start at any moment and cross this sequence at any link. At that moment every node should decide which side of the ConsistentCut every transaction belongs to. Some important notes:

  1. There are some issues with ACTIVE transactions:
    1. They can be long-running.
    2. They may become SUSPENDED and hang ConsistentCut after that (the only valid way to reach the SUSPENDED state is ACTIVE → SUSPENDED).
  2. Therefore it's better to avoid checking such transactions, and the algorithm allows this: for every transaction in the ACTIVE state there are at least 2 messages to sync the CutVersion between nodes (PREPARE response, FINISH request) - this is enough to guarantee that every ACTIVE transaction will be on the AFTER side.
  3. For other transactions (>= PREPARING) the algorithm listens to their finish futures (a filtering sketch follows this list).
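
The filtering from notes 2-3 can be sketched as follows (hypothetical types, not Ignite code): only transactions that have reached PREPARING or a later state are collected, so ACTIVE and SUSPENDED transactions never block the cut.

Code Block
languagejava
titleFiltering transactions to check (sketch)
import java.util.List;
import java.util.stream.Collectors;

public class TxFilter {
    enum TxState { ACTIVE, SUSPENDED, PREPARING, PREPARED, COMMITTING, COMMITTED }

    record Tx(String xid, TxState state) {}

    /** Transactions whose finish futures the Consistent Cut will listen to. */
    static List<Tx> toCheck(List<Tx> activeTxs) {
        return activeTxs.stream()
            .filter(tx -> tx.state().ordinal() >= TxState.PREPARING.ordinal())
            .collect(Collectors.toList());
    }
}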

Consider some examples below (P - near, Q - primary):

...

  1. P: the cut happens before rcv(GridNearPrepareResponse), so the tx isn't PREPARED on P yet. Then P signs the FinishRequest with the new CutVersion.
  2. Q collects this tx at the moment of the cut (as locally it's PREPARED) and waits for the FinishRequest. On receiving it, Q checks the message: local CutVer == received txCutVer, so the tx belongs to the AFTER side.

The Ignite transaction protocol includes multiple messages, but only some of them are meaningful for the algorithm - those that change the state of transactions (PREPARED, COMMITTED):

  1. GridNearTxPrepareRequest / GridDhtTxPrepareRequest
  2. GridNearTxPrepareResponse / GridDhtTxPrepareResponse
  3. GridNearTxFinishRequest / GridDhtTxFinishRequest

Also, some messages need to be signed with the transaction marker so it can be checked on the primary/backup node (see the signing sketch after this list):

  1. GridNearTxFinishRequest / GridDhtTxFinishRequest
  2. GridNearTxPrepareResponse / GridDhtTxPrepareResponse (for 1PC algorithm).
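
A hedged sketch of the signing step (hypothetical classes; the real messages are the Ignite classes listed above): while a local cut is running, outgoing Finish messages carry both the latest ConsistentCutMarker and the per-transaction marker assigned by the node that commits first.

Code Block
languagejava
titleSigning outgoing messages (sketch)
import java.util.UUID;

public class MessageSigner {
    record Marker(UUID id) {}

    /** Stand-in for the real transaction Finish/Prepare messages. */
    static class TxMessage {
        Marker latestCutMarker;   // Triggers ConsistentCut on the receiver, if not started yet.
        Marker txCutMarker;       // Tells the receiver which side of the cut the tx belongs to.
    }

    private volatile Marker localMarker;   // Null when no cut is running (node is WHITE).

    /** Called by the node that commits first (near for 2PC, backup/primary for 1PC). */
    void sign(TxMessage msg, Marker markerAtCommitTime) {
        msg.latestCutMarker = localMarker;

        // The receiver compares this with its own marker: equal -> AFTER side, null or different -> BEFORE side.
        msg.txCutMarker = markerAtCommitTime;
    }
}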

WAL records

There are 2 records: `ConsistentCutStartRecord` for the Start event and `ConsistentCutFinishRecord` for the Finish event.

  • ConsistentCutStartRecord: the record is written to WAL at the moment when Consistent Cut starts on a local node. It helps to limit the amount of active transactions to check. But there is no strict guarantee that all transactions belonging to the BEFORE side are physically committed before ConsistentCutStartRecord, and vice versa. This is the reason for having ConsistentCutFinishRecord.
  • ConsistentCutFinishRecord: this record is written to WAL after Consistent Cut has stopped analyzing transactions and storing them in a particular bucket (BEFORE or AFTER).

It guarantees that the BEFORE side consists of:
1. transactions physically committed before ConsistentCutStartRecord that weren't included into ConsistentCutFinishRecord#after();
2. transactions physically committed between ConsistentCutStartRecord and ConsistentCutFinishRecord that were included into ConsistentCutFinishRecord#before().

It guarantees that the AFTER side consists of:
1. transactions physically committed before ConsistentCutStartRecord that were included into ConsistentCutFinishRecord#after();
2. transactions physically committed after ConsistentCutStartRecord that weren't included into ConsistentCutFinishRecord#before().

A recovery-side sketch that applies these guarantees follows the code block below.


Code Block
languagejava
titleConsistentCutRecord
/** */
public class ConsistentCutStartRecord extends WALRecord {
    /** Marker that inits Consistent Cut. */
    private final ConsistentCutMarker marker;
}

/** */
public class ConsistentCutFinishRecord extends WALRecord {
    /**
     * Collections of TXs committed BEFORE the ConsistentCut (sent - received).
     */
    private final Set<GridCacheVersion> before;

    /**
     * Collections of TXs committed AFTER the ConsistentCut (exclude).
     */
    private final Set<GridCacheVersion> after;
}
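
As an illustration of the guarantees above, the hypothetical filter below shows how a recovery procedure could decide, while replaying WAL, whether a committed transaction belongs to the BEFORE side of the cut; the class and method names are assumptions, not Ignite code.

Code Block
languagejava
titleRecovery-side BEFORE filter (sketch)
import java.util.Set;

public class CutFilter {
    /**
     * @param committedBeforeStart Whether the tx was physically committed before ConsistentCutStartRecord.
     * @param before ConsistentCutFinishRecord#before().
     * @param after  ConsistentCutFinishRecord#after().
     */
    static boolean onBeforeSide(String txVer, boolean committedBeforeStart,
        Set<String> before, Set<String> after) {
        // Committed before the start record: BEFORE unless explicitly moved to after().
        // Committed later: BEFORE only if explicitly listed in before().
        return committedBeforeStart ? !after.contains(txVer) : before.contains(txVer);
    }
}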


The pictures below describe a slightly more complicated example: P commits the tx AFTER the cut, Q commits it BEFORE. But the Consistent Cut algorithm actually doesn't allow such a case, because this cut is inconsistent. The global state of the tx includes events { Q.snd(GridNearPrepareResponse), Q.rcv(GridNearFinishRequest), Q.tx(COMMITTED) }, but this state misses some middle events { P.rcv(GridNearPrepareResponse), P.tx(COMMITTED), P.snd(GridNearFinishRequest) }.

So this cut is inconsistent. To avoid such cases, the FinishRequest is signed with latestCutVer, and node Q must validate it and trigger Consistent Cut before applying the message. After that, this case becomes equal to the case from the previous pictures.

  1. P: the cut happens before P.rcv(GridNearPrepareResponse), so the tx isn't PREPARED on P yet. Then P signs the FinishRequest with the new CutVersion.
  2. Q: on receiving the FinishRequest it triggers Consistent Cut, collects the PREPARED tx at this moment, and after that checks the version from the FinishRequest.
  3. The case becomes equal to that on the second picture above. Globally the tx is AFTER the cut (see the validation sketch below).
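
A sketch of this validation on the receiving side (hypothetical handler and Marker type, not the actual Ignite code): the primary compares the latest marker carried by the FinishRequest with its own and triggers the local cut before applying the commit if the sender is ahead.

Code Block
languagejava
titleValidating the marker on FinishRequest (sketch)
import java.util.UUID;

public class FinishRequestHandler {
    record Marker(UUID id) {}

    private volatile Marker localMarker;

    void onFinishRequest(Marker latestCutMarker, Marker txCutMarker, Runnable applyCommit) {
        // Trigger the local Consistent Cut before applying the message, if the sender already started a newer cut.
        if (latestCutMarker != null && !latestCutMarker.equals(localMarker))
            startLocalCut(latestCutMarker);

        applyCommit.run();

        // txCutMarker equal to the local marker -> the tx is on the AFTER side, otherwise BEFORE.
    }

    private void startLocalCut(Marker m) {
        localMarker = m;
    }
}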


One-Phase commit handling

Sequence of events for 1PC differs:

P.tx(PREPARING) → P.snd(GridNearPrepareRequest) → Q.rcv(GridNearPrepareRequest) → Q.tx(PREPARED) → Q.tx(COMMITTED) → Q.snd(GridNearPrepareResponse) → P.rcv(GridNearPrepareResponse) → P.tx(PREPARED) → P.tx(COMMITTED)

Important points in this sequence:

  • the backup node commits before the primary node
  • there is a GridNearPrepareResponse between the actual commits

Then for 1PC the backup (or the primary, if backups=0) is responsible for signing the tx with the CutVersion, not the near node. The CutVersion propagates between nodes in reverse order: near ← primary ← backup.
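
The difference between the two protocols can be summarized in a small helper (an illustration with assumed names, not Ignite code): for 2PC the near node assigns the transaction marker; for 1PC it is the backup, or the primary when there are no backups.

Code Block
languagejava
titleWho assigns the transaction marker (sketch)
public class MarkerAssigner {
    enum Role { NEAR, PRIMARY, BACKUP }

    /** Whether this node is the one that commits first and therefore signs the tx with the marker. */
    static boolean assignsTxMarker(Role role, boolean onePhaseCommit, boolean hasBackups) {
        if (!onePhaseCommit)
            return role == Role.NEAR;                                      // 2PC: near commits first.

        return hasBackups ? role == Role.BACKUP : role == Role.PRIMARY;    // 1PC: marker travels near <- primary <- backup.
    }
}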


Unstable topology

There are some cases to handle for unstable topology:

...

  1. First solution: set the flag ConsistentCutManager#inconsistent to true, and persist this flag within the local MetaStorage (see the sketch after this list).
    1. On receiving a new ConsistentCutVersion, check the flag and raise an exception.
    2. Clean the flag on recovering from a ClusterSnapshot, or after creating a ClusterSnapshot.
    3. + Simple handling in runtime.
    4. - Need to rebalance a single node after PITR with file-based (or other?) rebalance: more time for recovery, and file-based rebalance still has unresolved issues(?).

  2. Do not disable WAL during rebalance:
    + WAL is now consistent with data and can be used for PITR after rebalancing
    - Too many inserts during rebalance may affect rebalance speed and IO utilization

  3. Automatically create snapshots for rebalancing cache groups:
    + Guarantee of consistency (snapshot is created on PME, after rebalance).
    + Faster recovery.
    - Complexity of handling - Ignite should provide an additional tool for merging WAL and multiple snapshots created at different times
    ? is it possible to create snapshot only on single rebalanced node?
    ? Is it possible to sync current PME (on node join) with starting snapshot?

  4. Write a special Rebalance record to WAL with description of demanded partition:
    1. During restore, read this record and repeat the historical rebalance at this point; after the rebalance, resume recovery with existing WALs.
    2. In case the record describes a full rebalance - stop recovering with WAL and fall back to a full rebalance.
      ? Is it possible to rebalance only specific cache groups, and continue WAL recovery for others?
      - For historical rebalance during recovery, separate logic is needed for extracting records from WAL archives from other nodes.
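
A hedged sketch of option 1, assuming a hypothetical local MetaStorage interface (the key name and class are made up for illustration): the flag is persisted when WAL consistency is broken, checked on every new ConsistentCutVersion, and cleared after a new ClusterSnapshot or recovery from one.

Code Block
languagejava
titleInconsistent flag in MetaStorage (sketch)
public class InconsistentCutFlag {
    /** Stand-in for Ignite's local MetaStorage. */
    interface LocalMetaStorage {
        void write(String key, java.io.Serializable val);
        java.io.Serializable read(String key);
    }

    private static final String KEY = "consistent-cut-inconsistent";

    private final LocalMetaStorage metaStorage;
    private volatile boolean inconsistent;

    InconsistentCutFlag(LocalMetaStorage ms) {
        metaStorage = ms;
        inconsistent = Boolean.TRUE.equals(ms.read(KEY));
    }

    /** Called when WAL consistency is broken (e.g. WAL was disabled during rebalance). */
    void markInconsistent() {
        inconsistent = true;
        metaStorage.write(KEY, true);
    }

    /** Called on receiving a new ConsistentCutVersion. */
    void onNewCutVersion() {
        if (inconsistent)
            throw new IllegalStateException("Consistent Cut is not possible: WAL gap detected, " +
                "a new full cluster snapshot is required.");
    }

    /** Called after creating a ClusterSnapshot or recovering from one. */
    void onClusterSnapshot() {
        inconsistent = false;
        metaStorage.write(KEY, false);
    }
}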

...

Links

  1. ON DISTRIBUTED SNAPSHOTS, Ten H. Lai and Tao H. Yang, 29 May 1987