You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 10 Next »

IDIEP-61
Author
Sponsor
Created

  

Status
DRAFT

Motivation

In Ignite 2.x there are several different mechanisms (some of them are 'nested') that share a semi-common goal:

  • Atomic cache protocol to execute atomic updates on keys within a single partition
  • 2PC-like replication protocol for transactional cache to replicate transactions between primary and backups
  • System cache to store a part of cluster metadata (authentication information, services assignments)
  • Additional system caches to store instances of data structures (Locks, Semaphores, etc)
  • Various local and distributed values maintained via discovery custom events (caches, indexes, baseline topology)
  • Distributed metadata storage based on discovery custom events (migrated services assignments, baseline auto-adjust configuration)

Each of these mechanisms has its own implementation, while none of them can guarantee consistency out of the box during network partitioning. This document suggests a design that will eliminate duplicating logical pieces of code and provide robust building blocks for cluster metadata management and cache protocols.

Description

Revisited Distributed Metastorage

To manage cluster metadata, we suggest exploiting schema and API similar to the ones exposed by the ETCD service. The distributed metastorage is a simple HA KV storage with:

  • Total ordering of all CRUD updates (each update produces monotonically increasing across all keys version)
  • Ability to read past versions of a key and manual old versions cleanup
  • Ability to query the list of changes a given range of keys. The query start is denoted by a KV range and a start version so that the metastorage will stream all updates in the given range starting from the given version in chronological order. The query can be repeated as many times as the client requested until the updates with the given version are manually requested to be compacted.

Such a metastorage becomes a golden source of truth for metadata for each node in the cluster. Cluster configuration (including caches), additional cache metadata (indexes, data schema), affinity assignment, baseline topology, services assignments will be moved to the metadata storage. Discovery custom events will be eliminated as a low-level synchronization primitive in favor of ordered data updates in the metadata storage.

Metastorage Interface

An approximate metastorage interface is as follows:

Metastorage
public interface DistributedMetaStorage {
    public Future<ReadResponse> read(Read read);

    public Future<WriteResponse> write(Write write);
    public Future<DeleteResponse> delete(Delete del);
    public Future<UpdateResponse> update(Update update);

    public WatchControl watch(Watch watch, WatchObserver observer);
}

// Read, Write, Delete, Update extend Request class

public class Update extends Request {
    private List<Condition> cond;
    private List<Request> onSuccess;
    private List<Request> onFailure;
}

public interface WatchObserver {
    public void onUpdate(Entry oldEntry, Entry newEntry);
    public void onCancelled();
}


A typical usage pattern for the distributed metastorage in pseudocode may look as follows:

Updating entry in metastorage
res = metastorage.read(new Read(key)).get();

newVal = update(...); // Update property value according to the running action

updated = metastorage.update(new Update(
	key,
    new VersionCondition(EQUAL, res.version()),
    new Write(key, newVal)
)).get();

if (!updated.succeeded()) {
    // Handle a concurrent update to the same property.
}
Following metastorage updates
propVal, propVersion = currentState(key); // Get the latest property value the the local node has seen/processed.

metastorage.watch(new Watch(key, propVersion), (oldEntry, newEntry) -> {
    // Process updates propagated to the metastorage. Executed in the same order on all nodes.
});

Raft as Metastorage Replication Protocol

As a fundamental building block for distributed metastorage, an implementation of the Raft consensus protocol will be used [1]. The protocol is well-studied and has a large number of implementations, we can use one of them as a library, adopt the code of existing implementation for Ignite, or write a custom one. 

Raft provides a consistent replicated log of abstract commands which are applied to an abstract state machine once the commands are replicated. Each command in the log has a sequence number thus providing an implicit version for each of the executed commands. Additional version numbering scheme can be introduced as commands get executed to avoid coupling with the log indexes.

Raft replication group for metastorage will be deployed on a small subset of Ignite nodes (3-5-7 nodes) to avoid high latency for metastorage updates and reduce the time of leader election in large clusters. Upon a node failure, the metastorage replication group may trigger automatic reconfiguration after some timeout to ensure a sufficient number of replicas in the replication group.

Additional Replication Protocols

Assuming we have a HA split-brain resistant distributed metastorage, we can implement other replication protocols with different availability and performance properties, such as PacificA [2]. Both protocols provide the same distributed log abstraction for commands replication. The table below summarizes the difference between the two protocols:


RaftPacificA
Availability conditionsA majority of the group must be online to make progressCan make progress even with one member left
DependenciesIndependentRequires external consistent configuration manager
LatencyCan acknowledge an operation after a majority of the group respondedMust await responses from either all group members or wait for failure detection timeout and reconfigure the group before acknowledging an operation
Other
Relies on clock timeouts to ensure linearizability of operations

Group Membership

Besides metastorage itself, Ignite will need a group membership/discovery service with likely less strict consistency guarantees than the current Discovery SPI. The membership service will be used (not limited to):

  • Discover and automatically form an initial metastorage Raft replication group
  • Provide cluster membership changes events for subsystems that do not require strict consistency guarantees (Compute grid, SQL query execution runtime, etc).

