Each transaction is assigned a deadline (by setting an explicit or default timeout), after which a commit is not possible. A txn coordinator watches for deadlines and proactively aborts transactions that have exceeded them.
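As a rough illustration of deadline tracking, the sketch below records a deadline per transaction and lets a coordinator collect the expired ones for abort. The class name DeadlineTracker, its methods, and the use of plain millisecond timestamps are assumptions made for this example; they are not part of the proposed API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not one of the proposed components.
final class DeadlineTracker {
    // txId -> absolute deadline in milliseconds.
    private final Map<UUID, Long> deadlines = new ConcurrentHashMap<>();

    // Registers a transaction with an explicit or default timeout.
    void register(UUID txId, long startMillis, long timeoutMillis) {
        deadlines.put(txId, startMillis + timeoutMillis);
    }

    // A commit is allowed only for a known transaction whose deadline has not passed.
    boolean canCommit(UUID txId, long nowMillis) {
        Long deadline = deadlines.get(txId);
        return deadline != null && nowMillis <= deadline;
    }

    // Removes and returns the transactions whose deadline has passed,
    // so that the caller can abort them.
    List<UUID> expire(long nowMillis) {
        List<UUID> expired = new ArrayList<>();
        Iterator<Map.Entry<UUID, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<UUID, Long> e = it.next();
            if (e.getValue() < nowMillis) {
                expired.add(e.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```

A coordinator could call expire periodically from a background task and issue an abort for each returned id.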

Components hierarchy for CC

Below is the list of components that are required for CC execution. Each component contains a minimal set of required methods.


MVStore implements methods for accessing multi-version data under a txn context. One per data table partition. Contains all primary and secondary indexes.

Code Block
// Puts a row. This call updates primary and all secondary indexes after acquiring necessary locks.
CompletableFuture<Void> put(Tuple row, UUID txId); 

// Removes a row. This call updates primary and all secondary indexes after acquiring necessary locks.
CompletableFuture<Tuple> remove(Tuple keyTuple, UUID txId);

// Executes the query in RW mode.
AsyncCursor<Tuple> query(@Nullable Query query, UUID txId);

// Executes the query in RO mode.
Cursor<Tuple> query(@Nullable Query query, Timestamp readTs);

// Invokes a closure over the set of records matching the filter. Returns a number of modified rows.
CompletableFuture<Long> invoke(InvokeClosure closure, @Nullable Filter filter, UUID txId);

// Commits a transaction with a timestamp.
CompletableFuture<Boolean> commit(UUID txId, Timestamp commitTs);

// Aborts a transaction
CompletableFuture<Boolean> abort(UUID txId);

Query implements the data retrieval logic and can use indexes. Possible types are:

  • ScanQuery
  • RangeQuery
    @Nullable Tuple keyLower();
    boolean lowerInclusive();
    @Nullable Tuple keyUpper();
    boolean upperInclusive();
  • EqQuery
    Tuple key();
  • InQuery
    List<Tuple> keys();
  • etc ...
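The bounds semantics of RangeQuery can be sketched as follows. This is only an illustration: the class RangeBounds is hypothetical, keys are modeled as any Comparable type instead of Tuple, and treating a null bound as "unbounded" is an assumption suggested by the @Nullable annotations above.

```java
// Hypothetical illustration of RangeQuery bounds checking.
final class RangeBounds<K extends Comparable<K>> {
    private final K lower;            // null is assumed to mean "no lower bound"
    private final boolean lowerInclusive;
    private final K upper;            // null is assumed to mean "no upper bound"
    private final boolean upperInclusive;

    RangeBounds(K lower, boolean lowerInclusive, K upper, boolean upperInclusive) {
        this.lower = lower;
        this.lowerInclusive = lowerInclusive;
        this.upper = upper;
        this.upperInclusive = upperInclusive;
    }

    // Returns true if the key falls inside the configured range.
    boolean contains(K key) {
        if (lower != null) {
            int c = key.compareTo(lower);
            if (c < 0 || (c == 0 && !lowerInclusive)) {
                return false;
            }
        }
        if (upper != null) {
            int c = key.compareTo(upper);
            if (c > 0 || (c == 0 && !upperInclusive)) {
                return false;
            }
        }
        return true;
    }
}
```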

The returned cursor must support async execution, because scans can acquire locks in RW mode.

Not all indexes can support all filter types. TBD: define new index types, like BloomFilter, etc.

The proposed operations should support batches.


LockManager manages locks. It has a volatile state, which is lost on restart. One per table. Used by MVStore to acquire/release locks as required by the CC protocol.

LockManager provides the following operations:

  • CompletableFuture<LockDesc> acquire(UUID txId, byte[] lockName, LockMode mode); // Acquires (or upgrades) a lock and returns a future with a lock description
  • void release(Lock lock); // Release previously acquired lock
  • void downgrade(Lock lock, LockMode mode); // Downgrades a lock
  • Iterator<Lock> locks(UUID txId); // Return all locks acquired by a transaction.


TxnStateManager holds transaction state. Currently two states are supported: COMMITTED and ABORTED. One per data node. Used by MVStore to resolve write intents and for txn recovery purposes.

TxnStateManager provides the following operations:

  • CompletableFuture<TxState> state(UUID txId);
  • CompletableFuture<Boolean> changeState(UUID txId, TxState beforeState, TxState afterState); // Atomically change txn state

This state is replicated using the ReplicationManager for a partition that is derived from the transaction id. A separate TxnStateStore is used for holding txn state.
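The atomic state change can be illustrated with a compare-and-set over a concurrent map. This is a minimal, non-replicated sketch: the class InMemoryTxnStateManager is hypothetical, and using a null beforeState to mean "no state recorded yet" is an assumption made for the example.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// The two states named in the text.
enum TxState { COMMITTED, ABORTED }

// Hypothetical in-memory stand-in; the real state is replicated and persisted.
final class InMemoryTxnStateManager {
    private final ConcurrentHashMap<UUID, TxState> states = new ConcurrentHashMap<>();

    // Completes with the recorded state, or null if none is recorded.
    CompletableFuture<TxState> state(UUID txId) {
        return CompletableFuture.completedFuture(states.get(txId));
    }

    // Atomically moves txId from beforeState to afterState.
    // Assumption: beforeState == null means "no state recorded yet".
    CompletableFuture<Boolean> changeState(UUID txId, TxState beforeState, TxState afterState) {
        boolean changed;
        if (beforeState == null) {
            changed = states.putIfAbsent(txId, afterState) == null;
        } else {
            changed = states.replace(txId, beforeState, afterState);
        }
        return CompletableFuture.completedFuture(changed);
    }
}
```

With this shape, two concurrent attempts to finish the same transaction cannot both succeed: only the first changeState from the empty state returns true.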


Coordinates transaction execution. Can be deployed on any cluster node, not necessarily a data node. One per node. Holds a map of active transactions UUID→InternalTransaction, assuming a txId is implemented using the UUID class.

Code Block
// Assign RW transaction id.
CompletableFuture<UUID> beginAsync();

// Commit the transaction
CompletableFuture<Boolean> commitAsync(UUID txId);

// Rollback the transaction
CompletableFuture<Boolean> rollbackAsync(UUID txId);

There is no need to explicitly start an RO transaction. A user must pass a timestamp instead of a txId to execute an RO transaction.


Internal facade for transactional record storage. Can be deployed on any cluster node, not necessarily a data node. Caches the information about table partition topology and the last known table schema. One per node. Executes transactional ops by sending requests to the enlisted partition data nodes, defined by the calculated data affinity.

The set of methods is similar to MVStore. Additionally, snapshot scans are allowed to be executed on an exact node by passing a custom partition map:

Code Block
Cursor<Tuple> scan(Tuple keyLower, boolean lowerInclusive, Tuple keyUpper, boolean upperInclusive, 
@Nullable Timestamp ts, @Nullable Map<PartitionId, ClusterNode> partMap);

// Other methods

A snapshot scan is executed on backups out of the box and, if the partMap is not null, uses this map to define the exact partition to node mapping.


ReplicationManager provides replication capabilities to MVStore. The actual implementation can use any replication protocol, for example, RAFT. One per replicated partition. Delegates to MVStore to execute the “apply” phase of a replication.

Code Block
CompletableFuture<Peer> getOrWaitLeader();

@Nullable Peer leader();

boolean isLocal(Peer peer);

List<Peer> peers();

List<Peer> learners();

<T> CompletableFuture<T> apply(Command command);

boolean isLeaderLeaseValid(HLC ts);


Extends the MVStore functionality with replication support and lease validity checks for reads. Uses ReplicationManager for execution of replication-related logic.


Backup replica reads

RW transactions can read from backup replicas, assuming a proper read lock is acquired on a primary replica and an entry is replicated. The lock response installs locally the HLC that the replica read must wait for. See the replica reads section on how to ensure that. Beware: this might incur blocking if a local replica is lagging.

When to use RW transactions

Here are scenarios in which you should use a locking read-write transaction:

  • If you do a write that depends on the result of one or more reads, you should do that write and the read(s) in the same read-write transaction.
    • Example: double the balance of bank account A. The read of A's balance should be in the same transaction as the write to replace the balance with the doubled value.
  • If you do one or more writes that need to be committed atomically, you should do those writes in the same read-write transaction.
    • Example: transfer $200 from account A to account B. Both of the writes (one to decrease A by $200 and one to increase B by $200) and the reads of initial account balances should be in the same transaction.
  • If you might do one or more writes, depending on the results of one or more reads, you should do those writes and reads in the same read-write transaction, even if the write(s) don't end up executing.
    • Example: transfer $200 from bank account A to bank account B if A's current balance is greater than $500. Your transaction should contain a read of A's balance and a conditional statement that contains the writes.

RO transactions

The algorithm

An RO transaction is associated with a read timestamp at the beginning. It scans indexes to get access to table data via indexed rowIds. Because it bypasses normal locking, rowIds must be filtered according to their visibility. This filtering process is known as write intent resolution and is crucial for RO transaction processing. Write intent resolution is required when a version chain corresponding to a rowId has an uncommitted version.

We expect RO txn serializability. The reading txn can “see” the write intent only if it's committed and the commit timestamp <= read timestamp. A repeating RO txn at the timestamp T must always return the same result set.

In a nutshell, the write intent resolution process ensures the aforementioned invariant. The read-only transaction performs a resolve(RowId rowId, Timestamp readTs) operation for each accessed rowId, which includes the following steps:

  • If there is no unresolved write intent, the reading txn must find the corresponding committed version in the version chain by passing the read timestamp.
  • If there is unresolved write intent (uncommitted version in the chain head), it initiates the intent resolution 
    • Checks a local txn state map for committed or aborted state - allow read if the state is committed and commitTs <= readTs.
    • If not possible, send a TxStateReq  to the txn coordinator - this initiates the coordinator path. Coordinator address is fetched from the txn state map.
    • If a coordinator path was not able to resolve the intent, one of the following has happened - the coordinator is dead or txn state is not available in the cache. Calculate a commit partition and send the TxStateReq to its primary replica - this initiates the commit partition path.
    • Retry commit partition path until a success or timeout.
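The visibility rule behind the steps above can be sketched for the local path only. Everything here is a simplification under stated assumptions: VersionChain, Version and TxOutcome are hypothetical types, timestamps are plain longs, values are strings, and the coordinator and commit partition paths are represented by an exception instead of real requests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical committed version: a value and its commit timestamp.
final class Version {
    final String value;
    final long commitTs;
    Version(String value, long commitTs) { this.value = value; this.commitTs = commitTs; }
}

// Hypothetical entry of the local txn state map.
final class TxOutcome {
    final boolean committed;
    final long commitTs; // meaningful only when committed is true
    TxOutcome(boolean committed, long commitTs) { this.committed = committed; this.commitTs = commitTs; }
}

final class VersionChain {
    UUID intentTxId;     // null when the chain head is not a write intent
    String intentValue;
    final List<Version> committed = new ArrayList<>(); // ascending commitTs

    // Latest committed version with commitTs <= readTs, if any.
    Optional<String> readCommitted(long readTs) {
        String result = null;
        for (Version v : committed) {
            if (v.commitTs <= readTs) {
                result = v.value;
            }
        }
        return Optional.ofNullable(result);
    }

    // Local-path resolution: the intent is visible only if its txn is
    // committed and commitTs <= readTs.
    Optional<String> resolve(long readTs, Map<UUID, TxOutcome> localTxStates) {
        if (intentTxId == null) {
            return readCommitted(readTs);
        }
        TxOutcome outcome = localTxStates.get(intentTxId);
        if (outcome == null) {
            // The coordinator path, then the commit partition path, would start here.
            throw new IllegalStateException("txn state is not known locally");
        }
        if (outcome.committed && outcome.commitTs <= readTs) {
            return Optional.of(intentValue);
        }
        return readCommitted(readTs);
    }
}
```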

On receiving TxStateReq on the coordinator:




Write intent resolution implies an important invariant: if an RO transaction has done a read at a timestamp T over partition P, no other active RW txn enlisted to this partition can commit later at a timestamp T0 <= T.

The example of a write intent resolution:

(Figure not available.)

There is another subtle issue. Assume client 1 reads from a partition P with readTs=100 and sees an empty partition. After the read, client 2 commits a value to partition P. Normally TX2 will use commitTs > readTs, because the first read operation installs its HLC = readTs, so a repeated read with the same timestamp will not see the changes from client 2. However, there is a caveat: a primary replica for a partition can die before replicating the updated HLC (applied by the RO read). In this case the txn will remap to a new primary replica and can commit with a lower HLC if no precautions are taken. A new leaseholder must be elected before a successful remap. The leaseholder disjoint invariant guarantees that any commit timestamp assigned on a new leaseholder is greater than the readTs, which is within the old lease interval.

Read timestamp cache

Each unresolved version chain can be associated with a volatile readTs, which is initially set to creation time, to avoid unnecessary resolution by the transactions in the past. This timestamp is adjusted by RO transactions. If readTs(T) < readTs(V), transaction T can skip the resolution step.
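A minimal sketch of such a per-chain timestamp is shown below. The class name IntentReadTsCache and its methods are hypothetical, timestamps are modeled as longs, and the choice to advance the cached value to the maximum readTs of RO transactions that attempted resolution is an assumption about how the adjustment could work.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical volatile readTs attached to one unresolved version chain.
final class IntentReadTsCache {
    private final AtomicLong readTs;

    // Initially set to the creation time of the write intent.
    IntentReadTsCache(long creationTs) {
        this.readTs = new AtomicLong(creationTs);
    }

    // readTs(T) < readTs(V): the transaction can skip the resolution step.
    boolean canSkipResolution(long txReadTs) {
        return txReadTs < readTs.get();
    }

    // Called by an RO transaction that found the intent still unresolved.
    // The cached value only moves forward.
    void onUnresolved(long txReadTs) {
        readTs.accumulateAndGet(txReadTs, Math::max);
    }
}
```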

It is also possible to eagerly resolve write intents for long running RW transactions in batches, keeping the timestamp cache fresh.

Replica reads

This section covers recovery for transactions. It describes how the txn protocol recovers from various kinds of failures: a txn coordinator failure, an enlisted data node failure, and majority loss for a partition, which is a particular case of data node loss.

Data node failure

First consider the simplest case: a data node failure. In this case a txn coordinator is alive, has full txn topology, and can commit or abort the transaction directly, so the transaction is never abandoned and makes progress. A single data node failure doesn’t prevent a transaction’s progress as long as a majority is preserved, except in the case of read lease expiration. Even if a whole partition was lost, but we know that all writes are replicated, the transaction can be committed.

RW conflict avoidance

There is a special case for RW conflicts. We assume read locks are neither persistent nor replicated, so their state is lost on primary replica failure. This makes it impossible to detect RW conflicts, so we need to avoid them.

Consider a scenario when a read was made from a primary replica and it goes offline before a transaction commits. Additional measures are required to preserve read safety.

Because read locks are not replicated, it's possible that another transaction is mapped to a new primary replica and invalidates a read for the current not yet committed transaction.

Assume the transactions

T1: a=r1(x), w1(y), c1

T2: w2(x), c2

Due to PR1 failure and a lost read lock, the following history is possible:

H = r1(x) w2(x) c2 w1(y) c1

Assume the RO transaction T3 and the history

H2 = r1(x) w2(x) c2 r3(x) r3(y) w1(y) c1

The RO transaction will return an inconsistent result to the user, not corresponding to any serial execution of T1, T2, T3.

This is illustrated by the diagram:

RO transactions are handled differently. They rely on write intent resolution to make progress. This process is described in detail in Reading the data. But it’s possible that both paths are unavailable due to failed or unresponsive nodes. If for some reason write intent resolution is not possible (or timed out), the RO txn must fail with a corresponding exception to avoid the risk of reading inconsistent data.

Handling node restarts

On a restart, a node's volatile state is lost. No special handling is required for unresolved write intents on node restarts. If they are accessed in this state, the transaction will try to resolve the intent's state via the commit partition (because the local txn map doesn't contain the coordinator), as described in the RW algorithm section. If the commit partition doesn’t contain the txn state, this means the transaction is abandoned and must be aborted. The transaction is added to the abandoned list and is processed as described in the Coordinator Failure section. As soon as the aborted state is written, the current txn can proceed.

The edge case for this scenario is a full cluster restart under the load.

Commit partition group primary replica failure

We have the cleanup phase as a part of the commit in the current protocol. For this purpose a list of enlisted partitions is passed together with a finish request from a txn coordinator to a commit partition group. A commit includes a step in which a cleanup request is sent to all enlisted partitions to release locks and commit uncommitted versions.

But it’s possible that a commit partition group primary replica fails in the middle of a process.

To handle this situation, we must ensure that as long as a majority is preserved, all enlisted partitions will eventually receive a message.

We can use a simple protocol backed by consensus to achieve this, previously called a durable cleanup. We use two raft commands for this purpose:

FinishCommand (TxState txState, Map<PartId,Boolean> finishState);

where finishState holds true if the corresponding partition group has acked the unlocking, and false otherwise (the default value).

As soon as FinishCommand is committed to a group, a background process periodically sends CleanupRequest to enlisted partitions. Each cleanup request is processed by a partition group, which acks the corresponding map entry by sending CleanupAckRequest. If a primary replica fails, this is not a problem, because CleanupRequests are periodically resent. As soon as all partitions are acked, the background process stops sending unlock requests. Unlock requests are idempotent, so sending many requests shouldn’t be a problem.
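The bookkeeping behind this retry loop can be sketched as a small state object. It is only an illustration: DurableCleanupState is a hypothetical name, partition ids are modeled as integers, and replication of the map through consensus is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the finishState map carried by FinishCommand.
final class DurableCleanupState {
    // partition id -> true once the partition group acked the cleanup.
    private final Map<Integer, Boolean> finishState = new TreeMap<>();

    DurableCleanupState(List<Integer> enlistedPartitions) {
        for (Integer partId : enlistedPartitions) {
            finishState.put(partId, false); // default value: not acked
        }
    }

    // Partitions that still need a CleanupRequest on the next retry round.
    List<Integer> pendingPartitions() {
        List<Integer> pending = new ArrayList<>();
        for (Map.Entry<Integer, Boolean> e : finishState.entrySet()) {
            if (!e.getValue()) {
                pending.add(e.getKey());
            }
        }
        return pending;
    }

    // Applies an ack. Repeated or unknown acks are harmless, which mirrors
    // the idempotency requirement on cleanup requests.
    void ack(int partId) {
        finishState.computeIfPresent(partId, (k, v) -> true);
    }

    // The background process can stop once every partition has acked.
    boolean done() {
        return pendingPartitions().isEmpty();
    }
}
```

A background task would resend CleanupRequest to pendingPartitions() on each tick until done() returns true.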

Commit partition group majority loss

This is a reason for unavailability. Some transactions can’t be finished until the majority is restored.

Clock requirements

The requirements for a correct snapshot are:
