Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

IDIEP-91
Author
Sponsor
Created

 

Status

Status
colourGreen
titleDRAFTACTIVE


Table of Contents

If I have seen further, it is by standing on ye shoulders of Giants

...

In this section, I'll give some definitions encountered through the text. It can be used to find a definition of a specific term quickly.

Record (aka Row, Tuple, Relation) -a collection of attribute-value pairs.

...

Consistency - a property that moves a database from one consistent state to another after the finish. Meaning The meaning of the consistent state is defined by a user.

...

Interactive transaction - a transaction whose operation set is not known a priori. Can It can be aborted at any time if not committed yet.

...

SG - serializability graph

RO - the abbreviation for "read-only"

RW - the abbreviation for "read-write"

TO - the abbreviation for "timestamp ordering"

Design Goals

To define key points of the protocol design, let's take a look at some features, which c n can be provided by the product, and value them from 1 to 3, where 3 means maximum importance for product success.

...

Let's evaluate each feature:

...

Strong isolation

Here we take into account the isolation property of a transaction. The strongest isolation is known to beSerializable, implying all transactions pretend to execute sequentially. This is very convenient forusersr for users because it prevents hidden data corruption and security issues. The This price for this may be reduced throughput/latency due to increased overhead from CC protocol. An additional option is to allow a user to choose a weaker isolation level, likeSNAPSHOT. The ultimate goal is to implement Serializability without sacrificing performance too much, having Serializable as the default isolation level. 

...

Such transactions can be used to build analytical reports, which can take minutes without affecting (and being affected by) concurrent OLTP load. Any SQL select for read query is naturally mapped to this type of aransaction. These transactions can run for several minutes.  transaction. Such transactions can also read snapshot data in the past , at some timestamp. This is also known as HTAP.

Score: 3

Consistent replica reads

Very useful A handy feature for load balancing, especially in conjunction with the previous. 

Score: 3

Unlimited or very

...

huge action size

Some databases limit the number and total size of records enlisted in a transaction, because they buffer temporary uncommitted read or written records.. This is not convenient for a user. 

...

Score: 1

Data loss toleration

It's important to know essential how many node failures we can tolerate until declaring the unavailability due to temporary data loss (or full unavailability, in case of in-memory deployment). This is known as k-safety. More is better. 

Score: 2

High-level observations

Looking at the evaluation, it's easy to notice that our protocol design is of “one size fits all” types. It is an intentional choice , because Apache Ignite is intended to be a general use case database designed for commodity hardware and work “fine” out of the box in common typical cases. Of course, there are cases where specialized solutions would work better. Additional optimizations and tuning capabilities can be introduced later.

Let’s define key the key  points of a design. It’s necessary to have:

  1. Interactive transactions 
  2. Read-only (long-running ) queries, which are able to can execute on replicas).
  3. Serializable isolation
  4. Unlimited (or very largehuge) transaction size

The first requirement disables deterministic protocols like Calvin, because  because they need to know the transaction read-write set in advance (or require the expensive reconnaissance step). 

...

The third requirement implies a CC protocol which that allows for serialized schedules.

...

The system also has to be horizontally scalable. To achieve scalability, the data will be partitioned using a hash or range partitioning method.  The exact partitioning method is not important essential for the purpose of this document. We treat a partition here as a synonym for a shard. Each partition is assigned to a cluster node and replicated to a predefined number of replicas to achieve high availability. Adding more nodes increases a the number of partitions in the cluster (or reduces a the number of partitions per node in case of static partitioning), thus increasing the scalability.

...

The performance aspect is not a central goal in the proposed design. We just need the a "good" level of performance at the beginning and the a great level of usability. We can optimize later, following the step-by-step improvement of the product.

Turns It turns out we want aGoogle Spanner clone. It seems it was designed keeping the similar goals in mind. Other notable Spanner clones are Cockroachdb,  and Yugabyte.

Table store

Transaction protocol describes the interaction of nodes forming a transactional table store - an (indexed) distributed records store supporting operations with ACID guarantees.

...

  • A transaction is started
  • Some operations are enlisted into the transaction.
  • A transaction is committed or aborted by the user, or it is aborted by the system because of serialization conflict or lease expiration and must be retried

Native API

The API entry point is the IgniteTransactions facade:

Code Block
languagejava
titleIgniteTransactions
public interface IgniteTransactions {
	Transaction begin();
	CompletableFuture<Transaction> beginAsync();

	void commit();
	CompletableFuture<Void> commitAsync();

    void rollback();
	CompletableFuture<Void> rollbackAsync();  

