...
One of the major features of Apache Ignite 3, as a distributed database, is the ability to execute multiple table operations as a single atomic operation, known as a transaction. We need to design a modern and robust distributed transaction protocol, taking into account current best practices. Compared to the previous release, we aim to support transactional SQL from the beginning and to remove limitations like the limited size of a transaction.
...
HTAP - hybrid transactional/analytical processing
SI - snapshot isolation
To define the key points of the protocol design, let's take a look at some features which can be provided by the product and rate them from 1 to 3, where 3 means maximum importance for the product's success.
...
Let's evaluate each feature:
Here we take into account the isolation property of a transaction. The strongest isolation level is known to be Serializable, implying that all transactions appear to execute sequentially. This is very convenient for a user, because it prevents hidden data corruptions (https://pmg.csail.mit.edu/papers/adya-phd.pdf) and security issues (http://www.bailis.org/papers/acidrain-sigmod2017.pdf). The price for this may be reduced throughput/latency due to the increased overhead of the CC protocol. An additional option is to allow a user to choose a weaker isolation level, like SNAPSHOT. The ultimate goal is to implement serializability without sacrificing too much performance, keeping Serializable as the default isolation level.
...
Looking at the evaluation, it's easy to notice that our freshly-baked protocol design is of the “one size fits all” type. This is an intentional choice, because Apache Ignite is intended to be a general-purpose database designed for commodity hardware that works “fine” out of the box in common cases. Of course, there are cases where specialized solutions would work better. Additional optimizations and tuning capabilities can be introduced later.
...
The first requirement rules out deterministic protocols like Calvin (TBD ref), because they need to know the transaction's read-write set in advance (or require an expensive reconnaissance step).
...
The system also has to be horizontally scalable. To achieve scalability, the data will be partitioned using a hash or range partitioning method. The exact partitioning method is not important for the purposes of this document; we treat a partition here as a synonym for a shard. Each partition is assigned to a cluster node and replicated to a predefined number of replicas to achieve high availability. Adding more nodes increases the number of partitions in the cluster (or reduces the number of partitions per node in the case of static partitioning), thus increasing scalability.
A transaction can span multiple partitions, making it distributed. Each partition is additionally replicated in multiple copies for durability. Providing atomicity on commit is an additional difficulty in a distributed environment. Typically this is achieved by using the two-phase commit protocol or its improved consensus-based version (https://www.microsoft.com/en-us/research/uploads/prod/2004/01/twophase-revised.pdf).
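To make the two-phase commit idea concrete, here is a minimal sketch of the coordinator logic: it commits only if every partition votes yes in the prepare phase, and aborts everywhere otherwise. All names here are illustrative assumptions, not the actual Ignite protocol.

```java
import java.util.List;

// Hypothetical participant in a distributed commit; stands in for a
// partition replica group. Not an Ignite API.
interface Participant {
    boolean prepare(); // phase 1: vote yes/no on committing
    void commit();     // phase 2a: make the changes durable
    void abort();      // phase 2b: discard the changes
}

public class TwoPhaseCommitDemo {
    // The coordinator's decision rule: unanimous yes votes => commit.
    static boolean commitTransaction(List<Participant> parts) {
        // Phase 1: collect votes from all enlisted partitions.
        for (Participant p : parts) {
            if (!p.prepare()) {
                parts.forEach(Participant::abort);
                return false;
            }
        }
        // Phase 2: every participant voted yes, so commit everywhere.
        parts.forEach(Participant::commit);
        return true;
    }

    public static void main(String[] args) {
        Participant ok = new Participant() {
            public boolean prepare() { return true; }
            public void commit() { System.out.println("committed"); }
            public void abort() { System.out.println("aborted"); }
        };
        System.out.println(commitTransaction(List.of(ok, ok)));
    }
}
```

The consensus-based variant referenced above replaces the single coordinator decision with a replicated log entry, removing the classic coordinator-failure blocking problem.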
Performance is not of primary importance in the proposed design. No goal is set to build the fastest database in the world; we need an acceptable level of performance and a great level of usability. We can optimize later, following the step-by-step improvement of the product.
It turns out we want a Google Spanner clone. It seems Spanner was designed with similar goals in mind. Other known designs in this family are CockroachDB and YugabyteDB.
The transaction protocol describes the interaction of nodes forming a transactional table store - an (indexed) distributed record store supporting operations with ACID guarantees.
...
The transaction can be explicitly started by the client using the IgniteTransactions facade:
Transaction begin(); // Synchronous version
CompletableFuture<Transaction> beginAsync();
Transaction#commit();
CompletableFuture<Void> Transaction#commitAsync();
Transaction#rollback();
CompletableFuture<Void> Transaction#rollbackAsync();
Or it can be implicitly managed by using a closure-style API:
void runInTransaction(Consumer<Transaction> clo);
<T> T runInTransaction(Function<Transaction, T> clo);
In the latter case, a transaction is automatically started and supplied to the user closure, and it will be automatically committed if no exceptions are thrown during the execution of the closure.
To enlist an operation into a transaction, the Transaction instance must be passed to the corresponding store method. Each method accepts a transaction argument. Consider, for example, a method for updating a tuple:
void upsert(@Nullable Transaction txn, Tuple tuple);
CompletableFuture<Void> upsertAsync(@Nullable Transaction txn, Tuple tuple);
Multiple operations accepting the same transaction instance are executed atomically within the passed transaction.
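The closure-style semantics described above (begin, run the closure, commit on success, roll back on exception) can be sketched with stub types; DemoTransaction and its buffered writes are hypothetical stand-ins, not the Ignite API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for Transaction: writes are buffered and
// applied atomically on commit, discarded on rollback.
class DemoTransaction {
    private final List<Runnable> writes = new ArrayList<>();

    void enlist(Runnable write) { writes.add(write); }

    void commit() { writes.forEach(Runnable::run); }

    void rollback() { writes.clear(); }
}

public class TxnClosureDemo {
    // Mirrors runInTransaction(Consumer<Transaction>) above: the
    // transaction is started implicitly, committed if the closure
    // returns normally, and rolled back if it throws.
    static void runInTransaction(Consumer<DemoTransaction> clo) {
        DemoTransaction txn = new DemoTransaction();
        try {
            clo.accept(txn);
            txn.commit();
        } catch (RuntimeException e) {
            txn.rollback();
            throw e;
        }
    }

    public static void main(String[] args) {
        List<String> store = new ArrayList<>();
        runInTransaction(txn -> {
            txn.enlist(() -> store.add("k1=v1"));
            txn.enlist(() -> store.add("k2=v2"));
        });
        // Both enlisted writes were applied atomically on commit.
        System.out.println(store.size());
    }
}
```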
...
The 2PL lock matrix looks like this:
Lock requested by T2 | Lock held by T1: S | Lock held by T1: X
S | + | - |
X | - | - |
This protocol produces conflict-serializable schedules, but allows cascading aborts, which complicates the recovery procedure.
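The compatibility matrix above can be encoded directly: a requested lock is granted only if it is compatible with every lock already held by other transactions. This is an illustrative sketch, not the actual lock manager.

```java
// The {S, X} lock modes from the matrix above.
enum LockMode { S, X }

public class SxCompat {
    // Only S/S is compatible; any pair involving X conflicts.
    static boolean compatible(LockMode held, LockMode requested) {
        return held == LockMode.S && requested == LockMode.S;
    }

    public static void main(String[] args) {
        System.out.println(compatible(LockMode.S, LockMode.S));
        System.out.println(compatible(LockMode.S, LockMode.X));
        System.out.println(compatible(LockMode.X, LockMode.S));
    }
}
```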
...
The basic data structure for MVCC is a version chain, which maintains a list of versions for each record. Various representations of a version chain are possible, as described in the paper TBD. It states that the N2O (newest-to-oldest) scheme performs well, so we will take this approach. A single chain entry may look like this:
tuple | beginTs | endTs | txId | nextPtr |
val:100 | 1000 | INF | NULL | NULL |
...
Here is an example of a version chain (from newest to oldest) with an uncommitted version (at the head) and a committed version:
tuple | beginTs | endTs | txId | nextPtr |
val:100 | NULL | NULL | 12345 | Ref to next row |
val:200 | 1000 | INF | NULL | NULL |
Here is an example of a version chain (from newest to oldest) with two committed versions:
tuple | beginTs | endTs | txId | nextPtr |
val:100 | 2000 | INF | NULL | Ref to next row |
val:200 | 1000 | 2000 | NULL | NULL |
We call the most up-to-date record the version chain head.
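A snapshot read over such a chain walks from the head to the first committed version whose [beginTs, endTs) interval contains the read timestamp. The sketch below uses the field names from the tables above; the traversal logic (and skipping uncommitted write intents) is an illustrative assumption, not the exact Ignite implementation.

```java
public class VersionChainDemo {
    static final long INF = Long.MAX_VALUE;

    // One N2O chain entry, mirroring the columns above:
    // tuple | beginTs | endTs | txId | nextPtr.
    static class Version {
        final String tuple;
        final long beginTs, endTs; // commit-time validity interval
        final Long txId;           // non-null only while uncommitted
        final Version next;        // nextPtr: the older version
        Version(String tuple, long beginTs, long endTs, Long txId, Version next) {
            this.tuple = tuple; this.beginTs = beginTs; this.endTs = endTs;
            this.txId = txId; this.next = next;
        }
    }

    // Return the committed tuple visible at readTs, or null if none.
    static String read(Version head, long readTs) {
        for (Version v = head; v != null; v = v.next) {
            if (v.txId != null)
                continue; // uncommitted write intent: resolution omitted here
            if (v.beginTs <= readTs && readTs < v.endTs)
                return v.tuple;
        }
        return null;
    }

    public static void main(String[] args) {
        // The two-committed-versions example: val:200 valid on [1000, 2000),
        // val:100 valid on [2000, INF).
        Version old = new Version("val:200", 1000, 2000, null, null);
        Version head = new Version("val:100", 2000, INF, null, old);
        System.out.println(read(head, 1500));
        System.out.println(read(head, 2500));
    }
}
```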
...
This creates the committed version:
addr | value | beginTs | endTs | txId | nextPtr |
X | (“test”, 10) | 1000 | INF | NULL | NULL
The secondary index at this point contains one entry:
...
The version chain for this transition will look like this:
addr | value | beginTs | endTs | txId | nextPtr |
X | (“test”, 11) | 2000 | INF | NULL | Y
Y | (“test”, 10) | 1000 | 2000 | NULL | NULL
The secondary index at this point contains two entries, making it multiversion:
...
To execute the write-intent resolution part of the CC protocol, we need to add a commit partition id field to the version chain. It matters only for uncommitted versions:
tuple | beginTs | endTs | txId | commitPartId | nextPtr |
val:100 | 1000 | INF | NULL | NULL | NULL
...
The basic 2PL lock types are S_lock (shared) and X_lock (exclusive).
Lock requested by T2 | Lock held by T1: S | Lock held by T1: X
S | + | - |
X | - | - |
The lock compatibility matrix for {S,X} locks
...
A lock upgrade occurs if a transaction T already holds a d-duration m-mode lock on a data item x and later requests a d’-duration m’-lock on x, where either m < m’ or no exclusivity order is defined between m and m’. If no other transaction holds on x a lock that is incompatible with m’, T is granted on x a d’-duration lock of mode sup{m,m’}.
Lock requested by T | Lock held by T: S | Lock held by T: X
S | S | X |
X | X | X |
The lock upgrade matrix for {S,X} locks
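For the {S, X} case, the upgrade rule sup{m, m'} reduces to: the result is X if either mode is X, else S. A minimal sketch of that rule (illustrative, not the actual lock manager):

```java
// The {S, X} modes with exclusivity order S < X.
enum Mode { S, X }

public class LockUpgrade {
    // sup{m, m'} for the {S, X} lattice, matching the upgrade matrix above.
    static Mode upgrade(Mode held, Mode requested) {
        return (held == Mode.X || requested == Mode.X) ? Mode.X : Mode.S;
    }

    public static void main(String[] args) {
        System.out.println(upgrade(Mode.S, Mode.S));
        System.out.println(upgrade(Mode.S, Mode.X));
        System.out.println(upgrade(Mode.X, Mode.S));
    }
}
```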
...
Mode | IS | IX | S | SIX | X |
IS | Yes | Yes | Yes | Yes | No |
IX | Yes | Yes | No | No | No |
S | Yes | No | Yes | No | No |
SIX | Yes | No | No | No | No |
X | No | No | No | No | No |
We will use two granules: tables (high-level granules) and records (low-level granules).
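The multigranularity matrix above can be encoded as a lookup table; the row/column order follows the table (IS, IX, S, SIX, X). This is a sketch of the compatibility check only, not the full hierarchical locking protocol.

```java
public class MgCompat {
    enum M { IS, IX, S, SIX, X }

    // Compatibility matrix from the table above; COMPAT[held][requested].
    static final boolean[][] COMPAT = {
        // IS     IX     S      SIX    X
        { true,  true,  true,  true,  false }, // IS
        { true,  true,  false, false, false }, // IX
        { true,  false, true,  false, false }, // S
        { true,  false, false, false, false }, // SIX
        { false, false, false, false, false }, // X
    };

    static boolean compatible(M held, M requested) {
        return COMPAT[held.ordinal()][requested.ordinal()];
    }

    public static void main(String[] args) {
        // E.g. a table-level IX lock (intent to lock records in X mode) is
        // compatible with another transaction's IS, but not with a table-level S.
        System.out.println(compatible(M.IX, M.IS));
        System.out.println(compatible(M.IX, M.S));
    }
}
```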
...