The requirements for these subsystems should be carefully examined to choose a proper cluster membership protocol. The alternatives we currently have are:

  • SWIM [3], an eventually-consistent protocol widely used by multiple systems, with Java implementation available [4].
  • RAPID [5], a novel consistent group membership protocol, with Java implementation also available [6].

Cluster and node lifecycles

As described in IEP-55, nodes in a cluster rely on local and distributed metastorages during the operation. The following actions facilitate the usage of the metastorages:

  • Local metastorage initialization. This step is executed before the very first startup of any Ignite 3.0 node and may be implicit (the local metastorage is initialized with default settings in this case). Upon initialization, the local metastorage contains no information about the distributed metastorage.
  • Once a node starts, the discovery (group membership) starts working to maintain the list of online nodes.
  • Until the distributed metastorage is initialized, the nodes in a cluster remain in a 'limbo' state awaiting the distributed metastorage initialization.
  • Distributed metastorage is initialized either by an administrator command or using a pre-specified set of nodes. During the metastorage initialization, an initial metastorage Raft group is created.
  • All nodes record the last known configuration of the metastorage Raft group. If written, the node attempts to communicate with the distributed metastorage using the available configuration.
  • The changes in cluster membership do not directly affect the configuration of the distributed metastorage Raft group, nor the configuration of the partitions. Reported node unavailability via the cluster membership should be considered as a hint for other layers, but the Raft group can still try to periodically contact the unavailable member in an attempt to make progress under partial network partitions (since SWIM protocol is eventually consistent).
  • All nodes subscribe to the metastorage update feed which delivers updates to the distributed metastorage as a linearizable consistent sequence of distributed metastorage modifications. Along with the metastorage Raft group configuration changes, it contains the changes to the cluster configuration (such as the list of caches, their configuration, partitions assignment, etc). Each node reacts to the changes from the distributed metastorage by adjusting the local configuration and performing corresponding actions (creating/destroying tables, moving partitions, etc).
  • If one or more of the nodes from the metastorage Raft group goes offline, but the Raft group remains available (a majority of the nodes are online), the cluster remains available. The metastorage group can be reconfigured to include a new member (to mitigate the risk of further unavailability) either automatically (similar to baseline auto-adjust), or manually by the administrator command.
  • If the number of offline nodes in the metastorage Raft group is larger than the quorum, the metastorage becomes unavailable. Further configuration changes to the cluster start to temporarily fail, but table operations keep working as long as corresponding partitions are available. Once the majority of the metastorage Raft group becomes available, the changes to the cluster configuration become available as well.
  • If more than the majority of the metastorage Raft group members are permanently lost, it is impossible to restore the metastorage availability without potential data loss. In this case, the cluster is moved to the recovery mode and the metastorage Raft group is re-initialized using one of the available members as the 'golden source'.

Application to caches protocol

Partition Replication

The replication protocol can be used as an abstraction for hiding primary-backup nodes replication so that upper layers work with partition objects regardless of how many nodes the data is replicated to. In this case, the atomic cache operations become trivial CRUD operations replicated by the protocol. Moreover, there is no need to differentiate between atomic and transactional caches as multi-partition transactions become a set of operations that are applied to partitions (which are, in turn, replicated within each partition).

Additionally, since log replication and operation application are separated, the latency of an update will not depend on the complexity of the operation itself (for example, the number of secondary indexes used for a given table).

Among others, the replication module provides the following interfaces to the storage layer:

  • A stream of committed operations that are guaranteed to be the same on all nodes in the replication group. Each operation in the stream is provided with a unique monotonous continuous index assigned by the replication module. The stream is durable and can be re-read as many times as needed as long as operations have not been compacted by a user/system request. The committed operations can be applied to the storage asynchronously as long as read operations are properly synchronized with the storage state to make sure to read only when the needed operations are applied.
  • A readIndex() operation that returns the most up-to-date committed (durable) operation index in the replication group providing linearizability. readIndex() performance varies from protocol to protocol: for canonical Raft, this operation involves a round-trip to the majority of group members from the current leader, while in PacificA primary node can return the index locally as long as the primary lease is valid. If a node has received and applied an operation with an index at least as large as the one returned by readIndex(), the state can be safely read locally.

Atomic protocol

The main idea behind the atomic protocol in Ignite 2.x was a performance - because people not always need a transactions and associated coordination overhead.

Nevertheless, the atomic protocol implementation in Ignite 2.x had multiple flaws - absence of atomicity for batch operations (de facto atomicity per key), best effort consistency, side effects during retries on non-idempotent operations, the presence of PartialUpdateException for manual retries when automatic is not possible. Moreover, if a cache was created in ATOMIC mode, no transaction can be run on it due to protocols incompatibility.

In upcoming Ignite 3 strong consistency is provided by RAFT replication, but batch updates are still not atomic, causing the same issues as in Ignite 2.x

Looks like we need only one cache type in Ignite 3 with strong guaranties for better user experience.