    /**
     * Executes a closure within a transaction.
     *
     * <p>If the closure is executed normally (no exceptions) the transaction is automatically committed.
     * <p> In case of serialization conflict (or other retriable issue), the transaction will be automatically retried, so the closure must be a "pure function".

     * @param clo The closure.
     *
     * @throws TransactionException If a transaction can't be finished successfully.
     */     
     void runInTransaction(Consumer<Transaction> clo);

    <T> T runInTransaction(Function<Transaction, T> clo); 
}
    <T> CompletableFuture<T> runInTransactionAsync(Function<Transaction, CompletableFuture<T>> clo) 
}


Code Block
languagejava
titleTransaction
public interface Transaction {
    /**
     * Synchronously commits a transaction.
     *
     * @throws TransactionException If a transaction can't be committed.
     */
    void commit() throws TransactionException;

    /**
     * Asynchronously commits a transaction.
     *
     * @return The future.
     */
    CompletableFuture<Void> commitAsync();

    /**
     * Synchronously rolls back a transaction.
     *
     * @throws TransactionException If a transaction can't be rolled back.
     */
    void rollback() throws TransactionException;

    /**
     * Asynchronously rolls back a transaction.
     *
     * @return The future.
     */
    CompletableFuture<Void> rollbackAsync();
}

To enlist the operation To enlist the operation into a transaction, the Transaction instance must be passed to the corresponding transaction’s store methods. Each method accepts a transaction argument. Consider, for example, a method for updating a tuple:

Code Block
languagejava
titleTupleStore
public interface TupleStore {
    /**
     * Inserts a record into the table if it doesn't exist or replaces the existing one.
     *
     * @param txn   The transaction or {@code null} to auto commit.
     * @param tuple A record to insert into the table. The record cannot be {@code null}.
     */  
    void upsert(@Nullable Transaction txn, Tuple tuple);

	CompletableFuture<Void> upsertAsync(@Nullable Transaction txn, Tuple tuple);

 	/** Other methods here */  
}`1`1


Multiple operations accepting a the same transaction instance are executed atomically within a passed transaction.

...

Defines how the table data (including indexes) is split between data nodes. Correct A correct partitioning scheme is essential for scalable cluster workloads. TBD partitioning IEP ref, add diagrams

Partition key

Partition A partition (or affinity) key is a set of attributes used to compute a partition. For example, a prefix set of a primary key can be used to calculate a partition. A table partition key matches its primary index partition key. A secondary index is not required to have a partition key - such indexes are implicitly partitioned (as table rows).

...

Data is assigned to nodes using some kind of a hash function, calculated over a set of attributes. There are several approaches to hash partitioning with different trade-offs offs (consistent vs. rendezvous).

Range partitioning

Key space is split into range partitions (ranges) , according to predefined rules (statically) or dynamically. Each range is mapped to a data node and represents a sorted map. The range search tree is used to locate a key for a node. Order by for such a table is just traversing a search tree from left to right and concatenating each range.

...

Some data is considered co-located , if they have the same partition key. For example, two tables are co-located if they have the same partition key for primary indexes.

Note what that table and index can reside on the same data node, but they can not be co-located.

If the index has no explicit partition key, its data is partitioned implicitly, the same as the PK partition key.

Additional information can be found here: IEP-86: Colocation Key

Sub-partitioning

The data can be additionally locally partitioned (within a partition) to improve query performance and parallelism.

...

We will rely on a replication protocol to consistently store data on multiple replicas consistently. Essentially, a replication protocol provides transactional semantics for a single partition update. We will build a cross partition transactional protocol on top of a replication protocol, as suggested in IEP-61.

The actual protocol type is not so important , because we don’t want to depend on specific protocol features - this breaks an abstraction. 

...

  • Majority-based protocols
    • This class of protocols requires the majority of nodes to respond in order it to commit a write, making it naturally tolerant to failures provided that a majority is responsive
  • Membership-based protocols
    • Protocols in this class require all operational nodes in the replica group to acknowledge each write. Membership-based protocols are supported by a reliable membership (RM), based on majority replication.

...

CC is responsible for controlling the schedules of RO and RW transactions to be serializable. It defines the types of schedules allowed by concurrent transactions and is a key point of a protocol.

...

Example 1. Assume we have 3 three transactions:

T1 = r1[x] r1[z] w1[x], T2 = r2[y] r2[z] w2[y], T3 = w3[x] r3[y] w3[z]

...

We assume each transaction is committed. What can we tell about the serializability of S1 and S2 ? Recall the serializable schedule definition: to be serializable, ; it must be equivalent to some serial execution order of transactions T1, T2, and T3.

