...
Each transaction is assigned a deadline (by setting an explicit or default timeout), after which a commit is not possible. A txn coordinator watches for deadlines and preemptively aborts the offending transactions.
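The deadline rule can be sketched as follows. This is a minimal illustration, not the proposed implementation: `DeadlineTracker` and its methods are hypothetical names, and time is modeled as plain milliseconds passed in by the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not part of the proposal): tracks per-transaction deadlines on a coordinator.
final class DeadlineTracker {
    private final Map<UUID, Long> deadlines = new ConcurrentHashMap<>();

    // Registers a transaction; the deadline is the start time plus the timeout.
    void register(UUID txId, long startMillis, long timeoutMillis) {
        deadlines.put(txId, startMillis + timeoutMillis);
    }

    // A commit is possible only strictly before the deadline of a known transaction.
    boolean canCommit(UUID txId, long nowMillis) {
        Long deadline = deadlines.get(txId);
        return deadline != null && nowMillis < deadline;
    }

    // Removes and returns all transactions whose deadline has passed; the caller aborts them.
    List<UUID> collectExpired(long nowMillis) {
        List<UUID> expired = new ArrayList<>();
        for (Map.Entry<UUID, Long> e : deadlines.entrySet()) {
            if (nowMillis >= e.getValue() && deadlines.remove(e.getKey(), e.getValue())) {
                expired.add(e.getKey());
            }
        }
        return expired;
    }
}
```

A coordinator could call `collectExpired` from a periodic task and abort every returned txId.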
Below is the list of components that are required for CC execution. Each component contains a minimal set of required methods.
Implements methods for accessing multi-version data under txn context. One per data table partition. Contains all primary and secondary indexes.
```java
// Puts a row. This call updates primary and all secondary indexes after acquiring necessary locks.
CompletableFuture<Void> put(Tuple row, UUID txId);
// Removes a row. This call updates primary and all secondary indexes after acquiring necessary locks.
CompletableFuture<Tuple> remove(Tuple keyTuple, UUID txId);
// Executes the query in RW mode.
AsyncCursor<Tuple> query(@Nullable Query query, UUID txId);
// Executes the query in RO mode.
Cursor<Tuple> query(@Nullable Query query, Timestamp readTs);
// Invokes a closure over the set of records matching the filter. Returns a number of modified rows.
CompletableFuture<Long> invoke(InvokeClosure closure, @Nullable Filter filter, UUID txId);
// Commits a transaction with a timestamp.
CompletableFuture<Boolean> commit(UUID txId, Timestamp commitTs);
// Aborts a transaction.
CompletableFuture<Boolean> abort(UUID txId);
```
Query implements the data retrieval logic and can use indexes. Possible types are:
The returned cursor must support async execution, because scans can acquire locks in RW mode.
Not all indexes can support all filter types. TBD: define new index types, like BloomFilter, etc.
The proposed operations should support batches.
Used to manage locks. Its state is volatile and is lost on restart. One per table. Used by MVStore to acquire and release locks as required by the CC protocol.
LockManager provides the following operations:
Holds a transaction state. Currently two states are supported: COMMITTED and ABORTED. One per data node. Used by MVStore to resolve write intents and for txn recovery purposes.
TxnStateManager provides the following operations:
This state is replicated using the ReplicationManager for a partition, calculated by a transaction id. Uses a separate TxnStateStore for holding txn state.
Coordinates a transaction execution. Can be deployed on any cluster node, not necessarily a data node. One per node. Holds a map of active transactions UUID→InternalTransaction, assuming a txId is implemented using the UUID class.
```java
// Assign RW transaction id.
CompletableFuture<UUID> beginAsync();
// Commit the transaction.
CompletableFuture<Boolean> commitAsync(UUID txId);
// Rollback the transaction.
CompletableFuture<Boolean> rollbackAsync(UUID txId);
```
There is no need to explicitly start the RO transaction. A user must pass a timestamp instead of txId to execute a RO transaction.
Internal facade for transactional record storage. Can be deployed on any cluster node, not necessarily a data node. Caches the information about table partition topology and the last known table schema. One per node. Executes transactional ops by sending requests to the enlisted partition data nodes, defined by the calculated data affinity.
The set of methods is similar to MVStore. Additionally, snapshot scans are allowed to be executed on an exact node by passing a custom partition map:
```java
Cursor<Tuple> scan(Tuple keyLower, boolean lowerInclusive, Tuple keyUpper, boolean upperInclusive,
    @Nullable Timestamp ts, @Nullable Map<PartitionId, ClusterNode> partMap);
// Other methods
```
Snapshot scan is executed on backups out of the box and, if the partMap is not null, uses this map to define the exact partition to node mapping.
Provides replication capabilities to MVStore. The actual implementation can use any replication protocol, for example, RAFT. One per replicated partition. Delegates to MVStore to execute “the apply” phase of a replication.
```java
CompletableFuture<Peer> getOrWaitLeader();
@Nullable Peer leader();
boolean isLocal(Peer peer);
List<Peer> peers();
List<Peer> learners();
<T> CompletableFuture<T> apply(Command command);
boolean isLeaderLeaseValid(HLC ts);
```
Extends the MVStore functionality with replication support and leases validity for reads. Uses ReplicationManager for execution of replication related logic.
RW transactions can read from backup replicas, assuming a proper read lock is acquired on a primary replica and an entry is replicated. A lock response will install locally the proper HLC for waiting. See replica reads section on how to ensure that. Beware: this might incur blocking if a local replica is lagging.
Here are scenarios in which you should use a locking read-write transaction:
An RO transaction is associated with a read timestamp at the beginning. It scans indexes to get access to table data via indexed rowIds. Because it bypasses normal locking, rowIds must be filtered according to their visibility. This filtering process is known as write intent resolution and is crucial for RO transaction processing. Write intent resolution is required when a version chain corresponding to a rowId has an uncommitted version.
We expect RO txn serializability. The reading txn can “see” the write intent only if it's committed and the commit timestamp <= read timestamp. A repeated RO txn at the timestamp T must always return the same result set.
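The visibility rule (committed and commit timestamp <= read timestamp) can be sketched on a single version chain. This is a simplified illustration: `VersionChain` and `Version` are hypothetical names, timestamps are plain longs, and an uncommitted write intent is assumed to be already known as uncommitted, which in the real protocol is what the resolution step has to establish.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory version chain for a single rowId, newest version first.
final class VersionChain {
    // A null commitTs marks an uncommitted write intent.
    static final class Version {
        final String value;
        final Long commitTs;

        Version(String value, Long commitTs) {
            this.value = value;
            this.commitTs = commitTs;
        }
    }

    private final List<Version> versions = new ArrayList<>();

    // Adds a version at the head of the chain.
    void addNewest(String value, Long commitTs) {
        versions.add(0, new Version(value, commitTs));
    }

    // Returns the newest committed value with commitTs <= readTs, or null if nothing is visible.
    String read(long readTs) {
        for (Version v : versions) {
            if (v.commitTs != null && v.commitTs <= readTs) {
                return v.value;
            }
        }
        return null;
    }
}
```

Repeating `read` with the same timestamp returns the same value as long as no version with a commit timestamp at or below that timestamp is added later, which is the invariant the resolution process protects.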
In a nutshell, the write intent resolution process ensures the aforementioned invariant. The read-only transaction performs a resolve(RowId rowId, Timestamp readTs) operation for each accessed rowId, which includes the following steps:
On receiving TxStateReq on the coordinator:
...
On receiving TxStateReq on commit partition:
Write intent resolution implies the important invariant: if a RO transaction has done a read at a timestamp T over partition P, no other active RW txn, enlisted to this partition, can commit later at a timestamp T0 <= T.
The example of a write intent resolution:
There is another subtle issue. Assume client 1 reads from a partition P with readTs=100 and sees an empty partition. After the read a client 2 commits a value to a partition P. Normally TX2 will use commitTs >= readTs, because the first read operation will install its HLC = readTs, so the repeated read with the same timestamp will not see the changes from client 2. But, there is a caveat. A primary replica for a partition can die before replicating the updated HLC (applied by RO read). In this case txn will remap to a new primary replica and can commit with a lower HLC if no precautions are taken. A new leaseholder must be elected before successful remap. The leaseholder disjoint invariant guarantees that any commit timestamp assigned on a new leaseholder is greater than the readTs, which is within the old lease interval.
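The effect of a read installing its timestamp into the HLC can be sketched with a simplified clock. This is an assumption-laden model: `SimpleHlc` is a hypothetical name, the clock is collapsed into a single long rather than separate physical and logical components, and the physical time source is injected to keep the example deterministic.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Simplified hybrid logical clock: a single long that never goes backwards.
final class SimpleHlc {
    private final AtomicLong latest = new AtomicLong();
    private final LongSupplier physicalClock;

    SimpleHlc(LongSupplier physicalClock) {
        this.physicalClock = physicalClock;
    }

    // Observes an external timestamp (for example a readTs) so that later timestamps exceed it.
    void update(long observedTs) {
        latest.accumulateAndGet(observedTs, Math::max);
    }

    // Returns a timestamp strictly greater than anything issued or observed so far.
    long now() {
        long physical = physicalClock.getAsLong();
        return latest.updateAndGet(prev -> Math.max(prev + 1, physical));
    }
}
```

With this model, a primary replica that has served a read at readTs=100 cannot assign a commit timestamp at or below 100 afterwards. The failure case described above is exactly the one where this in-memory state is lost together with the replica.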
Each unresolved version chain can be associated with a volatile readTs, which is initially set to creation time, to avoid unnecessary resolution by the transactions in the past. This timestamp is adjusted by RO transactions. If readTs(T) < readTs(V), transaction T can skip the resolution step.
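A minimal sketch of this cache follows. The class name `IntentReadTsCache` and the exact moment at which an RO transaction adjusts the timestamp are assumptions made for illustration; the text only states that the value starts at the creation time and is adjusted by RO transactions.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical volatile per-chain cache used to skip needless write intent resolution.
final class IntentReadTsCache {
    private final AtomicLong readTs;

    IntentReadTsCache(long creationTs) {
        readTs = new AtomicLong(creationTs);
    }

    // A reader with readTs(T) < readTs(V) can skip resolution; everyone else must resolve.
    boolean needsResolution(long txReadTs) {
        return txReadTs >= readTs.get();
    }

    // Assumed adjustment: called after a reader at txReadTs found the intent still uncommitted.
    void recordUnresolvedAt(long txReadTs) {
        readTs.accumulateAndGet(txReadTs, Math::max);
    }
}
```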
It is also possible to eagerly resolve write intents for long running RW transactions in batches, keeping the timestamp cache fresh.
RO transactions can be executed on non-primary replicas. Write intent resolution alone doesn’t help here, because a write intent for a committed transaction may not yet be replicated to the replica. To mitigate this issue, it’s enough to run readIndex on each mapped partition leader, fetch the commit index and wait on a replica until it’s applied. This guarantees that all required write intents are replicated and present locally. After that the normal write intent resolution should do the job.
There is a second option, which doesn’t require the network RTT. We can use a special low watermark timestamp (safeTs) per replication group, which corresponds to the apply index of a replicated entry, so that when the apply index is advanced during the replication, the safeTs is monotonically incremented too. The HLC used for safeTs advancing is assigned to a replicated entry in an ordered way.
Special measures are needed to periodically advance the safeTs if no updates are happening. It’s enough to use a special replication command for this purpose.
All we need during RO txn is to wait until a safeTs advances past the RO txn readTs. This is illustrated in the diagram:
Here transactions T1 and T2 update different keys belonging to the same partition.
OpReq(w1(x)) and OpReq(w2(x)) are received concurrently. Each write intent is assigned a timestamp in a monotonic order consistent with the replication order. This can be done, for example, when replication entries are dequeued for processing by the replication protocol (we assume entries are replicated successively).
It’s not enough only to wait for safeTs - it may never happen due to absence of activity in the partition. Consider the next diagram:
We need an additional safeTsSync command to propagate a safeTs event in case there are no updates in the partition.
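The waiting side of the safeTs approach can be sketched as below. `SafeTimeTracker` is a hypothetical name, timestamps are plain longs, and "advances past" is interpreted here as safeTs >= readTs, which is an assumption. The `advance` method stands for applying any replicated entry, including an idle safeTsSync command.

```java
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Hypothetical per-replica tracker of the safe timestamp for one replication group.
final class SafeTimeTracker {
    private long safeTs;
    private final TreeMap<Long, CompletableFuture<Void>> waiters = new TreeMap<>();

    // Returns a future that completes once safeTs has reached the given read timestamp.
    synchronized CompletableFuture<Void> waitFor(long readTs) {
        if (safeTs >= readTs) {
            return CompletableFuture.completedFuture(null);
        }
        return waiters.computeIfAbsent(readTs, ts -> new CompletableFuture<>());
    }

    // Called when a replicated entry (an update or an idle sync) with timestamp ts is applied.
    synchronized void advance(long ts) {
        if (ts <= safeTs) {
            return; // safeTs only moves forward
        }
        safeTs = ts;
        NavigableMap<Long, CompletableFuture<Void>> ready = waiters.headMap(ts, true);
        ready.values().forEach(f -> f.complete(null));
        ready.clear();
    }
}
```

Without periodic `advance` calls in an idle partition, a future returned by `waitFor` would never complete, which is the problem the sync command addresses.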
However, the first approach seems easier to implement, so we can stick with it from the start.
The minor issue is that the safeTs advancement consumes cluster resources even in the absence of load.
Note that we can avoid safeTs completely if a txn is considered committed only when its write intents are replicated to all replicas.
We can define a row version lifetime in the range [now - TTL, now], where TTL is a preconfigured constant, for example 10 mins. All data beyond the low watermark is considered garbage and is subject to garbage collection.
Active RO transactions prevent a version chain from expiration until they are finished (they “pin” it). This can be implemented using a reader ref counter.
If an RO transaction attempts to map beyond the liveness range, it is immediately aborted. The latest committed version is never removed, even if its timestamp is beyond the liveness range.
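The liveness check and the reader pin can be sketched as two small helpers. Both class names are hypothetical, timestamps are modeled as milliseconds, and only the lower bound of the range is checked.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical policy object: computes the low watermark from a fixed TTL.
final class LivenessPolicy {
    private final long ttlMillis;

    LivenessPolicy(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Lower bound of the liveness range [now - TTL, now].
    long lowWatermark(long nowMillis) {
        return nowMillis - ttlMillis;
    }

    // An RO transaction below the low watermark is aborted right away.
    boolean canStartReadAt(long readTs, long nowMillis) {
        return readTs >= lowWatermark(nowMillis);
    }
}

// Hypothetical reader ref counter attached to a version chain.
final class PinnedChain {
    private final AtomicInteger readers = new AtomicInteger();

    void pin() {
        readers.incrementAndGet();
    }

    void unpin() {
        readers.decrementAndGet();
    }

    // The chain may expire only when no active RO transaction pins it.
    boolean canExpire() {
        return readers.get() == 0;
    }
}
```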
It is allowed to remove intermediate row versions from a chain, if they are not in use. For example, if we have a version chain like:
ts=40 val=4
ts=30 val=3
ts=20 val=2
ts=10 val=1
and the low watermark has reached 30, we can remove versions with ts=30 and ts=20 even if a RO tx is in progress at readTs=10.
TBD epochs for GC
Scans performed under RO transactions can be remapped if a mapped node has failed. For this to work, a table must track the latest scanned value for each mapped node. In case of failure, the local node iterator can be reinstated from this point on another live replica. This can help to minimize RO transactions restarts on unstable topology.
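The remap idea can be sketched with a cursor that remembers the last key it returned. This is an illustration only: `ResumableScan` is a hypothetical name, each replica is modeled as a sorted in-memory map, and both replicas are assumed to expose the same snapshot at the scan's read timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical cursor that can continue a scan on another replica after a failure.
final class ResumableScan {
    private Integer lastKey; // null until the first row is returned

    // Reads up to `limit` rows from the given replica, continuing after the last returned key.
    List<String> next(NavigableMap<Integer, String> replica, int limit) {
        NavigableMap<Integer, String> rest =
            lastKey == null ? replica : replica.tailMap(lastKey, false);
        List<String> batch = new ArrayList<>();
        for (Map.Entry<Integer, String> e : rest.entrySet()) {
            if (batch.size() == limit) {
                break;
            }
            batch.add(e.getValue());
            lastKey = e.getKey();
        }
        return batch;
    }
}
```

If the node serving the first batch fails, the caller passes a live replica to the next `next` call and the scan resumes without restarting the transaction.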
Such a transaction always returns the latest committed version. No locks are required.
There is an interesting paper which showcases tradeoffs between RO and RW transactions. It postulates the SNOW theorem, which states that a RO transaction can be S, N, O or W, but only 3 goals can be achieved simultaneously.
SNOW-optimal transaction achieves 3 of 4 goals. Our RO algorithm can achieve N-O-W, depending on data staleness. Reading more recent data can trigger additional overhead on N and O properties. For N it never blocks, but can be delayed by a network RTT, for O an additional round-trip is required for write intent resolution. In the worst case N can be delayed for commit replication time. N-O-W is achieved at a readTs <= safeTs - a timestamp, below which no active transactions exist.
For S the protocol achieves serializability. Note that RO transactions, performed at readTs, always see writes performed by RW transactions at commitTs <= readTs.
Due to loosely synchronized clocks, RO transactions started at the same absolute time may see different snapshots, within the interval defined by the clock uncertainty window.
We can improve RO transactions global visibility
Can we use the RO protocol with S2PL? The answer seems to be no.
Consider the history, which is allowed by S2PL:
H = r1(x) w1(y) w2(x) c2 c1 // r1 read lock is released before c1
Assume a RO T3 and the history
H1 = r1(x) w1(y) w2(x) c2 r3(x) r3(y) c1
This history is not serializable.
However, we can improve the concurrency by allowing RO transactions to invalidate conflicting RW transactions. Currently, the design goal is to minimize the interaction, but this can be reconsidered later.
There is a special kind of transaction, which performs a lot of reads (for example full table scan) and a few writes at the end. If implemented using RW transactions, they can cause OLTP stalls. The suggested solution is to implement the reading part as RO transaction, and the writing part as RW transaction.
TBD example (referral calculation)
RO transactions are dominating in various workloads, like WWW services. They can also be used for reporting tools and analytical processing. So, they are preferred in any places where up-to-date data is not required.
The idea is very simple - a write transaction installs an update with a timestamp in the past. This update can be retrieved by a read-only transaction using local HLC. The timestamp must be less than minimum current time among all nodes, but larger than a previous committed timestamp.
The only difference with a normal rw transaction is that we need to delay the commit until all replicas receive the update. This is required for a scenario where a replica is lagging and doesn't receive log entries. This causes either
The downside is that we need to resolve write intents until the write transaction is completed, which can take some time on big topologies. This can be mitigated by installing write intents in the future and ignoring them until a current timestamp (HLC.now()) is not within clock uncertainty, in which case we must wait it out. This optimization can be postponed until later.
The lock-free read-write transactions can be used to atomically apply metadata.
For example, let’s consider schema changes for a table. All coordinators must see applied changes atomically, but we don’t want to use normal locking - this will create enormous contention.
We must associate a schema version at the start of the transaction. This can be done on the first table operation, for each enlisted table. At the end of a transaction the initial schema version is validated against the current version. If they are compatible (how exactly is defined by application logic), the txn can be committed, otherwise it is aborted.
This guarantees that a transaction will use the same schema for all its operations and it will not be committed if a schema is no longer up-to-date and incompatible.
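The schema validation step can be sketched as follows. `SchemaTracker` is a hypothetical name, schema versions are modeled as integers keyed by table name, and the compatibility check is passed in as a predicate because the text leaves it to application logic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical per-transaction tracker of the schema versions seen at first use.
final class SchemaTracker {
    private final Map<String, Integer> initialVersions = new HashMap<>();

    // Remembers the schema version seen on the first operation against a table.
    void onTableOperation(String table, int currentVersion) {
        initialVersions.putIfAbsent(table, currentVersion);
    }

    // The txn may commit only if every enlisted table is still compatible with its initial schema.
    boolean canCommit(Map<String, Integer> currentVersions, BiPredicate<Integer, Integer> compatible) {
        return initialVersions.entrySet().stream().allMatch(e -> {
            Integer current = currentVersions.get(e.getKey());
            // A table with no known current version is treated as incompatible.
            return current != null && compatible.test(e.getValue(), current);
        });
    }
}
```

A strict predicate such as `(initial, current) -> initial.equals(current)` aborts on any schema change, while a more lenient one can accept changes the application considers backward compatible.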
This section covers recovery for transactions. It describes how the txn protocol recovers from various kinds of failures: a txn coordinator failure, an enlisted data node failure, and a majority loss for a partition, which is a particular case of data node loss.
First consider the simplest case: a data node failure. In this case a txn coordinator is alive, has full txn topology, and can commit or abort the transaction directly, so the transaction is never abandoned and makes progress. A single data node failure doesn’t prevent a transaction’s progress as long as a majority is preserved, except in the case of read lease expiration. Even if a whole partition was lost but we know that all writes are replicated, the transaction can be committed.
There is a special case for RW conflict. We assume read locks are neither persistent nor replicated, so their state is lost on primary replica failure. This makes it impossible to detect RW conflicts, so we need to avoid them somehow.
Consider a scenario when a read was made from a primary replica and it goes offline before a transaction commits. Additional measures are required to preserve read safety.
Because read locks are not replicated, it's possible that another transaction is mapped to a new primary replica and invalidates a read for the current not yet committed transaction.
Assume the transactions
T1: a=r1(x), w1(y), c1
T2: w2(x), c2
Due to the PR1 failure and a lost read lock, the following history is possible.
H = r1(x) w2(x) c2 w1(y) c1
Assume the RO transaction T3 and the history
H2 = r1(x) w2(x) c2 r3(x) r3(y) w1(y) c1
The RO transaction will return the inconsistent result to a user, not corresponding to any serial execution of T1, T2, T3
This is illustrated by the diagram:
Note that H2 is not allowed by SS2PL, instead it allows
H3 = r1(x) w1(y) c1 w2(x) c2
We prevent this history by not committing TX1 if its commit timestamp lies outside lease ranges.
To achieve this, we need to periodically refresh the lease interval for the read primary replicas if a transaction runs too long.
In the following diagram the lease refresh discovers that the lease is no longer valid, and the transaction is aborted.
Leases can be refreshed periodically by TxnCoordinator, depending on the lease duration. Successful refreshing of a lease extends the transaction deadline up to the lease upper bound.
More frequent refreshing reduces the probability of a txn abort due to reaching the deadline.
A reasonable refresh period can be calculated as a fraction of the lease duration D, for example D/3.
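The lease checks can be sketched as below. `Lease` is a hypothetical class, timestamps are plain longs, and treating the interval as inclusive on both ends is an assumption.

```java
import java.time.Duration;

// Hypothetical lease interval [start, end] held by a primary replica.
final class Lease {
    final long start;
    final long end;

    Lease(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // A commit is allowed only when its timestamp falls inside the lease interval.
    boolean covers(long commitTs) {
        return commitTs >= start && commitTs <= end;
    }

    // A successful refresh extends the upper bound; the txn deadline can follow it.
    Lease refreshed(long newEnd) {
        return new Lease(start, Math.max(end, newEnd));
    }

    // Refresh period as a fraction of the lease duration D, here D/3.
    static Duration refreshPeriod(Duration leaseDuration) {
        return leaseDuration.dividedBy(3);
    }
}
```

With a 9 second lease this gives a refresh every 3 seconds, so a refresh can fail twice before the lease actually expires.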
Another issue arises due to absence of worst case time guarantees on the replication to majority. Consider the picture:
This picture shows the possible commit reordering between T1 and T2 due to delay in applying the commit for T1, despite the fact T2 has a larger commit timestamp. This leads to a situation when T2 becomes visible before T1. But it seems not to cause any consistency issues, because up-to-date data is correct and the possible issues can only be with RO transactions at a timestamp T2. Likely, we already have the intent written for T1 and the write intent resolution procedure will eventually wait for committed intent. So, for now I’m considering this reordering harmless.
This means a RW transaction becomes abandoned if it has not started to commit. All locks and write intents created by the transaction will stay forever if no additional measures are taken.
One way to resolve the state of such txns is to use the discovery service to detect the coordinator failure, search locally for all txn ids initiated by the failed node, group them by the commit partition node and send a TxnOutcomeRequest to each primary replica, containing a list of corresponding txIds to check. The commit partition group is the ultimate source of information about the txn finish state. There are several check outcomes.
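The grouping step that precedes sending TxnOutcomeRequest can be sketched as follows. `LocalTxn` and `TxnRecovery` are hypothetical names, and nodes are identified by plain strings. The result maps each commit partition primary replica to the txIds it should be asked about.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

// Hypothetical local record of a transaction known to this node.
final class LocalTxn {
    final UUID txId;
    final String coordinatorNode;
    final String commitPartitionPrimary;

    LocalTxn(UUID txId, String coordinatorNode, String commitPartitionPrimary) {
        this.txId = txId;
        this.coordinatorNode = coordinatorNode;
        this.commitPartitionPrimary = commitPartitionPrimary;
    }
}

final class TxnRecovery {
    // Groups txIds started by the failed coordinator by the primary replica of their commit partition.
    static Map<String, List<UUID>> outcomeRequests(List<LocalTxn> localTxns, String failedNode) {
        return localTxns.stream()
            .filter(t -> t.coordinatorNode.equals(failedNode))
            .collect(Collectors.groupingBy(
                t -> t.commitPartitionPrimary,
                Collectors.mapping(t -> t.txId, Collectors.toList())));
    }
}
```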
...
RO transactions are handled differently. They rely on write intent resolution to make progress. This process is described in detail in Reading the data. But it’s possible that both paths are unavailable due to failed or unresponsive nodes. If for some reason write intent resolution is not possible (or has timed out), the RO txn must fail with a corresponding exception to avoid the risk of reading inconsistent data.
On restart, the volatile node state is lost. No special handling is required for unresolved write intents on node restart. If such intents are accessed, the accessing transaction tries to resolve their state via the commit partition (because the local txn map doesn't contain the coordinator), as described in the RW algorithm section. If the commit partition doesn’t contain the txn state, the transaction is abandoned and must be aborted. The transaction is added to the abandoned list and processed as described in the Coordinator Failure section. As soon as the aborted state is written, the current txn can proceed.
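The resolution rule for an intent found after a restart can be sketched as below. This is a simplified, single-process illustration under stated assumptions: an in-memory map stands in for the replicated commit partition state, and the class and method names are hypothetical. In the real protocol the ABORTED state would be written through replication.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical model of txn state resolution via the commit partition. */
final class IntentResolutionSketch {
    enum TxState { COMMITTED, ABORTED }

    /** Stand-in for the txn state stored by the commit partition group. */
    private final Map<UUID, TxState> commitPartitionState = new ConcurrentHashMap<>();

    /** Records a finish state unless one already exists; returns the effective state. */
    TxState recordOutcome(UUID txId, TxState state) {
        TxState prev = commitPartitionState.putIfAbsent(txId, state);
        return prev != null ? prev : state;
    }

    /**
     * Resolves the state of the txn that owns a write intent.
     * If the commit partition has no state for it, the txn is treated as abandoned
     * and ABORTED is written, after which the caller can proceed.
     */
    TxState resolve(UUID txId) {
        return recordOutcome(txId, TxState.ABORTED);
    }
}
```

Using a write-once update means that a late commit attempt for an already aborted transaction observes ABORTED instead of overwriting it.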
The edge case for this scenario is a full cluster restart under load.
The current protocol has a cleanup phase as part of the commit. For this purpose, a list of enlisted partitions is passed together with the finish request from the txn coordinator to the commit partition group. The commit includes a step in which a cleanup request is sent to all enlisted partitions to release locks and commit uncommitted versions.
But it’s possible that the commit partition group primary replica fails in the middle of the process.
To avoid this situation, we must ensure that, as long as a majority is preserved, all enlisted partitions will eventually receive the message.
We can use a simple protocol backed by consensus to achieve this, previously called a durable cleanup. We use two raft commands for this purpose:
FinishCommand (TxState txState, Map<PartId,Boolean> finishState);
where finishState holds true if the corresponding partition group has acked the unlocking, and false otherwise (the default value).
As soon as FinishCommand is committed to the group, a background process periodically sends a CleanupRequest to the enlisted partitions. Each partition group processes the cleanup request and acks the corresponding map entry by sending a CleanupAckRequest. A primary replica failure is not a problem, because CleanupRequests are resent periodically. As soon as all partitions have acked, the background process stops sending unlock requests. Unlock requests are idempotent, so sending many of them shouldn’t be a problem.
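The bookkeeping behind this resend loop can be sketched as follows. The class and method names are hypothetical, integer partition ids stand in for PartId, and the map is kept in memory. In the proposal the map is part of the replicated FinishCommand state and acks would be applied through consensus.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the finishState map and the cleanup resend loop. */
final class DurableCleanupSketch {
    /** Partition id -> true once the partition group has acked the cleanup. */
    private final Map<Integer, Boolean> finishState = new LinkedHashMap<>();

    DurableCleanupSketch(List<Integer> enlistedPartitions) {
        for (Integer partId : enlistedPartitions) {
            finishState.put(partId, false); // default value: not acked
        }
    }

    /** One tick of the background process: partitions that still need a CleanupRequest. */
    synchronized List<Integer> pendingPartitions() {
        List<Integer> pending = new ArrayList<>();
        for (Map.Entry<Integer, Boolean> e : finishState.entrySet()) {
            if (!e.getValue()) {
                pending.add(e.getKey());
            }
        }
        return pending;
    }

    /** Handles a CleanupAckRequest. Idempotent; acks for unknown partitions are ignored. */
    synchronized void onCleanupAck(int partId) {
        finishState.computeIfPresent(partId, (id, acked) -> true);
    }

    /** True when every enlisted partition has acked, so resending can stop. */
    synchronized boolean done() {
        return !finishState.containsValue(false);
    }
}
```

A new primary replica that restores this map from the replicated state can continue the loop from `pendingPartitions()`, which is why a primary failure only delays the cleanup.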
This is a reason for unavailability. Some transactions can’t be finished until the majority is restored.
Below is the list of components that are required for CC execution. Each component contains a minimal set of required methods.
Implements methods for accessing multi-version data under txn context. One per data table partition. Contains all primary and secondary indexes.
Code Block | ||
---|---|---|
| ||
// Puts a row. This call updates primary and all secondary indexes after acquiring necessary locks.
CompletableFuture<Void> put(Tuple row, UUID txId);
// Removes a row. This call updates primary and all secondary indexes after acquiring necessary locks.
CompletableFuture<Tuple> remove(Tuple keyTuple, UUID txId);
// Executes the query in RW mode.
AsyncCursor<Tuple> query(@Nullable Query query, UUID txId);
// Executes the query in RO mode.
Cursor<Tuple> query(@Nullable Query query, Timestamp readTs);
// Invokes a closure over the set of records matching the filter. Returns a number of modified rows.
CompletableFuture<Long> invoke(InvokeClosure closure, @Nullable Filter filter, UUID txId);
// Commits a transaction with a timestamp.
CompletableFuture<Boolean> commit(UUID txId, Timestamp commitTs);
// Aborts a transaction
CompletableFuture<Boolean> abort(UUID txId); |
Query implements the data retrieval logic and can use indexes. Possible types are:
The returned cursor must support async execution, because scans can acquire locks in RW mode.
Not all indexes can support all filter types. TBD: define new index types, like BloomFilter, etc.
The proposed operations should support batches.
Used to manage locks (easily guessed). It has a volatile state, which is lost on restart. One per table. Used by MVStore to acquire/release locks as required by CC protocol.
LockManager provides the following operations:
Holds a transaction state. Currently two states are supported: COMMITTED and ABORTED. One per data node. Used by MVStore to resolve write intents and for txn recovery purposes.
TxnStateManager provides the following operations:
This state is replicated using the ReplicationManager for a partition, calculated by a transaction id. Uses a separate TxnStateStore for holding txn state.
Coordinates transaction execution. Can be deployed on any cluster node, not necessarily a data node. One per node. Holds a map of active transactions UUID→InternalTransaction, assuming a txId is implemented using the UUID class.
Code Block | ||
---|---|---|
| ||
// Assign RW transaction id.
CompletableFuture<UUID> beginAsync();
// Commit the transaction
CompletableFuture<Boolean> commitAsync(UUID txId);
// Rollback the transaction
CompletableFuture<Boolean> rollbackAsync(UUID txId); |
There is no need to explicitly start the RO transaction. A user must pass a timestamp instead of txId to execute a RO transaction.
Internal facade for transactional record storage. Can be deployed on any cluster node, not necessarily a data node. Caches information about the table partition topology and the last known table schema. One per node. Executes transactional ops by sending requests to the enlisted partition data nodes, defined by the calculated data affinity.
The set of methods is similar to MVStore. Additionally, snapshot scans are allowed to be executed on an exact node by passing a custom partition map:
Code Block | ||
---|---|---|
| ||
Cursor<Tuple> scan(Tuple keyLower, boolean lowerInclusive, Tuple keyUpper, boolean upperInclusive,
@Nullable Timestamp ts, @Nullable Map<PartitionId, ClusterNode> partMap);
// Other methods |
A snapshot scan is executed on backups out of the box and, if partMap is not null, uses this map to define the exact partition-to-node mapping.
Provides replication capabilities to MVStore. The actual implementation can use any replication protocol, for example, RAFT. One per replicated partition. Delegates to MVStore to execute “the apply” phase of a replication.
Code Block | ||
---|---|---|
| ||
CompletableFuture<Peer> getOrWaitLeader();
@Nullable Peer leader();
boolean isLocal(Peer peer);
List<Peer> peers();
List<Peer> learners();
<T> CompletableFuture<T> apply(Command command);
boolean isLeaderLeaseValid(HLC ts); |
Extends the MVStore functionality with replication support and lease validity checks for reads. Uses ReplicationManager to execute replication-related logic.
...
The requirements for a correct snapshot are:
...