ID | IEP-61 |
Author | |
Sponsor | |
Created |
|
Status | DRAFT |
In Ignite 2.x there are several different mechanisms (some of them are 'nested') that share a semi-common goal:
Each of these mechanisms has its own implementation, while none of them can guarantee consistency out of the box during network partitioning. This document suggests a design which will eliminate duplicating logical pieces of code and provide robust building blocks for cluster metadata management and cache protocols.
To manage cluster metadata, we suggest exploiting schema and API similar to the ones exposed by the ETCD service. The distributed metastorage is a simple HA KV storage with:
Such a metastorage becomes a golden source of truth for metadata for each node in the cluster. Cluster configuration (including caches), additional cache metadata (indexes, data schema), affinity assignment, baseline topology, services assignments will be moved to the metadata storage. Discovery custom events will be eliminated as a low-level synchronization primitive in favor of ordered data updates in the metadata storage.
An approximate metastorage interface is as follows:
public interface DistributedMetaStorage { public Future<ReadResponse> read(Read read); public Future<WriteResponse> write(Write write); public Future<DeleteResponse> delete(Delete del); public Future<UpdateResponse> update(Update update); public WatchControl watch(Watch watch, WatchObserver observer); } // Read, Write, Delete, Update extend Request class public class Update extends Request { private List<Condition> cond; private List<Request> onSuccess; private List<Request> onFailure; } public interface WatchObserver { public void onUpdate(Entry oldEntry, Entry newEntry); public void onCancelled(); }
A typical usage pattern for the distributed metastorage in pseudocode may look as follows:
res = metastorage.read(new Read(key)).get(); newVal = update(...); // Update property value according to the running action updated = metastorage.update(new Update( key, new VersionCondition(EQUAL, res.version()), new Write(key, newVal) )).get(); if (!updated.succeeded()) { // Handle a concurrent update to the same property. }
propVal, propVersion = currentState(key); // Get the latest property value the the local node has seen/processed. metastorage.watch(new Watch(key, propVersion), (oldEntry, newEntry) -> { // Process updates propagated to the metastorage. Executed in the same order on all nodes. });
As a fundamental building block for distributed metastorage, an implementation of the Raft consensus protocol will be used [1]. The protocol is well-studied and has a large number of implementations, we can use one of them as a library, adopt the code of existing implementation for Ignite, or write a custom one.
Raft provides a consistent replicated log of abstract commands which are applied to an abstract state machine once the commands are replicated. Each command in the log has a sequence number thus providing an implicit version for each of the executed commands. Additional version numbering scheme can be introduced as commands get executed to avoid coupling with the log indexes.
Raft replication group for metastorage will be deployed on a small subset of Ignite nodes (3-5-7 nodes) to avoid high latency for metastorage updates and reduce the time of leader election in large clusters. Upon a node failure, the metastorage replication group may trigger automatic reconfiguration after some timeout to ensure a sufficient number of replicas in the replication group.
Assuming we have a HA split-brain resistant distributed metastorage, we can implement other replication protocols with different availability and performance properties, such as PacificA [2]. Both protocols provide the same distributed log abstraction for commands replication. The table below summarizes the difference between the two protocols:
Raft | PacificA | |
---|---|---|
Availability conditions | A majority of the group must be online to make progress | Can make progress even with one member left |
Dependencies | Independent | Requires external consistent configuration manager |
Latency | Can acknowledge an operation after a majority of the group responded | Must await responses from either all group members or wait for failure detection timeout and reconfigure the group before acknowledging an operation |
Other | Relies on clock timeouts to ensure linearizability of operations |
The replication protocol can be used as an abstraction for hiding primary-backup nodes replication so that upper layers work with partition objects regardless of how many nodes the data is replicated to. In this case, the atomic cache operations become trivial CRUD operations replicated by the protocol. Moreover, there is no need to differentiate between atomic and transactional caches as multi-partition transactions become a set of operations that are applied to partitions (which are, in turn, replicated within each partition).
Additionally, since log replication and operation application are separated, the latency of an update will not depend on the complexity of the operation itself (for example, the number of secondary indexes used for a given table).
An instance of replication protocol can be used to further implement various data structures and synchronization primitives that are currently placed in a separate system cache.