Two actions on the same data object, modified by different transactions, conflict, if at least one of them is a write. The three anomalous situations can be described in terms of when the actions of two transactions T1 and T2 ,conflict with each other: in a write-read (WR) conflict T2 reads a data object previously written by T1; we define read-write (RW) and write-write (WW) conflicts similarly. These conflicts can cause anomalies like dirty reads, unrepeatable reads, lost updates, and others.

The S1 is obviously serial: it corresponds to the execution sequence: T3, T2, T1. It's not that obvious for S2 if it's serializable or not. To prove it, we should find an equivalent serializable schedule. We can attempt to swap a non-conflicting operation (preserving the order of conflicting) until the equivalent schedule is produced.

...

Schedules are called conflict equivalent if they can be converted one into another by swapping non-conflicting operations, which have no effect on do not affect execution outcome. This also means they have the same order of conflicting operations. A schedule is called conflict serializable if it’s conflict equivalent to a serial schedule. Every conflict serializable schedule is serializable, but not vice versa. This class of schedules is defined as CSR.

...

  • A node for each committed transaction in S.
  • An arc from Ti to Tj if an action of Ti precedes and conflicts with one of Tj's actions.

A schedule S is a conflict serializable if and only if its precedence graph is acyclic. An equivalent serial schedule, in this case is given by anytopological sort over the precedence graph.

...

  • If a transaction T wants to read (respectively, modify) an object, it first requests a shared (respectively, exclusive) lock on the object - same to S2PL2PL.
  • Write locks held by a transaction are released when the transaction is completed.

...

  • If a transaction T wants to read (respectively, modify) an object, it first requests a shared (respectively, exclusive) lock on the object - same to S2PL2PL.
  • All locks (read + write) held by a transaction are released when the transaction is completed.

...

Execute a transaction's validation and write phase together as a critical section: while ti being in the val-write phase, no other tk can enter its val-write phase

...

An RO transaction is associated with a read timestamp at the beginning. It scans indexes to get access to table data via indexed rowIds. Because it passes normal locking, rowIds must be somehow filtered according to their visibility. This filtering process is known as a write intent resolution and is crucial for RO transactions processing. Write intent resolution is required then when a version chain corresponding to rowId has an uncommitted version.

...

  • If there is unresolved write intent (uncommitted version in the chain head), it initiates the intent resolution 
    • Checks a local txn state map for committed or aborted state - allow read if the state is committed and commitTs <= readTs.
    • If not possible, send a TxStateReq  to the txn coordinator - this initiates the coordinator path. Coordinator address is fetched from the txn state map.
    • If a coordinator path was not able to resolve the intent, one of the following has happened - the coordinator is dead or txn state is not available in the cache. Calculate a commit partition and send the TxStateReq to its primary replica - this initiates the commit partition path.
    • Retry commit partition path until a success or timeout.

...

  • If the local txn is finished, return the response with the outcome: commit or abort. The txn state is stored in a local cache, mentioned in the RW section.

    If the local txn is finishing (waiting for finish state replication), wait for outcome and return response with the outcomeresult: commit or abort

    If the outcome result is to commit, additional timestamp check is required: a commit timestamp must be <= readTs. If the condition is not held, the outcome result is changed to commit abort.

    If local txn is active, adjust the txn coordinator node HLC according to readTs to make sure the txn commit timestamp is above the read timestamp. The read timestamp must be installed before txn is started to commit, so commit timestamp is assigned after the read timestamp. This must be achieved by some sort of concurrency control, preferably non-blocking. In this case we must ignore the write intent, so the outcome is to abort.

    If txn state is not found in a local cache and txn is not active, return NULL.

...

Because read locks are not replicated, it's possible that another transaction is mapped to a new primary replica and invalidates a read for the current not yet committed transaction.

Assume a transactions

T1: a= r1(x), w1(y), c1

T2: w2r2(x), c2

Due to PR1 failure and a lost read lock, a following history is possible.

H = r1(x) y), w2(x) c2 w1(y) c1

Assume the RO transaction T3 and the history

H2 = r1(x) w2(x) c2 r3(x) r3(y) w1(y) c1

The RO transaction will return the inconsistent result to a user, not corresponding to any serial execution of T1, T2, T3

This is illustrated by the diagram:

Image Removed

Note that H2 is not allowed by SS2PL, instead it allows

, c2

Due to PR1 failure and a lost read lock, a following history is possible.

H H3 = r1(x) w1r2(y) c1 w2(x) c2 w1(y) c1

This history is not serializable.