New transaction protocol will work faster for single partition batches, allowing to commit as soon as all entries are replicated, and eliminating performance issues related to coordination overhead, making atomic protocol obsolete.

Initial data loading

A special case of initial data loading is worth mentioning. This scenario must be executed as fast as possible, so a cache can be moved to special state, allowing it to use other protocol for initial data loading with weaker guarantees, disallowing concurrent transactions during the process.

Separating replication and 2PC for transactional caches

Once we have a replication primitive in place, we can achieve the following goals:

  • Single-partition transactions are committed in a single replication round
  • The notion of primary and replica nodes is hidden from the upper layers of transactional logic. Upper layers operate on partitions by applying certain operations to partitions that are in turn replicated within the replication group. For example, a batch write to a partition i should be expressed as partition(i).replicate(new WriteBatch(writeMap)). The replicate operation may fail, but the write will be applied or not as a whole within the replication group
  • Simplify the existing Ignite transaction modes while preserving the current guarantees of Ignite transactions: get rid of transaction isolation levels on key-value API, keeping only PESSIMISTIC and OPTIMISTIC concurrency modes, which will match to PESSIMISTIC REPEATABLE_READ and OPTIMISTIC SERIALIZABLE modes of Ignite 2.x.

The transactional protocol operates as follows:

  • Each replication group leader maintains an in-memory keys lock table for pending transactions (this corresponds to the current lock candidates in GridCacheMapEntry). The lock table is not replicated and bound to the current group leader. Every transactional operation validates that the current group leader matches the locks table entries when an operation is applied.
  • PESSIMISTIC transactions acquire locks on write or getForUpdate operations, OPTIMISTIC transactions acquire locks on prepare phase, similarly to Ignite 2.x
  • During the transaction prepare phase, the pending updates and transaction PREPARED state is written and replicated (durably when persistence is enabled) to the partition. All possible constraints must be checked during the prepare state to guarantee that no regular flow can prevent PREPARED→COMMITTED transition
  • If a replication group leader fails after the transaction becomes PREPARED, there is no need to acquire locks for the transaction keys anymore. A new replication leader must wait for the PREPARED keys to be committed or aborted before assigning new locks for the keys
  • If the transaction coordinator fails before all partitions have prepared or committed the transaction, a cooperative termination protocol is initiated. To facilitate the protocol, we send the transaction participants (partitions) list along with the prepare operations. Cooperative termination assumes that if all transaction participants are in PREPARED state, then the transaction must be committed.
    • If a transaction is in PREPARED state for some partition and the transaction coordinator is suspected to be failed, the partition sends a finalize operation to all other partitions. The finalize operation returns true if such transaction was present on the node in PREPARED or COMMITTED state, but returns false if the transaction was already ABORTED or the transaction did not exist (in this case the finalize operation additionally prevents this transaction to ever enter the PREPARED state by marking it as ABORTED).
    • Cooperative termination commits the local transaction if all participants returned true, and aborts if either of the participants returned false.
    • The cooperative termination can be run by all partitions simultaneously or by a newly chosen transaction coordinator. In the latter case, once the coordinator learned the transaction outcome, it must also re-broadcast it to all pending participants
    • Cooperative termination can even be called when a transaction coordinator is alive, but hangs for some reason. In this case, the transaction outcome will be unambiguously decided by either the cooperative termination or the transaction coordinator.
  • If Ignite transaction participates in an external transaction (XA), cooperative termination is not applicable, the transaction state must be resolved externally. Ignite must only provide an aggregating interface to list PREPARED, COMMITTED, and ABORTED external transactions.

Consistency guarantees

By default, both Raft and PacificA provide linearizable guarantees for reads via readIndex() procedure. This, however, puts a restriction on the reader because readIndex() can only be served from the current replica group leader and, in case of Raft, involves a network round-trip to the group quorum. To address this, we can additionally introduce a more relaxed, causally consistent, read operation.

To provide such a consistency mode, each reader maintains a last-seen index for each partition. On write operation, the written operation index is returned to the client. On read operation, the client sends the local observed index to any of the replicas. The replica then waits for at least the given operation index to be applied and returns the read value together with the applied index at the moment of the read. Therefore, the client guarantees to see it's own updates and never sees more stale value that it has already seen. The set of counters on local clients constitutes a causality token which can be attached to compute tasks, services, etc. to guarantee causal consistency across other Ignite services. Additionally, we can introduce an API to return the current token so that a client can pass it via external communication channels.

Data Structures Building Block

An instance of replication protocol can be used to further implement various data structures and synchronization primitives that are currently placed in a separate system cache.

Links

  1. Raft Consensus Protocol
  2. PacificA Replication Protocol
  3. SWIM Group Membership Protocol
  4. Java implementation of SWIM
  5. RAPID Group Membership Protocol
  6. Java implementation of RAPID


Tickets

key summary type created updated due assignee reporter priority status resolution

JQL and issue key arguments for this macro require at least one Jira application link to be configured

  • No labels