ID: IEP-61
Author:
Sponsor:
Created:
Status: DRAFT

Motivation

In Ignite 2.x there are several different mechanisms (some of them are 'nested') that share a semi-common goal:

  • Atomic cache protocol to execute atomic updates on keys within a single partition
  • 2PC-like replication protocol for transactional cache to replicate transactions between primary and backups
  • System cache to store a part of cluster metadata (authentication information, services assignments)
  • Additional system caches to store instances of data structures (Locks, Semaphores, etc)
  • Various local and distributed values maintained via discovery custom events (caches, indexes, baseline topology)
  • Distributed metadata storage based on discovery custom events (migrated services assignments, baseline auto-adjust configuration)

Each of these mechanisms has its own implementation, and none of them guarantees consistency out of the box during network partitioning. This document proposes a design that eliminates the duplicated logic and provides robust building blocks for cluster metadata management and cache protocols.

Description

Revisited Distributed Metastorage

To manage cluster metadata, we suggest using a schema and API similar to those exposed by etcd. The distributed metastorage is a simple highly available (HA) key-value (KV) storage with:

  • Total ordering of all CRUD updates (each update produces a version that increases monotonically across all keys)
  • Ability to read past versions of a key, with manual cleanup of old versions
  • Ability to query the list of changes for a given range of keys. The query is defined by a key range and a start version: the metastorage streams all updates in the given range, starting from the given version, in chronological order. The query can be repeated as many times as the client needs, until compaction of the updates with the given version is manually requested.
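The three properties above can be illustrated with a minimal in-memory sketch. It is only a single-process model of the semantics, not the proposed implementation: the class `VersionedKvSketch` and its methods `put`, `get`, `changesSince`, and `compact` are hypothetical names introduced for this example, and replication is left out entirely.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of the metastorage semantics; not an Ignite API. */
final class VersionedKvSketch {
    /** One update to one key. A null value marks a delete. */
    static final class Change {
        final long version;
        final String key;
        final String value;

        Change(long version, String key, String value) {
            this.version = version;
            this.key = key;
            this.value = value;
        }
    }

    private final List<Change> history = new ArrayList<>(); // ordered by version
    private long version;          // last assigned version, shared by all keys
    private long compactedVersion; // requests below this version are rejected

    /** Writes a value (null deletes the key) and returns the new global version. */
    synchronized long put(String key, String value) {
        history.add(new Change(++version, key, value));
        return version;
    }

    /** Returns the value the key had as of the given version, or null if absent. */
    synchronized String get(String key, long asOfVersion) {
        checkReadable(asOfVersion);
        String val = null;
        for (Change c : history) {
            if (c.version > asOfVersion)
                break;
            if (c.key.equals(key))
                val = c.value;
        }
        return val;
    }

    /** Returns updates to keys in [fromKey, toKey) with version >= fromVersion, oldest first. */
    synchronized List<Change> changesSince(String fromKey, String toKey, long fromVersion) {
        checkReadable(fromVersion);
        List<Change> res = new ArrayList<>();
        for (Change c : history) {
            if (c.version >= fromVersion && c.key.compareTo(fromKey) >= 0 && c.key.compareTo(toKey) < 0)
                res.add(c);
        }
        return res;
    }

    /** Drops history below the given version, keeping the latest value of every key. */
    synchronized void compact(long upTo) {
        Map<String, Change> latest = new HashMap<>();
        for (Change c : history) {
            if (c.version <= upTo)
                latest.put(c.key, c);
        }
        history.removeIf(c -> c.version <= upTo && latest.get(c.key) != c);
        compactedVersion = Math.max(compactedVersion, upTo);
    }

    private void checkReadable(long requested) {
        if (requested < compactedVersion)
            throw new IllegalStateException("Version " + requested + " has been compacted");
    }

    public static void main(String[] args) {
        VersionedKvSketch store = new VersionedKvSketch();
        store.put("cache.a", "1");
        long v2 = store.put("cache.b", "x");
        store.put("cache.a", "2");
        // Replays every update under the "cache." prefix starting from version v2.
        for (Change c : store.changesSince("cache.", "cache/", v2))
            System.out.println(c.version + " " + c.key + " = " + c.value);
    }
}
```

In this model a range query can be repeated from the same start version any number of times, and it starts failing only once `compact` has been called with a higher version.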

Such a metastorage becomes a golden source of truth for metadata for each node in the cluster. Cluster configuration (including caches), additional cache metadata (indexes, data schema), affinity assignment, baseline topology, services assignments will be moved to the metadata storage. Discovery custom events will be eliminated as a low-level synchronization primitive in favor of ordered data updates in the metadata storage.

Metastorage Interface

An approximate metastorage interface is as follows:

Metastorage
import java.util.List;
import java.util.concurrent.Future;

public interface DistributedMetaStorage {
    Future<ReadResponse> read(Read read);

    Future<WriteResponse> write(Write write);
    Future<DeleteResponse> delete(Delete del);

    // Conditional update; see the Update class below.
    Future<UpdateResponse> update(Update update);

    // Subscribes the observer to the updates described by the watch request.
    WatchControl watch(Watch watch, WatchObserver observer);
}

// Read, Write, Delete, and Update extend the Request class.

public class Update extends Request {
    // Conditions to evaluate. Judging by the field names, onSuccess is executed
    // when the conditions hold and onFailure otherwise.
    private List<Condition> cond;
    private List<Request> onSuccess;
    private List<Request> onFailure;
}

public interface WatchObserver {
    void onUpdate(Entry oldEntry, Entry newEntry);
    void onCancelled();
}


A typical usage pattern for the distributed metastorage in pseudocode may look as follows:

Updating entry in metastorage
res = metastorage.read(new Read(key)).get();

newVal = update(...); // Update property value according to the running action

updated = metastorage.update(new Update(
    key,
    new VersionCondition(EQUAL, res.version()),
    new Write(key, newVal)
)).get();

if (!updated.succeeded()) {
    // Handle a concurrent update to the same property.
}

Following metastorage updates
propVal, propVersion = currentState(key); // Get the latest property value that the local node has seen/processed.

metastorage.watch(new Watch(key, propVersion), (oldEntry, newEntry) -> {
    // Process updates propagated to the metastorage. Executed in the same order on all nodes.
});
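The two pseudocode patterns above can be exercised against a small in-memory stand-in. The sketch below is an illustration under assumptions, not the proposed API: `ConditionalUpdateSketch`, `updateIfVersion`, `modify`, and `Listener` are hypothetical names, a version of 0 is assumed to mean "key absent", and listeners are invoked under the store lock so that every listener observes updates in version order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Hypothetical single-process stand-in for the metastorage; not an Ignite API. */
final class ConditionalUpdateSketch {
    static final class Entry {
        final String value;
        final long version;

        Entry(String value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    interface Listener {
        void onUpdate(String key, Entry oldEntry, Entry newEntry);
    }

    private final Map<String, Entry> data = new HashMap<>();
    private final List<Listener> listeners = new ArrayList<>();
    private long version; // global version counter shared by all keys

    /** Returns the current entry, or null if the key is absent. */
    synchronized Entry read(String key) {
        return data.get(key);
    }

    synchronized void watch(Listener listener) {
        listeners.add(listener);
    }

    /** Writes newValue only if the key is still at expectedVersion (0 means absent). */
    synchronized boolean updateIfVersion(String key, long expectedVersion, String newValue) {
        Entry old = data.get(key);
        long current = old == null ? 0 : old.version;
        if (current != expectedVersion)
            return false; // somebody else updated the key first

        Entry updated = new Entry(newValue, ++version);
        data.put(key, updated);
        // Notified under the lock, so all listeners see updates in version order.
        for (Listener l : listeners)
            l.onUpdate(key, old, updated);
        return true;
    }

    /** Optimistic read-modify-write: retries until no concurrent update interferes. */
    String modify(String key, UnaryOperator<String> fn) {
        while (true) {
            Entry cur = read(key);
            String next = fn.apply(cur == null ? null : cur.value);
            if (updateIfVersion(key, cur == null ? 0 : cur.version, next))
                return next;
        }
    }
}
```

The retry loop in `modify` is one possible way to handle the "concurrent update" branch of the first pattern; a caller could equally surface the conflict instead of retrying.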

Raft as Metastorage Replication Protocol

An implementation of the Raft consensus protocol [1] will be used as a fundamental building block for the distributed metastorage. The protocol is well studied and has a large number of implementations: we can use one of them as a library, adapt the code of an existing implementation for Ignite, or write a custom one.

Raft provides a consistent replicated log of abstract commands, which are applied to an abstract state machine once they are replicated. Each command in the log has a sequence number, which provides an implicit version for each executed command. An additional version numbering scheme can be introduced as commands are executed to avoid coupling with the log indexes.
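A minimal sketch of this idea follows, assuming a hypothetical state machine `LogApplySketch` that receives already committed entries in log order. It also assumes that some log entries carry no data change (for example, the no-op entry a Raft leader appends at the start of its term), which is one reason a separate revision counter may be preferable to exposing log indexes directly.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical state machine fed by a replicated log; not an Ignite API. */
final class LogApplySketch {
    static final class Command {
        final String key;   // null marks a no-op entry
        final String value;

        private Command(String key, String value) {
            this.key = key;
            this.value = value;
        }

        static Command put(String key, String value) {
            return new Command(key, value);
        }

        static Command noop() {
            return new Command(null, null);
        }
    }

    private final Map<String, String> state = new HashMap<>();
    private long lastAppliedIndex; // log index of the last applied entry
    private long revision;         // bumped only by entries that change data

    /** Applies the committed entry at the given log index; entries must arrive in order. */
    void apply(long index, Command cmd) {
        if (index != lastAppliedIndex + 1)
            throw new IllegalStateException("Expected index " + (lastAppliedIndex + 1) + ", got " + index);

        lastAppliedIndex = index;
        if (cmd.key == null)
            return; // no-op: consumes a log index but not a revision

        state.put(cmd.key, cmd.value);
        revision++;
    }

    long lastAppliedIndex() {
        return lastAppliedIndex;
    }

    long revision() {
        return revision;
    }

    String get(String key) {
        return state.get(key);
    }
}
```

Because every replica applies the same entries in the same order, two instances fed the same log end up with the same state and the same revision, regardless of how many no-op entries the log contains.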

The Raft replication group for the metastorage will be deployed on a small subset of Ignite nodes (3, 5, or 7 nodes) to avoid high latency for metastorage updates and to reduce the leader election time in large clusters. Upon a node failure, the metastorage replication group may trigger automatic reconfiguration after a timeout to ensure a sufficient number of replicas in the replication group.

Additional Replication Protocols

Assuming we have an HA, split-brain-resistant distributed metastorage, we can implement other replication protocols with different availability and performance properties, such as PacificA [2]. Raft and PacificA both provide the same distributed log abstraction for command replication. The table below summarizes the differences between the two protocols:


|  | Raft | PacificA |
|---|---|---|
| Availability conditions | A majority of the group must be online to make progress | Can make progress even with one member left |
| Dependencies | Independent | Requires external consistent configuration manager |
| Latency | Can acknowledge an operation after a majority of the group responded | Must await responses from either all group members or wait for failure detection timeout and reconfigure the group before acknowledging an operation |
| Other |  | Relies on clock timeouts to ensure linearizability of operations |
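The availability row can be made concrete with a little arithmetic. The helper below is an illustrative sketch with hypothetical names. It assumes that, for PacificA, the external configuration manager itself stays available, since the group cannot reconfigure without it.

```java
/** Back-of-the-envelope availability arithmetic for the two protocols. */
final class AvailabilitySketch {
    /** Smallest number of members that forms a majority. */
    static int raftQuorum(int groupSize) {
        return groupSize / 2 + 1;
    }

    /** Members that may fail while a Raft group can still make progress. */
    static int raftToleratedFailures(int groupSize) {
        return groupSize - raftQuorum(groupSize);
    }

    /** PacificA can shrink the group down to a single member via reconfiguration. */
    static int pacificaToleratedFailures(int groupSize) {
        return groupSize - 1;
    }

    public static void main(String[] args) {
        for (int n = 3; n <= 7; n++) {
            System.out.println("size=" + n
                + " raftQuorum=" + raftQuorum(n)
                + " raftTolerates=" + raftToleratedFailures(n)
                + " pacificaTolerates=" + pacificaToleratedFailures(n));
        }
    }
}
```

A Raft group of 4 tolerates one failure, the same as a group of 3, and a group of 6 tolerates two, the same as a group of 5. This is why odd group sizes are the usual choice for majority-based groups.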

Group Membership

Besides the metastorage itself, Ignite will need a group membership/discovery service, likely with less strict consistency guarantees than the current Discovery SPI. The membership service will be used to, among other things:

  • Discover and automatically form an initial metastorage Raft replication group
  • Provide cluster membership change events for subsystems that do not require strict consistency guarantees (Compute grid, SQL query execution runtime, etc.)

The requirements for these subsystems should be carefully examined to choose a proper cluster membership protocol. The alternatives we currently have are:

  • SWIM [3], an eventually consistent protocol widely used by multiple systems, with a Java implementation available [4].
  • RAPID [5], a novel consistent group membership protocol, with a Java implementation also available [6].

Further Applications

Partition Replication

The replication protocol can be used as an abstraction for hiding primary-backup nodes replication so that upper layers work with partition objects regardless of how many nodes the data is replicated to. In this case, the atomic cache operations become trivial CRUD operations replicated by the protocol. Moreover, there is no need to differentiate between atomic and transactional caches as multi-partition transactions become a set of operations that are applied to partitions (which are, in turn, replicated within each partition).

Additionally, since log replication and operation application are separated, the latency of an update will not depend on the complexity of the operation itself (for example, the number of secondary indexes used for a given table).

Among others, the replication module provides the following interfaces to the storage layer:

  • A stream of committed operations that are guaranteed to be the same on all nodes in the replication group. Each operation in the stream is assigned a unique, monotonically increasing, gap-free index by the replication module. The stream is durable and can be re-read as many times as needed as long as the operations have not been compacted by a user/system request. The committed operations can be applied to the storage asynchronously, provided that read operations are synchronized with the storage state so that a read is served only after the operations it depends on have been applied.
  • A readIndex() operation that returns the most up-to-date committed (durable) operation index in the replication group, providing linearizability. readIndex() performance varies from protocol to protocol: for canonical Raft, this operation involves a round-trip from the current leader to a majority of group members, while in PacificA the primary node can return the index locally as long as the primary lease is valid. If a node has received and applied an operation with an index at least as large as the one returned by readIndex(), the state can be safely read locally.
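The interplay between asynchronous application and readIndex() can be sketched as follows. The names `ReadIndexSketch`, `ReplicationGroup`, and `Replica` are hypothetical, and how readIndex() is actually computed is abstracted behind the interface. The replica serves a local read only once its applied index has caught up with the index returned by the group.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a locally served linearizable read; not an Ignite API. */
final class ReadIndexSketch {
    /** Stand-in for the replication module; readIndex() cost depends on the protocol. */
    interface ReplicationGroup {
        long readIndex();
    }

    static final class Replica {
        private final Map<String, String> storage = new HashMap<>();
        private long appliedIndex; // index of the last operation applied to storage

        /** Called by the replication module for each committed operation, in index order. */
        synchronized void apply(long index, String key, String value) {
            storage.put(key, value);
            appliedIndex = index;
        }

        /** True if every operation up to readIndex is already visible in local storage. */
        synchronized boolean isSafeToRead(long readIndex) {
            return appliedIndex >= readIndex;
        }

        /**
         * Reads locally if the replica has caught up with the group.
         * Throws if it is lagging; a caller may retry or wait.
         */
        String read(ReplicationGroup group, String key) {
            long idx = group.readIndex(); // obtained before touching local state
            synchronized (this) {
                if (appliedIndex < idx)
                    throw new IllegalStateException(
                        "Replica applied " + appliedIndex + " but read requires " + idx);
                return storage.get(key);
            }
        }
    }
}
```

Throwing on a lagging replica keeps the sketch free of blocking code; an implementation could instead wait until the applied index reaches the required value.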

Separating replication and 2PC for transactional caches

Once we have a replication primitive in place, we can achieve the following goals:

  • Single-partition transactions are committed in a single replication round
  • The notion of primary and replica nodes is hidden from the upper layers of transactional logic. Upper layers operate on partitions by applying operations to them, and these operations are in turn replicated within the replication group. For example, a batch write to partition i should be expressed as partition(i).replicate(new WriteBatch(writeMap)). The replicate operation may fail, but the write is either applied as a whole or not applied at all within the replication group.
  • The existing Ignite transaction modes are simplified while the current guarantees of Ignite transactions are preserved: transaction isolation levels are removed from the key-value API, leaving only the PESSIMISTIC and OPTIMISTIC concurrency modes, which correspond to the PESSIMISTIC REPEATABLE_READ and OPTIMISTIC SERIALIZABLE modes of Ignite 2.x.
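The partition(i).replicate(new WriteBatch(writeMap)) call mentioned above could take roughly the following shape. This is a sketch under assumptions: `Partition`, `Operation`, and `WriteBatch` are hypothetical types, replicas are modeled as in-memory maps, and a single `available` flag stands in for "the replication group can currently make progress".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical shape of the partition-level replication API; not an Ignite API. */
final class PartitionReplicationSketch {
    /** An operation that the replication group applies on every replica. */
    interface Operation {
        void applyTo(Map<String, String> storage);
    }

    /** A batch of writes that is applied as a single operation. */
    static final class WriteBatch implements Operation {
        private final Map<String, String> writes;

        WriteBatch(Map<String, String> writes) {
            this.writes = new HashMap<>(writes); // defensive copy
        }

        @Override public void applyTo(Map<String, String> storage) {
            storage.putAll(writes);
        }
    }

    static final class Partition {
        private final List<Map<String, String>> replicas = new ArrayList<>();
        private boolean available = true; // stand-in for group availability

        Partition(int replicaCount) {
            for (int i = 0; i < replicaCount; i++)
                replicas.add(new HashMap<>());
        }

        void setAvailable(boolean available) {
            this.available = available;
        }

        /** Applies the operation on all replicas, or fails before touching any of them. */
        void replicate(Operation op) {
            if (!available)
                throw new IllegalStateException("Replication group cannot make progress");
            for (Map<String, String> replica : replicas)
                op.applyTo(replica);
        }

        /** Returns a copy of the state held by the given replica. */
        Map<String, String> replicaState(int idx) {
            return new HashMap<>(replicas.get(idx));
        }
    }
}
```

The caller never addresses a primary or a backup: it hands one operation to the partition, and the all-or-nothing outcome is the responsibility of the replication layer.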

The transactional protocol operates as follows:

TBD

Data Structures Building Block

An instance of replication protocol can be used to further implement various data structures and synchronization primitives that are currently placed in a separate system cache.

Links

  1. Raft Consensus Protocol
  2. PacificA Replication Protocol
  3. SWIM Group Membership Protocol
  4. Java implementation of SWIM
  5. RAPID Group Membership Protocol
  6. Java implementation of RAPID