We prevent this history by not committing TX1 if its commit timestamp lies outside lease ranges.

...

In the following diagram the lease refresh discovers that the lease is no longer valid, and the transaction is aborted.

Leases can must be refreshed periodically by TxnCoordinator, depending on the lease duration. Successful refreshing of a lease extends transaction deadline up to a lease upper bound.

The more frequent refreshing reduces the probability of txn abort due to reaching the deadline.The reasonable refresh rate can be calculated as a fraction of the lease duration D, for example  D/3is D/2, where D is a lease duration interval.

Commit reordering for dependent transactions

...

This picture shows the possible commit reordering between T1 and T2 due to delay in applying the commit for T1, despite the fact T2 has a larger commit timestamp. This leads to a situation when T2 becomes visible before T1. But it seems not to cause any consistency issues, because up-to-date data is correct and the possible issues can only be with RO transactions at a timestamp T2. Likely Luckily, we already have the intent written for T1 and the write intent resolution procedure will eventually wait for committed intent. So, for now I’m considering this reordering harmless.

...

Below is the list of components that are required for CC execution. Each component contains a minimal set of required methods. This section may serve as a hint for implementors.

MVStore

Implements methods for accessing multi-version data under txn context. One per data table partition. Contains all primary and secondary indexes.

Code Block
languagejava
// PutsInserts a row and adds it to the indexes. This call updates primary and all secondary indexes after acquiring necessary locks.
CompletableFuture<Void> put(Tuple row Returns a rowId for the inserted row.
CompletableFuture<RowId> put(Tuple row, UUID txId); 

// Updates a row and related index entries. Returns a previous value.
CompletableFuture<Tuple> update(RowId rowId, Tuple newVal, UUID txId); 

// Removes a row. This call updates primary and allrelated secondaryindex indexesentries. afterReturns acquiringa necessaryremoved locksrow.
CompletableFuture<Tuple> remove(TupleRowId keyTuplerowId, UUID txId);

// ExecutesGet thea querytuple inby RW moderowId.
AsyncCursor<Tuple>CompletableFuture<Tuple> queryget(@NullableRowId Query queryrowId, UUID txId);

// Executes the query in RORW mode.
Cursor<Tuple>AsyncCursor<RowId> query(@Nullable Query query, TimestampUUID readTstxId);

// Invokes a closure over Executes the setquery ofin records matching the filter. Returns a number of modified rows.
CompletableFuture<Long> invoke(InvokeClosure closure, @Nullable Filter filter, UUID txIdRO mode.
Cursor<Tuple> query(@Nullable Query query, Timestamp readTs);

// Commits a transaction with a timestamp.
CompletableFuture<Boolean> commit(UUID txId, Timestamp commitTs);

// Aborts a transaction
CompletableFuture<Boolean> abort(UUID txId);

...

The proposed operations should support batches.

...

LockTable

Used to manage locks (easily guessed). It has a volatile state, which is lost on restart. One per table/index. Used by MVStore to acquire/release locks as required by CC protocol.

LockManager LockTable provides the following operations:

  • Lock getOrAddEntry(Object key) // Allocates a new lock entry for a given key
  • void remove
  • CompletableFutute<LockDesc> acquire(UUID txId, byte[] lockName, LockMode mode); // Acquires (or upgrades) a lock and returns a future with a lock description
  • void release(Lock lock); // Release previously acquired lockAttempts to remove lock resources

Lock provides the following operations:

  • Locker acquire(UUID lockerId, LockMode mode) // Lock or upgrade existing lock in/to a mode
  • LockMode downgrade(UUID lockerIdvoid downgrade(Lock lock, LockMode mode) ; // Downgrades Downgrade a lock to a mode
  • void releaseIterator<Lock> locks(UUID txIdid); // Return all locks acquired by a transaction.Release the acquired lock

TxnStateManager

Holds a transaction state. Currently two states are supported: COMMITTED and ABORTED. One per data node. Used by MVStore to resolve write intents and for txn recovery purposes.

...

We can reason more precisely on the performance as soon as some benchmark results are ready. Next step, like implementing other kinds of CC, can be decided after analyzing their results, like implementing other kinds of CC.

Discussion Links

https:// Links to discussions on thedevlist, if applicable.

Reference Links

TBD Produce 

Tickets

lists.apache.org/thread/50g0n8tkjwr0s0vq66cd9h46z1lqj1gy

Reference Links

https://github.com/ascherbakoff/ai3-txn-mvp MVP for RW based CC, with data and index versioning support.

Tickets

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyIGNITE-15081
// Links or report with relevant JIRA tickets.