Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Each of these mechanisms has its own implementation, while none of them can guarantee consistency out of the box during network partitioning. This document suggests a design which that will eliminate duplicating logical pieces of code and provide robust building blocks for cluster metadata management and cache protocols.

...

  • SWIM [3], an eventually-consistent protocol widely used by multiple systems, with Java implementation available [4].
  • RAPID [5], a novel consistent group membership protocol, with Java implementation also available [6].

Cluster and node lifecycles

As described in IEP-55, nodes in a cluster rely on local and distributed metastorages during the operation. The following actions facilitate the usage of the metastorages:

  • Local metastorage initialization. This step is executed before the very first startup of any Ignite 3.0 node and may be implicit (the local metastorage is initialized with default settings in this case). Upon initialization, the local metastorage contains no information about the distributed metastorage.
  • Once a node starts, the discovery (group membership) starts working to maintain the list of online nodes.
  • Until the distributed metastorage is initialized, the nodes in a cluster remain in a 'limbo' state awaiting the distributed metastorage initialization.
  • Distributed metastorage is initialized either by an administrator command or using a pre-specified set of nodes. During the metastorage initialization, an initial metastorage Raft group is created.
  • All nodes record the last known configuration of the metastorage Raft group. If written, the node attempts to communicate with the distributed metastorage using the available configuration.
  • The changes in cluster membership do not directly affect the configuration of the distributed metastorage Raft group, nor the configuration of the partitions. Reported node unavailability via the cluster membership should be considered as a hint for other layers, but the Raft group can still try to periodically contact the unavailable member in an attempt to make progress under partial network partitions (since SWIM protocol is eventually consistent).
  • All nodes subscribe to the metastorage update feed which delivers updates to the distributed metastorage as a linearizable consistent sequence of distributed metastorage modifications. Along with the metastorage Raft group configuration changes, it contains the changes to the cluster configuration (such as the list of caches, their configuration, partitions assignment, etc). Each node reacts to the changes from the distributed metastorage by adjusting the local configuration and performing corresponding actions (creating/destroying tables, moving partitions, etc).
  • If one or more of the nodes from the metastorage Raft group goes offline, but the Raft group remains available (a majority of the nodes are online), the cluster remains available. The metastorage group can be reconfigured to include a new member (to mitigate the risk of further unavailability) either automatically (similar to baseline auto-adjust), or manually by the administrator command.
  • If the number of offline nodes in the metastorage Raft group is larger than the quorum, the metastorage becomes unavailable. Further configuration changes to the cluster start to temporarily fail, but table operations keep working as long as corresponding partitions are available. Once the majority of the metastorage Raft group becomes available, the changes to the cluster configuration become available as well.
  • If more than the majority of the metastorage Raft group members are permanently lost, it is impossible to restore the metastorage availability without potential data loss. In this case, the cluster is moved to the recovery mode and the metastorage Raft group is re-initialized using one of the available members as the 'golden source'.

Application to caches protocol

...

  • A stream of committed operations that are guaranteed to be the same on all nodes in the replication group. Each operation in the stream is provided with a unique monotonous continuous index assigned by the replication module. The stream is durable and can be re-read as many times as needed as long as operations have not been compacted by a user/system request. The committed operations can be applied to the storage asynchronously as long as read operations are properly synchronized with the storage state to make sure to read-only when the needed operations are applied.
  • A readIndex() operation that returns the most up-to-date committed (durable) operation index in the replication group providing linearizability. readIndex() performance varies from protocol to protocol: for canonical Raft, this operation involves a round-trip to the majority of group members from the current leader, while in PacificA primary node can return the index locally as long as the primary lease is valid. If a node has received and applied an operation with an index at least as large as the one returned by readIndex(), the state can be safely read locally.

Separating replication and 2PC for transactional caches

Once we have a replication primitive in place, we can achieve the following goals:

  • Single-partition transactions are committed in a single replication round
  • The notion of primary and replica nodes is hidden from the upper layers of transactional logic. Upper layers operate on partitions by applying certain operations to partitions that are in turn replicated within the replication group. For example, a batch write to a partition i should be expressed as partition(i).replicate(new WriteBatch(writeMap)). The replicate operation may fail, but the write will be applied or not as a whole within the replication group
  • Simplify the existing Ignite transaction modes while preserving the current guarantees of Ignite transactions: get rid of transaction isolation levels on key-value API, keeping only PESSIMISTIC and OPTIMISTIC concurrency modes, which will match to PESSIMISTIC REPEATABLE_READ and OPTIMISTIC SERIALIZABLE modes of Ignite 2.x.

