...

Each transaction is assigned a deadline (by setting an explicit or default timeout), after which a commit is not possible. A txn coordinator watches for deadlines and preemptively aborts the offending transactions.
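The deadline rule can be illustrated with a minimal sketch. The class TxnDeadlineTracker and its methods are hypothetical names, not part of the proposed API, and the current time is passed in explicitly to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (an assumption for illustration): tracks per-transaction deadlines.
class TxnDeadlineTracker {
    private final Map<UUID, Long> deadlines = new ConcurrentHashMap<>();
    private final long defaultTimeoutMs;

    TxnDeadlineTracker(long defaultTimeoutMs) {
        this.defaultTimeoutMs = defaultTimeoutMs;
    }

    // Registers a transaction; a non-positive timeout selects the default one.
    UUID begin(long nowMs, long timeoutMs) {
        UUID txId = UUID.randomUUID();
        deadlines.put(txId, nowMs + (timeoutMs > 0 ? timeoutMs : defaultTimeoutMs));
        return txId;
    }

    // A commit is possible only for a known transaction whose deadline has not passed.
    boolean tryCommit(UUID txId, long nowMs) {
        Long deadline = deadlines.remove(txId);
        return deadline != null && nowMs <= deadline;
    }

    // Called periodically by the coordinator: aborts every expired transaction.
    List<UUID> abortExpired(long nowMs) {
        List<UUID> aborted = new ArrayList<>();
        for (Map.Entry<UUID, Long> e : deadlines.entrySet()) {
            // remove(key, value) succeeds only if the entry was not finished concurrently.
            if (nowMs > e.getValue() && deadlines.remove(e.getKey(), e.getValue())) {
                aborted.add(e.getKey());
            }
        }
        return aborted;
    }
}
```

A real coordinator would also have to notify the enlisted partitions about the abort; the sketch covers only the bookkeeping.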

Components hierarchy for CC

Below is the list of components that are required for CC execution. Each component contains a minimal set of required methods.

MVStore

Implements methods for accessing multi-version data under txn context. One per data table partition. Contains all primary and secondary indexes.

// Puts a row. This call updates primary and all secondary indexes after acquiring necessary locks.
CompletableFuture<Void> put(Tuple row, UUID txId); 

// Removes a row. This call updates primary and all secondary indexes after acquiring necessary locks.
CompletableFuture<Tuple> remove(Tuple keyTuple, UUID txId);

// Executes the query in RW mode.
AsyncCursor<Tuple> query(@Nullable Query query, UUID txId);

// Executes the query in RO mode.
Cursor<Tuple> query(@Nullable Query query, Timestamp readTs);

// Invokes a closure over the set of records matching the filter. Returns a number of modified rows.
CompletableFuture<Long> invoke(InvokeClosure closure, @Nullable Filter filter, UUID txId);

// Commits a transaction with a timestamp.
CompletableFuture<Boolean> commit(UUID txId, Timestamp commitTs);

// Aborts a transaction
CompletableFuture<Boolean> abort(UUID txId);

Query implements the data retrieval logic and can use indexes. Possible types are:

  • ScanQuery
  • RangeQuery
    @Nullable Tuple keyLower();
    boolean lowerInclusive();
    @Nullable Tuple keyUpper();
    boolean upperInclusive();
  • EqQuery
    Tuple key();
  • InQuery
    List<Tuple> keys();
  • etc ...

The returned cursor must support async execution, because scans can acquire locks in RW mode.

Not all indexes can support all filter types. TBD: define new index types, like BloomFilter, etc.

The proposed operations should support batches.

LockManager 

Manages locks. It has a volatile state, which is lost on restart. One per table. Used by MVStore to acquire and release locks as required by the CC protocol.

LockManager provides the following operations:

  • CompletableFuture<LockDesc> acquire(UUID txId, byte[] lockName, LockMode mode); // Acquires (or upgrades) a lock and returns a future with a lock description
  • void release(Lock lock); // Release previously acquired lock
  • void downgrade(Lock lock, LockMode mode); // Downgrades a lock
  • Iterator<Lock> locks(UUID txId); // Return all locks acquired by a transaction.

TxnStateManager

Holds a transaction state. Currently two states are supported: COMMITTED and ABORTED. One per data node. Used by MVStore to resolve write intents and for txn recovery purposes.

TxnStateManager provides the following operations:

  • CompletableFuture<TxState> state(UUID txId);
  • CompletableFuture<Boolean> changeState(UUID txId, TxState beforeState, TxState afterState); // Atomically change txn state

This state is replicated using the ReplicationManager for a partition derived from the transaction id. A separate TxnStateStore holds the txn state.
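The atomic state change can be read as a compare-and-set. Below is a minimal in-memory sketch; InMemoryTxnStateManager is a hypothetical stand-in for the replicated store, and treating a null beforeState as "no state recorded yet" is an assumption, not something the design specifies.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

enum TxState { COMMITTED, ABORTED }

// In-memory stand-in for the replicated TxnStateStore (an assumption for illustration).
class InMemoryTxnStateManager {
    private final Map<UUID, TxState> states = new ConcurrentHashMap<>();

    // Completes with null when no final state has been recorded yet.
    CompletableFuture<TxState> state(UUID txId) {
        return CompletableFuture.completedFuture(states.get(txId));
    }

    // Compare-and-set: beforeState == null means "no state recorded yet" (assumption).
    CompletableFuture<Boolean> changeState(UUID txId, TxState beforeState, TxState afterState) {
        boolean changed = beforeState == null
                ? states.putIfAbsent(txId, afterState) == null
                : states.replace(txId, beforeState, afterState);
        return CompletableFuture.completedFuture(changed);
    }
}
```

With this shape, two concurrent attempts to finish the same transaction cannot both succeed: the second changeState call observes the recorded state and returns false.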

TxnCoordinator 

Coordinates a transaction execution. Can be deployed on any cluster node, not necessarily a data node. One per node. Holds a map of active transactions UUID→InternalTransaction, assuming a txId is implemented using the UUID class.

// Assign RW transaction id.
CompletableFuture<UUID> beginAsync();

// Commit the transaction
CompletableFuture<Boolean> commitAsync(UUID txId);

// Rollback the transaction
CompletableFuture<Boolean> rollbackAsync(UUID txId);

There is no need to explicitly start the RO transaction. A user must pass a timestamp instead of txId to execute a RO transaction.

InternalTable 

Internal facade for transactional record storage. Can be deployed on any cluster node, not necessarily a data node. Caches the information about the table partition topology and the last known table schema. One per node. Executes transactional ops by sending requests to the enlisted partition data nodes, defined by the calculated data affinity.

The set of methods is similar to MVStore. Additionally, snapshot scans are allowed to be executed on an exact node by passing a custom partition map:

Cursor<Tuple> scan(Tuple keyLower, boolean lowerInclusive, Tuple keyUpper, boolean upperInclusive, 
@Nullable Timestamp ts, @Nullable Map<PartitionId, ClusterNode> partMap);

// Other methods

Snapshot scan is executed on backups out of the box and, if the partMap is not null, uses this map to define the exact partition to node mapping.

ReplicationManager

Provides replication capabilities to MVStore. The actual implementation can use any replication protocol, for example, RAFT. One per replicated partition. Delegates to MVStore to execute the “apply” phase of a replication.

CompletableFuture<Peer> getOrWaitLeader();

@Nullable Peer leader();

boolean isLocal(Peer peer);

List<Peer> peers();

List<Peer> learners();

<T> CompletableFuture<T> apply(Command command);

boolean isLeaderLeaseValid(HLC ts);

ReplicatedMVStore

Extends the MVStore functionality with replication support and leases validity for reads. Uses ReplicationManager for execution of replication related logic.

Optimizations

  1. Protocol messages to the same node should be coalesced for better throughput.
  2. If the write-only transaction writeset is known in advance and all keys are mapped to a single partition, such a transaction can be committed in 1CT. This is known as one phase commit optimization.
  3. A local cache can be used to hold values read by a transaction, to avoid RTT on reading the same key.
  4. Txn latency can be reduced by performing write operations at the end of a transaction, preferably in batches.

Backup replica reads

RW transactions can read from backup replicas, provided that a proper read lock is acquired on the primary replica and the entry is replicated. The lock response installs the proper HLC locally for waiting. See the Replica reads section for how to ensure that. Beware: this might block if the local replica is lagging.

When to use RW transactions

Here are scenarios in which you should use a locking read-write transaction:

  • If you do a write that depends on the result of one or more reads, you should do that write and the read(s) in the same read-write transaction.
    • Example: double the balance of bank account A. The read of A's balance should be in the same transaction as the write to replace the balance with the doubled value.
  • If you do one or more writes that need to be committed atomically, you should do those writes in the same read-write transaction.
    • Example: transfer $200 from account A to account B. Both of the writes (one to decrease A by $200 and one to increase B by $200) and the reads of initial account balances should be in the same transaction.
  • If you might do one or more writes, depending on the results of one or more reads, you should do those writes and reads in the same read-write transaction, even if the write(s) don't end up executing.
    • Example: transfer $200 from bank account A to bank account B if A's current balance is greater than $500. Your transaction should contain a read of A's balance and a conditional statement that contains the writes.

RO transactions

The algorithm

An RO transaction is associated with a read timestamp at the beginning. It scans indexes to get access to table data via indexed rowIds. Because it bypasses normal locking, rowIds must be filtered according to their visibility. This filtering process is known as write intent resolution and is crucial for RO transaction processing. Write intent resolution is required when the version chain corresponding to a rowId has an uncommitted version.

We expect RO txn serializability. The reading txn can “see” the write intent only if it's committed and the commit timestamp <= read timestamp. A repeating RO txn at the timestamp T must always return the same result set.

In a nutshell, the write intent resolution process ensures the aforementioned invariant. The read-only transaction performs a resolve(RowId rowId, Timestamp readTs) operation for each accessed rowId, which includes the following steps:

  • If there is no unresolved write intent, the reading txn must find the corresponding committed version in the version chain by passing the read timestamp.
  • If there is unresolved write intent (uncommitted version in the chain head), it initiates the intent resolution 
    • Checks a local txn state map for committed or aborted state - allow read if the state is committed and commitTs <= readTs.
    • If not possible, send a TxStateReq  to the txn coordinator - this initiates the coordinator path. Coordinator address is fetched from the txn state map.
    • If a coordinator path was not able to resolve the intent, one of the following has happened - the coordinator is dead or txn state is not available in the cache. Calculate a commit partition and send the TxStateReq to its primary replica - this initiates the commit partition path.
    • Retry commit partition path until a success or timeout.
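The resolution steps above can be sketched as a chain of lookups. All names below (TxOutcome, WriteIntentResolver, the three lookup functions) are hypothetical. The sketch simplifies the protocol in two ways: each path returns the raw txn outcome and the commitTs <= readTs check is done by the caller, and a bounded retry count stands in for the timeout.

```java
import java.util.UUID;
import java.util.function.Function;

// Hypothetical outcome of a finished txn; commitTs is meaningful only when committed.
final class TxOutcome {
    final boolean committed;
    final long commitTs;

    TxOutcome(boolean committed, long commitTs) {
        this.committed = committed;
        this.commitTs = commitTs;
    }
}

final class WriteIntentResolver {
    // Each lookup returns null when that path cannot resolve the transaction.
    private final Function<UUID, TxOutcome> localStateMap;
    private final Function<UUID, TxOutcome> coordinatorPath;
    private final Function<UUID, TxOutcome> commitPartitionPath;
    private final int maxRetries; // stands in for the timeout (assumption)

    WriteIntentResolver(Function<UUID, TxOutcome> localStateMap,
                        Function<UUID, TxOutcome> coordinatorPath,
                        Function<UUID, TxOutcome> commitPartitionPath,
                        int maxRetries) {
        this.localStateMap = localStateMap;
        this.coordinatorPath = coordinatorPath;
        this.commitPartitionPath = commitPartitionPath;
        this.maxRetries = maxRetries;
    }

    // Returns true if the write intent of txId is visible to a read at readTs.
    boolean isVisible(UUID txId, long readTs) {
        TxOutcome outcome = localStateMap.apply(txId);
        if (outcome == null) {
            outcome = coordinatorPath.apply(txId);
        }
        for (int i = 0; outcome == null && i < maxRetries; i++) {
            outcome = commitPartitionPath.apply(txId);
        }
        if (outcome == null) {
            throw new IllegalStateException("Write intent resolution timed out: " + txId);
        }
        return outcome.committed && outcome.commitTs <= readTs;
    }
}
```

The ordering matters for latency: the local map costs nothing, the coordinator path costs one RTT, and the commit partition path is the fallback when the coordinator cannot answer.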

On receiving TxStateReq on the coordinator:

  • If the local txn is finished, return the response with the outcome: commit or abort. The txn state is stored in a local cache, mentioned in the RW section.
  • If the local txn is finishing (waiting for finish state replication), wait for the outcome and return the response with the outcome: commit or abort.
  • If the outcome is commit, an additional timestamp check is required: the commit timestamp must be <= readTs. If the condition does not hold, the outcome is changed to abort.
  • If the local txn is active, adjust the txn coordinator node HLC according to readTs to make sure the txn commit timestamp is above the read timestamp. The read timestamp must be installed before the txn starts to commit, so that the commit timestamp is assigned after the read timestamp. This must be achieved by some sort of concurrency control, preferably non-blocking. In this case the write intent must be ignored, so the outcome is abort.
  • If the txn state is not found in the local cache and the txn is not active, return NULL.
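A minimal sketch of the coordinator-side handling follows. TxStateReqHandler and TxStateOutcome are hypothetical names. The HLC is reduced to a single long, the "finishing" state is omitted, and a synchronized block is used for brevity even though the design prefers non-blocking concurrency control.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

enum TxStateOutcome { COMMIT, ABORT }

// Hypothetical coordinator-side bookkeeping (an assumption for illustration).
class TxStateReqHandler {
    private final Map<UUID, Long> committed = new HashMap<>(); // txId -> commitTs
    private final Set<UUID> aborted = new HashSet<>();
    private final Set<UUID> active = new HashSet<>();
    private long hlc; // simplified clock of the coordinator node

    synchronized UUID begin() {
        UUID txId = UUID.randomUUID();
        active.add(txId);
        return txId;
    }

    synchronized long commit(UUID txId) {
        active.remove(txId);
        long commitTs = ++hlc; // always above every read timestamp installed so far
        committed.put(txId, commitTs);
        return commitTs;
    }

    synchronized void abort(UUID txId) {
        active.remove(txId);
        aborted.add(txId);
    }

    // Returns the outcome as seen by a read at readTs, or null if the state is unknown.
    synchronized TxStateOutcome onTxStateReq(UUID txId, long readTs) {
        Long commitTs = committed.get(txId);
        if (commitTs != null) {
            return commitTs <= readTs ? TxStateOutcome.COMMIT : TxStateOutcome.ABORT;
        }
        if (aborted.contains(txId)) {
            return TxStateOutcome.ABORT;
        }
        if (active.contains(txId)) {
            hlc = Math.max(hlc, readTs); // install readTs before the txn can commit
            return TxStateOutcome.ABORT; // the write intent is ignored by the reader
        }
        return null;
    }
}
```

Because the read timestamp is installed into the clock under the same lock that assigns commit timestamps, a txn that was reported as active to a reader at readTs cannot later commit at or below readTs.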

On receiving TxStateReq on commit partition:

  • Read the txn state from the persistent storage. If found, validate a commit timestamp (only for committed state) and return the outcome. If not found, return NULL outcome in a TxStateResp.

Write intent resolution implies the important invariant: if a RO transaction has done a read at a timestamp T over partition P, no other active RW txn, enlisted to this partition, can commit later at a timestamp T0 <= T.

The example of a write intent resolution:

[Diagram: example of a write intent resolution]

There is another subtle issue. Assume client 1 reads from a partition P with readTs=100 and sees an empty partition. After the read a client 2 commits a value to a partition P. Normally TX2 will use commitTs >= readTs, because the first read operation will install its HLC = readTs, so the repeated read with the same timestamp will not see the changes from client 2. But, there is a caveat. A primary replica for a partition can die before replicating the updated HLC (applied by RO read). In this case txn will remap to a new primary replica and can commit with a lower HLC if no precautions are taken. A new leaseholder must be elected before successful remap. The leaseholder disjoint invariant guarantees that any commit timestamp assigned on a new leaseholder is greater than the readTs, which is within the old lease interval.

Read timestamp cache

Each unresolved version chain can be associated with a volatile readTs, which is initially set to creation time, to avoid unnecessary resolution by the transactions in the past. This timestamp is adjusted by RO transactions. If readTs(T) < readTs(V), transaction T can skip the resolution step.

It is also possible to eagerly resolve write intents for long running RW transactions in batches, keeping the timestamp cache fresh.
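A minimal sketch of the per-chain timestamp follows. UnresolvedVersionChain is a hypothetical name, and the point at which the timestamp is adjusted (after a reader resolved the intent and found it invisible at its readTs) is an assumption; the text above only says that RO transactions adjust it.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical holder of the volatile readTs attached to an unresolved version chain.
class UnresolvedVersionChain {
    private final AtomicLong readTs;

    UnresolvedVersionChain(long creationTs) {
        this.readTs = new AtomicLong(creationTs); // initially the creation time
    }

    // True if a read at txReadTs can ignore the write intent without resolving it.
    boolean canSkipResolution(long txReadTs) {
        return txReadTs < readTs.get();
    }

    // Assumption: called after a reader at txReadTs resolved the intent as invisible.
    // The cached timestamp only moves forward.
    void onResolvedInvisible(long txReadTs) {
        readTs.accumulateAndGet(txReadTs, Math::max);
    }
}
```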

Replica reads

RO transactions can be executed on non-primary replicas. Write intent resolution alone doesn’t help, because a write intent for a committed transaction may not yet be replicated to the replica. To mitigate this issue, it’s enough to run readIndex on each mapped partition leader, fetch the commit index and wait on the replica until it’s applied. This guarantees that all required write intents are replicated and present locally. After that, the normal write intent resolution should do the job.

There is a second option, which doesn’t require the network RTT. We can use a special low watermark timestamp (safeTs) per replication group, which corresponds to the apply index of a replicated entry, so when the apply index advances during replication, the safeTs advances monotonically too. The HLC used for advancing safeTs is assigned to a replicated entry in an ordered way.

Special measures are needed to periodically advance the safeTs if no updates are happening. It’s enough to use a special replication command for this purpose.

All we need during RO txn is to wait until a safeTs advances past the RO txn readTs. This is illustrated in the diagram:

[Diagram: an RO txn waits until safeTs advances past its readTs]

Here transactions T1 and T2 update different keys belonging to the same partition.

OpReq(w1(x)) and OpReq(w2(x)) are received concurrently. Each write intent is assigned a timestamp in a monotonic order consistent with the replication order. This can be done, for example, when replication entries are dequeued for processing by the replication protocol (we assume entries are replicated successively).

It’s not enough only to wait for safeTs - it may never happen due to absence of activity in the partition. Consider the next diagram:

[Diagram: safeTs does not advance when there are no updates in the partition]

We need an additional safeTsSync command to propagate a safeTs event in case there are no updates in the partition.
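The replica-side waiting can be sketched as follows. SafeTsTracker is a hypothetical name and timestamps are reduced to plain longs. The advance method is assumed to be called whenever a replicated entry is applied, whether it carries a write or is a safeTsSync command.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Hypothetical per-replica tracker of safeTs (an assumption for illustration).
class SafeTsTracker {
    private long safeTs;
    private final TreeMap<Long, CompletableFuture<Void>> waiters = new TreeMap<>();

    // Returns a future that completes once safeTs >= readTs.
    synchronized CompletableFuture<Void> waitFor(long readTs) {
        if (readTs <= safeTs) {
            return CompletableFuture.completedFuture(null);
        }
        return waiters.computeIfAbsent(readTs, ts -> new CompletableFuture<>());
    }

    // Invoked when an applied entry (a write or a safeTsSync command) carries timestamp ts.
    void advance(long ts) {
        List<CompletableFuture<Void>> ready;
        synchronized (this) {
            if (ts <= safeTs) {
                return; // safeTs is monotonic
            }
            safeTs = ts;
            NavigableMap<Long, CompletableFuture<Void>> head = waiters.headMap(ts, true);
            ready = new ArrayList<>(head.values());
            head.clear(); // removes the released waiters from the backing map
        }
        // Complete outside the lock so that callbacks cannot block the tracker.
        for (CompletableFuture<Void> f : ready) {
            f.complete(null);
        }
    }
}
```

Without periodic safeTsSync commands, a future returned by waitFor would never complete on an idle partition, which is the problem described above.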

However, the first approach seems easier to implement, so we can stick with it from the start.

The minor issue is that the safeTs advancement consumes cluster resources even in the absence of load.

Note that we can avoid safeTs completely if a txn is considered committed when its write intents are replicated to all replicas.

A version lifetime

We can define a row version lifetime in the range [now - TTL, now], where TTL is a preconfigured constant, for example 10 minutes. All data beyond the low watermark is considered garbage and is subject to garbage collection.

Active RO transactions prevent a version chain from expiration until they are finished (they “pin” it). This can be implemented using a reader ref counter.

If an RO transaction attempts to map beyond the liveness range, it is immediately aborted. The latest committed version is never removed, even if its timestamp is beyond the liveness range.

It is allowed to remove intermediate row versions from a chain, if they are not in use. For example, if we have a version chain like:
ts=40 val=4

ts=30 val=3

ts=20 val=2

ts=10 val=1

and the low watermark has reached 30, we can remove versions with ts=30 and ts=20 even if an RO tx is in progress at readTs=10.

TBD epochs for GC

Remapping scans on unstable topology

Scans performed under RO transactions can be remapped if a mapped node has failed. For this to work, a table must track the latest scanned value for each mapped node. In case of failure, the local node iterator can be reinstated from this point on another live replica. This can help to minimize RO transactions restarts on unstable topology.

Single key txn

Such a transaction always returns the latest committed version. No locks are required.

Performance and consistency guarantees

There is an interesting paper which showcases tradeoffs between RO and RW transactions. It postulates the SNOW theorem, which states that a RO transaction can be S, N, O or W, but only 3 goals can be achieved simultaneously.

  • Strict serializability
    • Ensures there exists a total order over all the transactions in the system, consistent with real-time ordering
  • Non-blocking operations
    • Each server can handle the operations within a read-only transaction without blocking for any external event
  • One-response from each shard
    • The one version subproperty requires that servers send only one value for each read. 
    • The one round-trip subproperty requires the client to send at most one request to each server and the server to send at most one response back
  • Write transactions
    • The ability of a read-only transaction algorithm to coexist with conflicting transactions that update data

A SNOW-optimal transaction achieves 3 of 4 goals. Our RO algorithm can achieve N-O-W, depending on data staleness. Reading more recent data can trigger additional overhead on the N and O properties. For N, it never blocks, but can be delayed by a network RTT; for O, an additional round-trip is required for write intent resolution. In the worst case, N can be delayed for the commit replication time. N-O-W is achieved at a readTs <= safeTs, a timestamp below which no active transactions exist.

For S, the protocol achieves serializability. Note that RO transactions performed at readTs always see writes performed by RW transactions at commitTs <= readTs.

Due to loosely synchronized clocks, RO transactions started at the same absolute time may see different snapshots, within the interval defined by the clock uncertainty window.

We can improve RO transactions global visibility

  • by sacrificing the performance of RW txns by using commit wait - a RW transaction is delayed until its commit timestamp is guaranteed to pass on all cluster nodes. Bounded clock skew becomes very important in this case, because commit wait time depends on it. This approach is implemented in Spanner.
  • by restarting RO transaction at readTs if it hits the value with timestamp t, where readTs < t <= readTs + maxClockSkew
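Both options reduce to simple arithmetic over the clock skew bound. The sketch below uses hypothetical names and treats timestamps as plain physical milliseconds, which ignores the logical component of an HLC.

```java
// Hypothetical helper (an assumption for illustration); timestamps are plain milliseconds.
final class ClockUncertainty {
    private ClockUncertainty() {
    }

    // Commit wait: how long an RW txn is delayed so that commitTs has passed
    // on every node whose clock lags the local one by at most maxClockSkew.
    static long commitWaitMs(long commitTs, long localNow, long maxClockSkew) {
        return Math.max(0, commitTs + maxClockSkew - localNow);
    }

    // Restart rule: true if a value with timestamp t falls into the uncertainty
    // window of a read at readTs, that is readTs < t <= readTs + maxClockSkew.
    static boolean inUncertaintyWindow(long readTs, long t, long maxClockSkew) {
        return readTs < t && t <= readTs + maxClockSkew;
    }
}
```

The first method shows why a bounded clock skew matters for commit wait: the delay grows linearly with maxClockSkew.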

SS2PL vs S2PL

Can we use the RO protocol with S2PL? The answer seems to be no.

Consider the history, which is allowed by S2PL:

H = r1(x) w1(y) w2(x) c2 c1 // r1 read lock is released before c1

Assume a RO T3 and the history

H1 = r1(x) w1(y) w2(x) c2 r3(x) r3(y) c1

This history is not serializable.

However, we can improve the concurrency by allowing RO transactions to invalidate conflicting RW transactions. Currently, the design goal is to minimize the interaction, but this can be reconsidered later.

Read-mostly large transactions

There is a special kind of transaction, which performs a lot of reads (for example full table scan) and a few writes at the end. If implemented using RW transactions, they can cause OLTP stalls. The suggested solution is to implement the reading part as RO transaction, and the writing part as RW transaction.

TBD example (referral calculation)

When to use RO transactions

RO transactions dominate in various workloads, like WWW services. They can also be used for reporting tools and analytical processing. So, they are preferred wherever up-to-date data is not required.

Lock-free RW transactions

The idea is very simple - a write transaction installs an update with a timestamp in the past. This update can be retrieved by a read-only transaction using local HLC. The timestamp must be less than minimum current time among all nodes, but larger than a previous committed timestamp.

The only difference from a normal RW transaction is that we need to delay the commit until all replicas receive the update. This is required for a scenario where a replica is lagging and doesn't receive log entries. In that scenario, either

  • the replica doesn’t see the installed write (although it can be visible on other working replicas), or
  • the read-only transaction blocks (possibly indefinitely) if it tries to wait for a safeTs that may never come

The downside is that we need to resolve write intents until the write transaction is completed, which can take some time on big topologies. This can be mitigated by installing write intents in the future and ignoring them until a current timestamp (HLC.now()) is not within clock uncertainty, in which case we must wait it out. This optimization can be postponed until later.

Application to metadata management

The lock-free read-write transactions can be used to atomically apply metadata.

For example, let’s consider schema changes for a table. All coordinators must see applied changes atomically, but we don’t want to use normal locking - this will create enormous contention.

We must associate a schema version at the start of the transaction. This can be done on the first table operation, for each enlisted table. At the end of a transaction the initial schema version is validated against the current version. If they are compatible (how exactly is defined by application logic), the txn can be committed, otherwise it is aborted.

This guarantees that a transaction will use the same schema for all its operations and it will not be committed if a schema is no longer up-to-date and incompatible.
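The validation step can be sketched as follows. SchemaVersionValidator is a hypothetical name, schema versions are modeled as integers, and the compatibility rule is supplied by the application as a predicate, since the design leaves its definition to application logic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical per-transaction validator (an assumption for illustration).
class SchemaVersionValidator {
    // Table name -> schema version observed on the first operation against that table.
    private final Map<String, Integer> initialVersions = new HashMap<>();
    // Application-defined rule: (initialVersion, currentVersion) -> compatible?
    private final BiPredicate<Integer, Integer> compatible;

    SchemaVersionValidator(BiPredicate<Integer, Integer> compatible) {
        this.compatible = compatible;
    }

    // Records the schema version on the first operation; later calls are no-ops.
    void onTableOperation(String table, int currentVersion) {
        initialVersions.putIfAbsent(table, currentVersion);
    }

    // Commit-time check: every enlisted table must still have a compatible schema.
    boolean canCommit(Map<String, Integer> currentVersions) {
        for (Map.Entry<String, Integer> e : initialVersions.entrySet()) {
            Integer current = currentVersions.get(e.getKey());
            if (current == null || !compatible.test(e.getValue(), current)) {
                return false;
            }
        }
        return true;
    }
}
```

Treating a table that is missing from currentVersions as incompatible is a conservative choice made for the sketch.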

Recovery

This section covers recovery for transactions. It describes how the txn protocol recovers from various kinds of failures: a txn coordinator failure, an enlisted data node failure, and majority loss for a partition, which is a particular case of data node loss.

Data node failure

First consider the simplest case: a data node failure. In this case a txn coordinator is alive, has full txn topology, and can commit or abort the transaction directly, so the transaction is never abandoned and makes progress. A single data node failure doesn’t prevent a transaction’s progress as long as a majority is preserved, except in the case of read lease expiration. Even if a whole partition was lost, but we know that all writes are replicated, the transaction can be committed.

RW conflict avoidance

There is a special case for RW conflicts. We assume read locks are neither persistent nor replicated, so their state is lost on primary replica failure. This makes it impossible to detect RW conflicts, so we need to avoid them somehow.

Consider a scenario when a read was made from a primary replica and it goes offline before a transaction commits. Additional measures are required to preserve read safety.

Because read locks are not replicated, it's possible that another transaction is mapped to a new primary replica and invalidates a read for the current not yet committed transaction.

Assume the transactions

T1: a=r1(x), w1(y), c1

T2: w2(x), c2

Due to the PR1 failure and the lost read lock, the following history is possible.

H = r1(x) w2(x) c2 w1(y) c1

Assume the RO transaction T3 and the history

H2 = r1(x

On receiving TxStateReq on commit partition:

  • Read the txn state from the persistent storage. If found, validate a commit timestamp (only for committed state) and return the outcome. If not found, return NULL outcome in a TxStateResp.

Write intent resolution implies the important invariant: if a RO transaction has done a read at a timestamp T over partition P, no other active RW txn, enlisted to this partition, can commit later at a timestamp T0 <= T.

The example of a write intent resolution:

Image Removed

There is another subtle issue. Assume client 1 reads from a partition P with readTs=100 and sees an empty partition. After the read a client 2 commits a value to a partition P. Normally TX2 will use commitTs >= readTs, because the first read operation will install its HLC = readTs, so the repeated read with the same timestamp will not see the changes from client 2. But, there is a caveat. A primary replica for a partition can die before replicating the updated HLC (applied by RO read). In this case txn will remap to a new primary replica and can commit with a lower HLC if no precautions are taken. A new leaseholder must be elected before successful remap. The leaseholder disjoint invariant guarantees that any commit timestamp assigned on a new leaseholder is greater than the readTs, which is within the old lease interval.

Read timestamp cache

Each unresolved version chain can be associated with a volatile readTs, which is initially set to creation time, to avoid unnecessary resolution by the transactions in the past. This timestamp is adjusted by RO transactions. If readTs(T) < readTs(V), transaction T can skip the resolution step.

It is also possible to eagerly resolve write intents for long running RW transactions in batches, keeping the timestamp cache fresh.

Replica reads

RO transactions can be executed on non-primary replicas. write intent resolution doesn’t help because a write intent for a committed transaction may not be yet replicated to the replica. To mitigate this issue, it’s enough to run readIndex on each mapped partition leader, fetch the commit index and wait on a replica until it’s applied. This will guarantee that all required write intents are replicated and present locally. After that the normal write intern resolution should do the job.

...

All we need during RO txn is to wait until a safeTs advances past the RO txn readTs. This is illustrated in the diagram:

Image Removed

Here transactions T1 and T2 updates the different key belonging to the same partition.

OpReq(w1(x)) and OpReq(w2(x)) are received concurrently. Each write intent is assigned a timestamp in a monotonic order consistent with the replication order. This can be for example done when replication entries are dequeued for processing by replication protocol (we assume entries are replicated successively.

It’s not enough only to wait for safeTs - it may never happen due to absence of activity in the partition. Consider the next diagram:

Image Removed

We need an additional safeTsSync command to propagate a safeTs event in case there are no updates in the partition.

However, the first approach seems easier to implement ss is illustrated by the diagrao we can stick with it from the start.

The minor issue is that the safeTs advancement consumes cluster resources even in the absence of load.

Note that we can avoid safeTs completely if a txn is considered committed then write intents replicated to all replicas. 

A version lifetime

We can define a row version lifetime in the range [now - TTL, now], where TTL is a preconfigured constant, for example 10 mins. All data beyond the low watermark are considered garbage and a subject for a garbage collection.

Active RO transactions prevent a version chain from expiration until they are finished (they “pin” it). This can be implemented using a reader ref counter.

If the RO transaction is attempted to map beyond the liveness range, it is immediately aborted. A latest committed version is never removed, even if its timestamp is beyond liveness range.

It is allowed to remove intermediate row versions from a chain, if they are not in use. For example, if we have a version chain like:
ts=40 val=4

ts=30 val=3

ts=20 val=2

ts=10 val=1

and a low watermark is reached 30, we can remove versions with ts=30 and ts=20 even if a RO tx in progress at readTs=10

TBD epochs for GC

Remapping scans on unstable topology

Scans performed under RO transactions can be remapped if a mapped node has failed. For this to work, a table must track the latest scanned value for each mapped node. In case of failure, the local node iterator can be reinstated from this point on another live replica. This can help to minimize RO transactions restarts on unstable topology.

Single key txn

Such a transaction always returns the latest committed version. No locks are required.

Performance and consistency guaranties

There is an interesting paper which showcases tradeoffs between RO and RW transactions. It postulates the SNOW theorem, which states that a RO transaction can be S, N, O or W, but only 3 goals can be achieved simultaneously.

  • Strict serializability
    • Ensures there exists a total order over all the transactions in the system, consistent with real-time ordering
  • Non-blocking operations
    • Each server can handle the operations within a read-only transaction without blocking for any external event
  • One-response from each shard
    • The one version subproperty requires that servers send only one value for each read. 
    • The one round-trip subproperty requires the client to send at most one request to each server and the server to send at most one response back
  • Write transactions
    • The ability of a read-only transaction algorithm to coexist with conflicting transactions that update data

A SNOW-optimal transaction achieves 3 of the 4 goals. Our RO algorithm can achieve N-O-W, depending on data staleness. Reading more recent data can add overhead to the N and O properties. For N, a read never blocks but can be delayed by a network RTT; for O, an additional round-trip is required for write intent resolution. In the worst case, N can be delayed by the commit replication time. N-O-W is achieved at readTs <= safeTs, where safeTs is a timestamp below which no active transactions exist.

For S, the protocol achieves serializability. Note that RO transactions performed at readTs always see writes performed by RW transactions with commitTs <= readTs.

Due to loosely synchronized clocks, RO transactions started at the same absolute time can see different snapshots, within the interval defined by the clock uncertainty window.

We can improve the global visibility of RO transactions:

  • by sacrificing the performance of RW txns using commit wait: a RW transaction is delayed until its commit timestamp is guaranteed to have passed on all cluster nodes. Bounded clock skew becomes very important in this case, because the commit wait time depends on it. This approach is implemented in Spanner.
  • by restarting an RO transaction running at readTs if it hits a value with timestamp t, where readTs < t <= readTs + maxClockSkew
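The restart condition from the second option can be written down as a small predicate. This is only a sketch: the class UncertaintyCheck is hypothetical, and timestamps are modeled as plain long values rather than HLC timestamps to keep the example self-contained.

```java
/** Hypothetical helper illustrating the uncertainty-window restart rule. */
final class UncertaintyCheck {
    private UncertaintyCheck() {
    }

    /**
     * Returns true if a version with timestamp versionTs falls into the
     * uncertainty window (readTs, readTs + maxClockSkew]. In that case the
     * RO transaction cannot tell whether the write precedes it in real time,
     * so it has to restart.
     */
    static boolean mustRestart(long readTs, long versionTs, long maxClockSkew) {
        return versionTs > readTs && versionTs <= readTs + maxClockSkew;
    }
}
```

Versions with a timestamp at or below readTs are simply visible, and versions above readTs + maxClockSkew are safely in the future, so neither case triggers a restart.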

SS2PL vs S2PL

Can we use the RO protocol with S2PL? The answer seems to be no.

Consider the history, which is allowed by S2PL:

H = r1(x) w1(y) w2(x) c2 c1 // r1 read lock is released before c1

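Assume an RO transaction T3 and the history

H1 = r1(x) w1(y) w2(x) c2 r3(x) r3(y) w1(y) c1

This history is not serializable: T1 precedes T2 (r1(x) happens before w2(x)), T2 precedes T3 (T3 reads x written by T2), and T3 precedes T1 (T3 reads y before T1 commits its write), which forms a cycle.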

However, we can improve the concurrency by allowing RO transactions to invalidate conflicting RW transactions. Currently, the design goal is to minimize the interaction, but this can be reconsidered later.

Read-mostly large transactions

There is a special kind of transaction which performs a lot of reads (for example, a full table scan) and a few writes at the end. If implemented as RW transactions, such transactions can cause OLTP stalls. The suggested solution is to implement the reading part as an RO transaction and the writing part as a RW transaction.

TBD example (referral calculation)

When to use RO transactions

RO transactions dominate in various workloads, like WWW services. They can also be used for reporting tools and analytical processing. They are preferred wherever up-to-date data is not required.

Lock-free RW transactions

The idea is very simple: a write transaction installs an update with a timestamp in the past. This update can be retrieved by a read-only transaction using the local HLC. The timestamp must be less than the minimum current time among all nodes, but larger than the previous committed timestamp.

The only difference from a normal RW transaction is that we need to delay the commit until all replicas receive the update. This is required for a scenario where a replica is lagging and doesn't receive log entries. In that case either

  • the replica doesn’t see the installed write (although it can be visible on other working replicas), or
  • the read-only transaction blocks (possibly indefinitely) if it tries to wait for a safeTs that may never come

The downside is that we need to resolve write intents until the write transaction is completed, which can take some time on big topologies. This can be mitigated by installing write intents in the future and ignoring them as long as the current timestamp (HLC.now()) is not within the clock uncertainty window of the intent timestamp; once it is, we must wait the uncertainty out. This optimization can be postponed until later.

Application to metadata management

The lock-free read-write transactions can be used to atomically apply metadata.

For example, let’s consider schema changes for a table. All coordinators must see applied changes atomically, but we don’t want to use normal locking - this will create enormous contention.

We must associate a schema version with the transaction at its start. This can be done on the first table operation, for each enlisted table. At the end of the transaction the initial schema version is validated against the current version. If they are compatible (how exactly is defined by application logic), the txn can be committed; otherwise it is aborted.

This guarantees that a transaction will use the same schema for all its operations and it will not be committed if a schema is no longer up-to-date and incompatible.
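The validation step can be sketched as follows. SchemaVersionTracker, its methods, the use of String table names and int versions, and the pluggable compatibility predicate are all assumptions made for illustration; the design above only states that compatibility is defined by application logic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

/** Hypothetical per-transaction tracker of schema versions. */
final class SchemaVersionTracker {
    /** Schema version captured on the first operation, per enlisted table. */
    private final Map<String, Integer> initialVersions = new HashMap<>();

    /** Application-defined check: (initial version, current version) -> compatible? */
    private final BiPredicate<Integer, Integer> compatible;

    SchemaVersionTracker(BiPredicate<Integer, Integer> compatible) {
        this.compatible = compatible;
    }

    /** Remembers the version seen by the first operation; later calls are no-ops. */
    void onTableOperation(String table, int currentVersion) {
        initialVersions.putIfAbsent(table, currentVersion);
    }

    /** Commit-time validation against the versions that are current now. */
    boolean canCommit(Map<String, Integer> currentVersions) {
        for (Map.Entry<String, Integer> e : initialVersions.entrySet()) {
            Integer current = currentVersions.get(e.getKey());
            if (current == null || !compatible.test(e.getValue(), current)) {
                return false;
            }
        }
        return true;
    }
}
```

With a strict predicate such as equality, any schema change on an enlisted table aborts the transaction; a looser predicate can let additive changes through.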

Recovery

This section covers recovery for transactions. It describes how the txn protocol recovers from various kinds of failures: a txn coordinator failure, an enlisted data node failure, and majority loss for a partition, which is a particular case of data node loss.

Data node failure

First consider the simplest case: a data node failure. In this case the txn coordinator is alive, has the full txn topology, and can commit or abort the transaction directly, so the transaction is never abandoned and makes progress. A single data node failure doesn’t prevent a transaction’s progress as long as a majority is preserved, except in the case of read lease expiration. Even if a whole partition is lost, the transaction can be committed if we know that all writes are replicated.

RW conflict avoidance

There is a special case for RW conflicts. We assume read locks are neither persistent nor replicated, so their state is lost on primary replica failure. This makes it impossible to detect RW conflicts, so we need to avoid them somehow.

Consider a scenario when a read was made from a primary replica and it goes offline before a transaction commits. Additional measures are required to preserve read safety.

Because read locks are not replicated, it's possible that another transaction is mapped to a new primary replica and invalidates a read for the current not yet committed transaction.

Assume the transactions

T1: a=r1(x), w1(y), c1

T2: w2(x), c2

Due to the PR1 failure and a lost read lock, the following history is possible.

H = r1(x) w2(x) c2 w1(y) c1

Assume an RO transaction T3 and the history

H2 = r1(x) w2(x) c2 r3(x) r3(y) w1(y) c1

The RO transaction will return an inconsistent result to the user, which does not correspond to any serial execution of T1, T2, T3.

This is illustrated by the diagram:

[Diagram not included]

Note that H2 is not allowed by SS2PL; instead, it allows

H3 = r1(x) w1(y) c1 w2(x) c2

We prevent this history by not committing T1 if its commit timestamp lies outside the lease ranges.

To achieve this, we need to periodically refresh the lease interval for the read primary replicas if a transaction runs too long.

In the following diagram the lease refresh discovers that the lease is no longer valid, and the transaction is aborted.

[Diagram not included]

Leases can be refreshed periodically by TxnCoordinator, depending on the lease duration. A successful lease refresh extends the transaction deadline up to the lease upper bound.

More frequent refreshing reduces the probability of a txn abort due to reaching the deadline.

A reasonable refresh period can be calculated as a fraction of the lease duration D, for example D/3.
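The lease check and the refresh period can be sketched as below. ReadLease is a hypothetical class, timestamps and durations are modeled as plain long values, and the inclusive bounds are an assumption; the design only requires that the commit timestamp lies within the lease ranges.

```java
/** Hypothetical model of a read lease held on a primary replica. */
final class ReadLease {
    private final long lowerBound;
    private long upperBound;

    ReadLease(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    /** A commit is allowed only if its timestamp lies within the lease range (inclusive bounds assumed). */
    boolean covers(long commitTs) {
        return commitTs >= lowerBound && commitTs <= upperBound;
    }

    /** A successful refresh moves the upper bound forward and never backward. */
    void extendTo(long newUpperBound) {
        upperBound = Math.max(upperBound, newUpperBound);
    }

    /** Refresh period as a fraction of the lease duration D, here D/3. */
    static long refreshPeriod(long leaseDuration) {
        return leaseDuration / 3;
    }
}
```

A coordinator following this sketch would schedule a refresh every refreshPeriod(D) time units and abort the transaction if a refresh reports that the lease is no longer valid.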

Commit reordering for dependent transactions

Another issue arises due to the absence of worst-case time guarantees on replication to a majority. Consider the picture:

[Diagram not included]

This picture shows a possible commit reordering between T1 and T2 due to a delay in applying the commit for T1, despite the fact that T2 has a larger commit timestamp. This leads to a situation where T2 becomes visible before T1. It does not seem to cause any consistency issues, because the up-to-date data is correct and the only possible issues are with RO transactions at the timestamp of T2. Likely, we already have the intent written for T1, and the write intent resolution procedure will eventually wait for the committed intent. So, for now I’m considering this reordering harmless.

Coordinator failure

This means a RW transaction becomes abandoned if it has not started to commit. All locks and write intents created by the transaction will stay forever if no additional measures are taken.

One way to resolve the state of such txns is to use the discovery service to detect the coordinator failure, locally search for all txn ids initiated by the failed node, group them by commit partition node, and send a TxnOutcomeRequest to each primary replica, containing the list of corresponding txIds to check. The commit partition group is the ultimate source of information about the txn finish state. There are several check outcomes.

  1. txId is committed or aborted.
    This means the txn has been finished but the CleanupRequest has not been received yet. There is nothing to do in this case, because the txn will eventually be finished by the commit partition group.
  2. txId state is not found.
    This means the txn is active and most likely the coordinator has died in the middle of the txn. The resolution here is to abort the transaction by writing the corresponding state to the commit partition. It’s possible that a commit request is in flight right now, so we need to atomically set the txn status to aborted during the check to prevent a post-check commit.
  3. Commit partition group nodes have lost the majority.
    In this case the state of the txn is unknown. We need to wait until the majority is restored; until this happens, all locks remain held. We should mark such transactions as “hanging”. All subsequent transactions that try to take a lock and see the locker tx in the hanging state fail immediately with a message like: “Failed to take a lock because the previous locker state is unknown ” + tx info.

    For an in-memory table this means data corruption, so we can immediately resolve to either commit or abort; it doesn’t matter which.
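The three outcomes, and in particular the atomic "abort during the check" from item 2, can be illustrated with a single-node model. CommitPartitionStateSketch, its Outcome enum and the majorityAvailable flag are hypothetical; in the real protocol the state is replicated through the commit partition group, while here a ConcurrentHashMap stands in for it.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory stand-in for the txn state held by a commit partition. */
final class CommitPartitionStateSketch {
    enum TxState { COMMITTED, ABORTED }

    enum Outcome { ALREADY_FINISHED, ABORTED_BY_RECOVERY, HANGING }

    private final ConcurrentMap<UUID, TxState> states = new ConcurrentHashMap<>();
    private volatile boolean majorityAvailable = true;

    void setMajorityAvailable(boolean available) {
        majorityAvailable = available;
    }

    /** Finish request from a coordinator. The first written state wins and is returned. */
    TxState finish(UUID txId, TxState desired) {
        TxState previous = states.putIfAbsent(txId, desired);
        return previous == null ? desired : previous;
    }

    /** Outcome check for a txn whose coordinator has failed. */
    Outcome checkAbandoned(UUID txId) {
        if (!majorityAvailable) {
            return Outcome.HANGING; // Case 3: state unknown, locks stay held.
        }
        // Check and abort in one atomic step, so an in-flight commit cannot win afterwards.
        TxState previous = states.putIfAbsent(txId, TxState.ABORTED);
        return previous == null
            ? Outcome.ABORTED_BY_RECOVERY // Case 2: no state found, txn is aborted.
            : Outcome.ALREADY_FINISHED;   // Case 1: already committed or aborted.
    }
}
```

Because both finish and checkAbandoned go through putIfAbsent, a commit request that arrives after the recovery check observes ABORTED instead of overwriting it.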

RO transactions are handled differently. They rely on write intent resolution to make progress. This process is described in detail in Reading the data. However, it’s possible that both resolution paths are unavailable due to failed or unresponsive nodes. If for some reason write intent resolution is not possible (or has timed out), the RO txn must fail with a corresponding exception to avoid the risk of reading inconsistent data.

Handling node restarts

On a restart the volatile node state is lost. No special handling is required for unresolved write intents on node restarts. If they are accessed in this state, the transaction will try to resolve their state via the commit partition (because the local txn map doesn't contain the coordinator), as described in the RW algorithm section. If the commit partition doesn’t contain the txn state, the transaction is abandoned and must be aborted. The transaction is added to the abandoned list and is processed as described in the Coordinator Failure section. As soon as the aborted state is written, the current txn can proceed.

The edge case for this scenario is a full cluster restart under the load.

Commit partition group primary replica failure

We have the cleanup phase as a part of the commit in the current protocol. For this purpose a list of enlisted partitions is passed together with a finish request from the txn coordinator to the commit partition group. A commit includes a step in which a cleanup request is sent to all enlisted partitions to release locks and commit uncommitted versions.

But it’s possible that the commit partition group primary replica fails in the middle of this process.

To handle this situation, we must ensure that, as long as a majority is preserved, all enlisted partitions eventually receive the message.

We can use a simple protocol backed by consensus to achieve this, previously called a durable cleanup. We use two raft commands for this purpose:

FinishCommand (TxState txState, Map<PartId,Boolean> finishState);

where finishState holds true if the corresponding partition group has acked the unlocking, and false otherwise (the default value).

As soon as FinishCommand is committed to the group, a background process periodically sends CleanupRequest to the enlisted partitions. Each cleanup request is processed by a partition group, which acks the corresponding map entry by sending CleanupAckRequest. A primary replica failure is not a problem, because CleanupRequests are resent periodically. As soon as all partitions have acked, the background process stops sending unlock requests. Unlock requests are idempotent, so sending many requests shouldn’t be a problem.
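The bookkeeping behind this resend loop can be sketched as follows. DurableCleanupSketch is a hypothetical, single-threaded model that uses Integer partition ids; in the protocol described above the map lives in the replicated FinishCommand state and acks arrive as CleanupAckRequest messages.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the finishState map used by the durable cleanup. */
final class DurableCleanupSketch {
    /** Partition id -> true if the partition group acked the unlocking. */
    private final Map<Integer, Boolean> finishState = new LinkedHashMap<>();

    DurableCleanupSketch(List<Integer> enlistedPartitions) {
        for (Integer partId : enlistedPartitions) {
            finishState.put(partId, Boolean.FALSE); // Default value: not acked yet.
        }
    }

    /** Partitions that still need a CleanupRequest in the next resend round. */
    List<Integer> pendingPartitions() {
        List<Integer> pending = new ArrayList<>();
        for (Map.Entry<Integer, Boolean> e : finishState.entrySet()) {
            if (!e.getValue()) {
                pending.add(e.getKey());
            }
        }
        return pending;
    }

    /** Handles an ack. Repeated or unknown acks are harmless, which keeps resends idempotent. */
    void onCleanupAck(int partId) {
        finishState.computeIfPresent(partId, (id, acked) -> Boolean.TRUE);
    }

    /** The background process can stop once every partition has acked. */
    boolean done() {
        return pendingPartitions().isEmpty();
    }
}
```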

Commit partition group majority loss

This is a reason for unavailability. Some transactions can’t be finished until the majority is restored.

Components hierarchy for CC

Below is the list of components that are required for CC execution. Each component contains a minimal set of required methods.

MVStore

Implements methods for accessing multi-version data under txn context. One per data table partition. Contains all primary and secondary indexes.

Code Block
languagejava
// Puts a row. This call updates primary and all secondary indexes after acquiring necessary locks.
CompletableFuture<Void> put(Tuple row, UUID txId); 

// Removes a row. This call updates primary and all secondary indexes after acquiring necessary locks.
CompletableFuture<Tuple> remove(Tuple keyTuple, UUID txId);

// Executes the query in RW mode.
AsyncCursor<Tuple> query(@Nullable Query query, UUID txId);

// Executes the query in RO mode.
Cursor<Tuple> query(@Nullable Query query, Timestamp readTs);

// Invokes a closure over the set of records matching the filter. Returns a number of modified rows.
CompletableFuture<Long> invoke(InvokeClosure closure, @Nullable Filter filter, UUID txId);

// Commits a transaction with a timestamp.
CompletableFuture<Boolean> commit(UUID txId, Timestamp commitTs);

// Aborts a transaction
CompletableFuture<Boolean> abort(UUID txId);


Query implements the data retrieval logic and can use indexes. Possible types are:

  • ScanQuery
  • RangeQuery
    @Nullable Tuple keyLower();
    boolean lowerInclusive();
    @Nullable Tuple keyUpper();
    boolean upperInclusive();
  • EqQuery
    Tuple key();
  • InQuery
    List<Tuple> keys();
  • etc ...

The returned cursor must support async execution, because scans can acquire locks in RW mode.

Not all indexes can support all filter types. TBD: define new index types, like BloomFilter, etc.

The proposed operations should support batches.

LockManager 

Manages locks, as the name suggests. It has a volatile state, which is lost on restart. One per table. Used by MVStore to acquire/release locks as required by the CC protocol.

LockManager provides the following operations:

  • CompletableFuture<LockDesc> acquire(UUID txId, byte[] lockName, LockMode mode); // Acquires (or upgrades) a lock and returns a future with a lock description
  • void release(Lock lock); // Releases a previously acquired lock
  • void downgrade(Lock lock, LockMode mode); // Downgrades a lock
  • Iterator<Lock> locks(UUID txId); // Returns all locks acquired by a transaction.

TxnStateManager

Holds a transaction state. Currently two states are supported: COMMITTED and ABORTED. One per data node. Used by MVStore to resolve write intents and for txn recovery purposes.

TxnStateManager provides the following operations:

  • CompletableFuture<TxState> state(UUID txId);
  • CompletableFuture<Boolean> changeState(UUID txId, TxState beforeState, TxState afterState); // Atomically change txn state

This state is replicated using the ReplicationManager for a partition calculated from the transaction id. Uses a separate TxnStateStore for holding the txn state.

TxnCoordinator 

Coordinates transaction execution. Can be deployed on any cluster node, not necessarily a data node. One per node. Holds a map of active transactions UUID→InternalTransaction, assuming a txId is implemented using the UUID class.

Code Block
languagejava
// Assign RW transaction id.
CompletableFuture<UUID> beginAsync();

// Commit the transaction
CompletableFuture<Boolean> commitAsync(UUID txId);

// Rollback the transaction
CompletableFuture<Boolean> rollbackAsync(UUID txId);

There is no need to explicitly start an RO transaction. A user must pass a timestamp instead of a txId to execute an RO transaction.

InternalTable 

Internal facade for transactional record storage. Can be deployed on any cluster node, not necessarily a data node. Caches the information about the table partition topology and the last known table schema. One per node. Executes transactional ops by sending requests to the enlisted partition data nodes, defined by the calculated data affinity.

The set of methods is similar to MVStore. Additionally, snapshot scans are allowed to be executed on an exact node by passing a custom partition map:

Code Block
languagejava
Cursor<Tuple> scan(Tuple keyLower, boolean lowerInclusive, Tuple keyUpper, boolean upperInclusive, 
@Nullable Timestamp ts, @Nullable Map<PartitionId, ClusterNode> partMap);

// Other methods

A snapshot scan is executed on backups out of the box and, if partMap is not null, uses this map to define the exact partition to node mapping.

ReplicationManager

Provides replication capabilities to MVStore. The actual implementation can use any replication protocol, for example, RAFT. One per replicated partition. Delegates to MVStore to execute the “apply” phase of replication.

Code Block
languagejava
CompletableFuture<Peer> getOrWaitLeader();

@Nullable Peer leader();

boolean isLocal(Peer peer);

List<Peer> peers();

List<Peer> learners();

<T> CompletableFuture<T> apply(Command command);

boolean isLeaderLeaseValid(HLC ts);

ReplicatedMVStore

Extends the MVStore functionality with replication support and lease validity checks for reads. Uses ReplicationManager to execute replication-related logic.

...

Clock requirements

The requirement for a correct snapshot is:

...