
Audience: All Cassandra Users and Developers
User Impact: Support for fast general purpose transactions
Whitepaper: Accord

GitHub: https://github.com/apache/cassandra-accord

Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/xgj4sym0d3vox3dzg8xc8dnx4c8jb4d5 , https://lists.apache.org/thread/j402lzzf7m699zc2vk23vgfxz8wwtlyl

JIRA: CASSANDRA-17092

Motivation

Users must expend significant effort to modify their database consistently while maintaining scalability. Even simple transactions involving more than one partition may become complex and error prone, as a distributed state machine must be built atop the database. Conversely, packing all of the state into one partition is not scalable.

Performance also remains an issue, despite recent Paxos improvements: latency is still twice its theoretical minimum over the wide area network, and suffers particularly badly under contention.

This work aims to improve Cassandra to support fast general purpose transactions: that is, transactions that may operate over any set of keys in the database atomically, modifying their contents at once, with any action conditional on the existing contents of any key.

Goals

  • General purpose transactions (may operate over any keys in the database at once)
  • Strict-serializable isolation
  • Optimal latency: one wide area round-trip for all transactions under normal conditions
  • Optimal failure tolerance: latency and performance should be unaffected by any minority of replica failures
  • Scalability: there should be no bottleneck introduced
  • Should have no intrinsic limit to transaction complexity
  • Must work on commodity hardware
  • Must support live migration from Paxos

User Impact

Batches (including unconditional batches) on transactional tables will receive ACID properties, and syntactically valid conditional batch operations that would previously have been rejected for operating over multiple CQL partitions will now be supported, e.g.

Code Block
language: sql
BEGIN BATCH
  UPDATE tbl1 SET value1 = newValue1 WHERE partitionKey = k1;
  UPDATE tbl2 SET value2 = newValue2 WHERE partitionKey = k2 IF conditionValue = someCondition;
APPLY BATCH;
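
For illustration only, the sketch below shows how such a multi-partition conditional batch might be submitted from application code. It assumes the DataStax Java driver 4.x and treats the table, column and value names from the CQL example above as text placeholders; nothing here is prescribed by this CEP.

Code Block
language: java
import com.datastax.oss.driver.api.core.CqlSession;

public class MultiPartitionBatchExample
{
    public static void main(String[] args)
    {
        // Connects using the driver's default contact point and configuration
        try (CqlSession session = CqlSession.builder().build())
        {
            // On a transactional table, this batch would be applied atomically even
            // though it touches two partitions and carries a condition
            session.execute("BEGIN BATCH " +
                            "UPDATE tbl1 SET value1 = 'newValue1' WHERE partitionKey = 'k1'; " +
                            "UPDATE tbl2 SET value2 = 'newValue2' WHERE partitionKey = 'k2' IF conditionValue = 'someCondition'; " +
                            "APPLY BATCH;");
        }
    }
}

From the client's perspective nothing changes beyond the batch being accepted: atomicity and isolation are provided by the transaction system underneath.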

Long Term

This work is expected to replace Paxos for the project. In order to support live migration, both protocols will be maintained for at least an interim period, after which it is expected that Paxos will be deprecated. The precise timing of this will be up to the community.

While not initial goals, transactional global secondary indexes and non-global modes of operation (i.e. a replacement for LOCAL_SERIAL for latency-sensitive users) are future capabilities we anticipate enabling with this work.

This work shall be developed in a modular manner, to allow for coexistence with other consensus protocols or transaction managers. This will allow us to evolve Accord without precluding alternative solutions, as future work expands Cassandra's transactional capabilities beyond the goals of this CEP. Supporting the Paxos-based LWTs and Accord side by side initially is itself an example of such modularity and optionality.

Non-Goals

  • UX improvements to exploit this facility are considered out of scope, and will be addressed in a separate CEP
  • SASI, SAI and local secondary indexes

Prior Work

Existing databases solve this problem by using either a global leader (FaunaDB, FoundationDB) or by a complex combination of multiple leaders including a transaction log and per-key leaders (DynamoDB, CockroachDB, YugaByte). The academic literature has also outlined leaderless approaches that have not yet been utilised.

Global Leader
This category of approach is simple and correct but introduces a scalability bottleneck that would be irreconcilable with the size of many Cassandra clusters.

Multiple Leaders
DynamoDB, CockroachDB and YugaByte utilise variants of this approach, which is significantly more complex. The latency that can be achieved is unclear, but is unlikely to be better than two round-trips in the general case, since leaders are not guaranteed to be co-located with one another (nor are clients with leaders). Importantly, these approaches appear to require either specialised hardware clocks or to provide only serializable isolation, and may well impose throughput restrictions to offer any expectation of strict-serializable isolation, even without a strong guarantee of maintaining it. They also suffer from aborts. They do, however, offer optimal failure tolerance and are scalable.

Janus
Strict-serializable isolation may be easily extended to cross-shard transactions using EPaxos-derived approaches, as was first demonstrated by Janus. In its original formulation Janus is fundamentally a more limited variant of EPaxos that was expanded to the cross-shard case. It has terrible failure properties, requiring every replica to agree to take the fast path. Fortunately, all leaderless protocols based on the intuitions of EPaxos can likely be modified to support cross-shard transactions, so we may assess all such protocols.

EPaxos
While Janus has poor failure properties, the EPaxos protocol can be extended to the cross-shard case in a similar fashion while retaining almost the same failure properties as the original protocol: we need only increase the quorum size by one, or else utilise a proxy coordinator for each shard. EPaxos was the first leaderless protocol to demonstrate a fast path; however, it suffers from high contention, as every fast path participant must have witnessed every dependency in order for the fast path to be agreed.

Strict-serializable isolation is not a property of the basic EPaxos protocol, however it is possible to support it by responding to clients only once a transaction’s dependencies are committed. This is an acceptable but unnecessary limitation, as we will later demonstrate.

EPaxos latency is good if there are no conflicts but, like other leaderless protocols, its failure properties are poor: the fast path is disabled once a quarter of replicas are unreachable, at which point the system incurs significantly higher load and slower transactions, creating a risk of cascading failure.

Caesar
This protocol builds on the intuitions of EPaxos but, instead of unanimously agreeing dependencies, it unanimously agrees an execution timestamp. This confers a lower conflict rate, as each voting replica may witness a different history: we only require that some super-majority sees the transaction's timestamp before any higher timestamp. The use of a timestamp confers additional benefits we will outline below.

After agreeing an execution timestamp, Caesar builds a set of dependent transactions equivalent to those of EPaxos, where the dependencies are consistent with the timestamp execution order but permit commutative operations (mostly reads with respect to other reads) to avoid interfering with each other, so that they may be performed concurrently.

However, Caesar has downsides: transactions may livelock indefinitely (while indefinite livelock is implausible, in practice this may manifest as latency spikes), and its worst-case latency is three wide-area round-trips. It has the same suboptimal failure properties as other leaderless protocols.

Tempo
Tempo improves upon Caesar for latency, guaranteeing two-round agreement without caveat. Unfortunately it suffers from a read scalability bottleneck due to its inability to exploit transaction commutativity, and has particularly poor failure properties.

...

Execution
The union of all dependencies received during consensus is derived before t and these dependencies are disseminated via Commit; simultaneously, C issues a Read to a member of each participating shard (preferably in the same DC), attaching those dependencies known to participate on that shard. The receiving replica waits for all of these dependencies to be committed, then filters out those that have been assigned a later t. It waits for the remaining dependencies to execute and have their results applied locally, then evaluates the read and returns it to the coordinator. C combines these responses to compute an update and a client response, which are then disseminated to all replicas via Apply and returned to the client, respectively.

Code Block

Execution
Replica R receiving Commit(X, t, deps):
    Committed[X] = true
Coordinator C: 
    send a read to one or more (preferably local) replicas of each shard 
        (containing those deps that apply on the shard)
Replica R receiving Read(X, t, deps): 
    Wait for deps to be committed
    Wait for deps with a lower t to be applied locally
    Reply with result of read
Coordinator C (with a response from each shard):
    result = execute(read responses)
    send Apply(result) to all replicas of each shard
    send result to client
Replica R receiving Apply(X, t, deps, result):
    Wait for deps to be committed
    Wait for deps with a lower t to be applied locally
    Apply result locally
    Applied[X] = true
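
To make the message handling above concrete, here is a minimal, illustrative Java sketch of the replica-side logic. Every name in it (ReplicaSketch, Txn, onCommit, onRead, onApply, and so on) is hypothetical and does not correspond to the Accord codebase; busy-waiting stands in for the callback-driven waits a real implementation would use.

Code Block
language: java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a replica's handling of Commit, Read and Apply
public class ReplicaSketch
{
    static final class Txn
    {
        final long executeAt;               // the agreed execution timestamp t
        final Set<String> deps;             // dependencies known to this shard
        volatile boolean committed, applied;

        Txn(long executeAt, Set<String> deps) { this.executeAt = executeAt; this.deps = deps; }
    }

    private final Map<String, Txn> txns = new ConcurrentHashMap<>();

    // Commit(X, t, deps): record the agreed timestamp and dependencies
    public void onCommit(String id, long t, Set<String> deps)
    {
        txns.computeIfAbsent(id, k -> new Txn(t, deps)).committed = true;
    }

    // Read(X, t, deps): wait for deps to commit, then for deps assigned a lower t
    // to be applied locally, then evaluate the read for the coordinator
    public String onRead(String id, long t, Set<String> deps) throws InterruptedException
    {
        awaitCommitted(deps);
        awaitAppliedBefore(deps, t);
        return evaluateRead(id);            // placeholder for the local read
    }

    // Apply(X, t, deps, result): same waits, then apply the coordinator's result
    public void onApply(String id, long t, Set<String> deps, String result) throws InterruptedException
    {
        Txn txn = txns.computeIfAbsent(id, k -> new Txn(t, deps));
        awaitCommitted(deps);
        awaitAppliedBefore(deps, t);
        applyLocally(id, result);           // placeholder for applying the writes
        txn.applied = true;
    }

    private void awaitCommitted(Set<String> deps) throws InterruptedException
    {
        for (String dep : deps)
            while (!isCommitted(dep))
                Thread.sleep(1);            // illustrative only; a real replica would register a callback
    }

    private void awaitAppliedBefore(Set<String> deps, long t) throws InterruptedException
    {
        for (String dep : deps)
        {
            Txn txn = txns.get(dep);
            if (txn.executeAt >= t)         // deps assigned a later t are filtered out
                continue;
            while (!txn.applied)
                Thread.sleep(1);
        }
    }

    private boolean isCommitted(String dep)
    {
        Txn txn = txns.get(dep);
        return txn != null && txn.committed;
    }

    private String evaluateRead(String id) { return "result-of-" + id; }
    private void applyLocally(String id, String result) { /* apply writes for id */ }
}

Note how the execution timestamp does double duty in this sketch: it orders the dependencies a replica must wait for, and it lets the replica ignore dependencies that will execute later.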

...