The transactional protocol operates as follows:

  • Each replication group leader maintains an in-memory keys lock table for pending transactions (this corresponds to the current lock candidates in GridCacheMapEntry). The lock table is not replicated and bound to the current group leader. Every transactional operation validates that the current group leader matches the locks table entries when an operation is applied.
  • PESSIMISTIC transactions acquire locks on write or getForUpdate operations, OPTIMISTIC transactions acquire locks on prepare phase, similarly to Ignite 2.x
  • During the transaction prepare phase, the pending updates and transaction PREPARED state is written and replicated (durably when persistence is enabled) to the partition. All possible constraints must be checked during the prepare state to guarantee that no regular flow can prevent PREPARED→COMMITTED transition
  • If a replication group leader fails after the transaction becomes PREPARED, there is no need to acquire locks for the transaction keys anymore. A new replication leader must wait for the PREPARED keys to be committed or aborted before assigning new locks for the keys
  • If the transaction coordinator fails before all partitions have prepared or committed the transaction, a cooperative termination protocol is initiated. To facilitate the protocol, we send the transaction participants (partitions) list along with the prepare operations. Cooperative termination assumes that if all transaction participants are in PREPARED state, then the transaction must be committed.
    • If a transaction is in PREPARED state for some partition and the transaction coordinator is suspected to be failed, the partition sends a finalize operation to all other partitions. The finalize operation returns true if such transaction was present on the node in PREPARED or COMMITTED state, but returns false if the transaction was already ABORTED or the transaction did not exist (in this case the finalize operation additionally prevents this transaction to ever enter the PREPARED state by marking it as ABORTED).
    • Cooperative termination commits the local transaction if all participants returned true, and aborts if either of the participants returned false.
    • The cooperative termination can be run by all partitions simultaneously or by a newly chosen transaction coordinator. In the latter case, once the coordinator learned the transaction outcome, it must also re-broadcast it to all pending participants
    • Cooperative termination can even be called when a transaction coordinator is alive, but hangs for some reason. In this case, the transaction outcome will be unambiguously decided by either the cooperative termination or the transaction coordinator.
  • If Ignite transaction participates in an external transaction (XA), cooperative termination is not applicable, the transaction state must be resolved externally. Ignite must only provide an aggregating interface to list PREPARED, COMMITTED, and ABORTED external transactions.

Consistency guarantees

By default, both Raft and PacificA provide linearizable guarantees for reads via readIndex() procedure. This, however, puts a restriction on the reader because readIndex() can only be served from the current replica group leader and, in case of Raft, involves a network round-trip to the group quorum. To address this, we can additionally introduce a more relaxed, causally consistent, read operation.

Check the IEP-99: Pluggable replication page for details on a pluggable replication protocol.

Atomic protocol

The main idea behind the atomic protocol in Ignite 2.x was a performance - because people not always need a transaction and associated coordination overhead.

Nevertheless, the atomic protocol implementation in Ignite 2.x has multiple flaws - absence of atomicity for batch operations (de facto atomicity per key), best effort consistency, side effects during retries on non-idempotent operations, the presence of PartialUpdateException for manual retries when automatic is not possible. Moreover, if a cache was created in ATOMIC mode, no transaction can be run on it due to protocols incompatibility.

Looks like we need only one cache type in Ignite 3 with strong guarantees for a better user experience.

New transaction protocol will work faster for single partition batches, allowing to commit as soon as all entries are replicated, and eliminating performance issues related to coordination overhead, making atomic protocol obsolete.

Initial data loading

A special case of initial data loading is worth mentioning. This scenario must be executed as fast as possible, so a cache can be moved to a special state, allowing it to use other protocol for initial data loading with weaker guarantees, disallowing concurrent transactions during the process.

This is a topic for separate IEP.

Transactional protocol

The transaction protocol is build on top of partition replication.

IEP-91: Transaction protocolTo provide such a consistency mode, each reader maintains a last-seen index for each partition. On write operation, the written operation index is returned to the client. On read operation, the client sends the local observed index to any of the replicas. The replica then waits for at least the given operation index to be applied and returns the read value together with the applied index at the moment of the read. Therefore, the client guarantees to see it's own updates and never sees more stale value that it has already seen. The set of counters on local clients constitutes a causality token which can be attached to compute tasks, services, etc. to guarantee causal consistency across other Ignite services. Additionally, we can introduce an API to return the current token so that a client can pass it via external communication channels.

Data Structures Building Block

...

  1. Raft Consensus Protocol
  2. PacificA Replication Protocol
  3. SWIM Group Membership Protocol
  4. Java implementation of SWIM
  5. RAPID Group Membership Protocol
  6. Java implementation of RAPID


Tickets

Jira
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
maximumIssues20
jqlQueryproject=IGNITE and labels in (iep-61)
serverId5aa69414-a9e9-3523-82ec-879b028fb15b