This document describes the principles used to achieve consistency between data copies for transactional caches with persistence enabled.

Cluster topology

Cluster topology is an ordered set of client and server nodes at a certain point in time. Each stable topology is assigned a topology version number: a monotonically increasing pair of counters (majorVer, minorVer). Each node maintains a list of the nodes in the topology, and this list is the same on all nodes for a given version. It is important to note that each node receives information about a topology change eventually, that is, independently of the other nodes and possibly at a different time.
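As a minimal sketch of how such a version pair can be ordered, the class below compares the major counter first and the minor counter second. The lexicographic ordering is an assumption made for illustration, and the class name TopologyVersionSketch is hypothetical, not part of the Ignite API.

```java
/** Hypothetical illustration of a (majorVer, minorVer) topology version. */
final class TopologyVersionSketch implements Comparable<TopologyVersionSketch> {
    final long majorVer;
    final int minorVer;

    TopologyVersionSketch(long majorVer, int minorVer) {
        this.majorVer = majorVer;
        this.minorVer = minorVer;
    }

    /** Assumed ordering: compare major counters first, then minor counters. */
    @Override public int compareTo(TopologyVersionSketch o) {
        int cmp = Long.compare(majorVer, o.majorVer);

        return cmp != 0 ? cmp : Integer.compare(minorVer, o.minorVer);
    }
}
```

With this ordering, any two nodes that know the same pair of versions agree on which one is newer, even if they learned about the change at different times.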

Affinity function

A set of all possible cache keys is divided between partitions. A partition is an indivisible data set in a grid, a subset of a total set of keys. The key uniquely determines the partition in which it will be stored.

Every change in topology produces a new topology version. For each new version it is necessary to calculate how the data is distributed among the nodes in the topology. The nodes that are responsible for storing data are called affinity nodes. Each node owns a subset of partitions. If a new affinity node has joined, part of the data must be rebalanced to the new node.

The distribution (sharding) of data across the nodes of the grid is defined by the affinity function.

First, the affinity function uniquely defines a partition for a key: P(key) = partition.

This is a static property and cannot change in time.

The default implementation of function P is modulo division: abs(key.hashCode) % partitionsCount.
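The formula above can be sketched in Java as follows. This is an illustration of the modulo rule only, not the actual Ignite code. The class and method names are hypothetical, and the special handling of Integer.MIN_VALUE is an assumption added because Math.abs returns a negative value for that input.

```java
/** Hypothetical sketch of P(key) = abs(key.hashCode) % partitionsCount. */
final class PartitionFunctionSketch {
    static int partition(Object key, int partitionsCount) {
        int hash = key.hashCode();

        // Math.abs(Integer.MIN_VALUE) is still negative, so map that single value to 0.
        int nonNegative = hash == Integer.MIN_VALUE ? 0 : Math.abs(hash);

        return nonNegative % partitionsCount;
    }
}
```

Because the result depends only on the key and the fixed partitions count, the same key always maps to the same partition regardless of topology.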

Second, the affinity function defines which nodes a partition belongs to: F(part) = list(nodes).

At different points in time a partition can belong to different nodes, therefore this is a dynamic property of a partition.

The number of nodes in this list is equal to the number of copies of the partition. Recall that one of the fundamental properties of a grid is its high availability. With the loss of a certain number of nodes, complete data loss does not occur and the load is not interrupted. The need to store multiple copies is due to the implementation of this property.

The first node in the list is the so-called primary node, which has a special coordinating role for all operations on the partition. The remaining nodes are backup nodes. If the primary node is lost, one of the backup nodes becomes the new primary.

Another important property of the affinity function is the uniform distribution of partitions over nodes.

Consider a possible partition distribution between nodes for a topology with 3 server nodes s1, s2, s3 and 6 partitions 0, 1, 2, 3, 4, 5.

Function F could produce an assignment like this (in an ideal world):

F(0) = s1, s2, s3

F(1) = s2, s3, s1

F(2) = s3, s1, s2

F(3) = s1, s2, s3

F(4) = s2, s3, s1

F(5) = s3, s1, s2

The first node in the list is the primary node, the others are backups. Only the list prefix equal in size to the owners count is used; the other nodes are ignored.

In real life the results might not be so ideal because of hash randomization, which causes some nodes to have slightly more partitions than others. This depends on the total partitions count: the more partitions there are, the more uniform the assignment is.

The default implementation for function F uses the rendezvous hashing algorithm https://en.wikipedia.org/wiki/Rendezvous_hashing
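The idea of rendezvous hashing can be sketched as follows: each node is scored by hashing the (node, partition) pair, the nodes are sorted by score, and the first owners-count nodes become the owners. This is a simplified illustration and not the Ignite implementation. The class name, the string node ids and the mixing function (borrowed from the MurmurHash3 64-bit finalizer) are assumptions.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of F(part) = list(nodes) based on rendezvous hashing. */
final class RendezvousSketch {
    /** Assumed mixing function: pseudo-random score for a (node, partition) pair. */
    static long score(String nodeId, int part) {
        long h = nodeId.hashCode() * 31L + part;

        h ^= h >>> 33;
        h *= 0xff51afd7ed558ccdL;
        h ^= h >>> 33;
        h *= 0xc4ceb9fe1a85ec53L;
        h ^= h >>> 33;

        return h;
    }

    /** Returns the owners of the partition: primary first, then backups. */
    static List<String> assign(int part, List<String> nodes, int owners) {
        List<String> sorted = new ArrayList<>(nodes);

        // Highest score first; ties are broken by node id to keep the result deterministic.
        sorted.sort(Comparator.comparingLong((String n) -> score(n, part))
            .reversed()
            .thenComparing(Comparator.<String>naturalOrder()));

        return new ArrayList<>(sorted.subList(0, Math.min(owners, sorted.size())));
    }
}
```

Since a node's score for a partition does not depend on which other nodes are present, removing a node that is not an owner of a partition leaves that partition's owner list unchanged, which keeps data movement on topology changes small.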

Because the affinity function has these properties, linear scalability is achieved when adding new nodes to the grid.

Rebalancing

Rebalancing is a procedure for exchanging data between nodes to ensure uniform distribution of data (redistribution of data between nodes) according to the affinity assignment [1] and to ensure data consistency if copies diverge [2].

[1] Affinity based rebalancing
In the first case, the reason for rebalancing is a change in the list of nodes that own the partition. If a new node has joined the topology, part of the partitions will be reassigned to it because the affinity function distributes partitions uniformly among the nodes. If a node has left, rebalancing will be triggered to ensure that the required number of copies of each partition is available in the grid. This kind of rebalancing can only happen when the baseline is changed.

For example, let's consider adding a new node s4 to the topology. The assignment will look like this:

F(0) = s1, s2, s3, s4

F(1) = s2, s3, s4, s1

F(2) = s3, s4, s1, s2

F(3) = s4, s1, s2, s3

F(4) = s1, s2, s3, s4

F(5) = s2, s3, s4, s1

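Node s4 now appears in the list for all 6 partitions. During rebalancing it will load every partition for which it falls within the owners prefix of the list (all 6 partitions if the owners count is 4).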
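Which partitions a node has to load after such a change can be sketched by comparing the owners prefix of the old and the new assignment. The class and method names below are hypothetical, and the owners count is passed in explicitly as an assumption. With an owners count of 4 the sketch yields all six partitions for s4; with an owners count of 3 it yields only partitions 1, 2, 3 and 5, because s4 is outside the prefix for partitions 0 and 4.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

/** Hypothetical sketch: partitions a node must load after an affinity change. */
final class AffinityDiffSketch {
    /** Owners are the prefix of the affinity list limited by the owners count. */
    static List<String> owners(List<String> affinityList, int ownersCnt) {
        return affinityList.subList(0, Math.min(ownersCnt, affinityList.size()));
    }

    /** Partitions owned by the node in the new assignment but not in the old one. */
    static SortedSet<Integer> partitionsToLoad(
        String node,
        Map<Integer, List<String>> oldAssign,
        Map<Integer, List<String>> newAssign,
        int ownersCnt
    ) {
        SortedSet<Integer> res = new TreeSet<>();

        for (Map.Entry<Integer, List<String>> e : newAssign.entrySet()) {
            List<String> before = oldAssign.getOrDefault(e.getKey(), Collections.emptyList());

            boolean ownedBefore = owners(before, ownersCnt).contains(node);
            boolean ownedNow = owners(e.getValue(), ownersCnt).contains(node);

            if (ownedNow && !ownedBefore)
                res.add(e.getKey());
        }

        return res;
    }
}
```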

[2] Update counter based rebalancing
In the second case, which is relevant when the baseline is not changed, rebalancing is used to restore the consistency of the data if a node was absent from the topology for some time under load. This might happen, for example, if a node crashed.

When the node rejoins the topology, it will have stale data, and the partition update counter will be analyzed to determine how far the partitions have “lagged” behind other copies.
Depending on how far the partitions are lagging behind, the second type of rebalancing can use the write-ahead log as the data source for the updates history. In this case rebalancing can be performed without completely reloading the partition and is called “historical”.
Another scenario in which [2] occurs is the activation of a grid that has nodes with stale data in the topology.

Technically, rebalancing is an exchange of supply and demand messages between nodes. A demand message is sent from a node where a partition has stale or missing data to a node with actual data. A supply message containing data is sent in response to the demand message. After a supply message is processed, the next demand message is sent, and so on until there is nothing more to rebalance. We will look at this in detail later.
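The demand and supply exchange can be pictured with a small in-process simulation. This is only a sketch of the loop described above: real messages travel over the network, and the class name, the batch size parameter and the offset-based demand are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical single-process simulation of the demand/supply loop. */
final class DemandSupplySketch {
    /** Supplier side: answers a demand with the next batch, empty when nothing is left. */
    static List<String> supply(List<String> supplierData, int offset, int batchSize) {
        int to = Math.min(supplierData.size(), offset + batchSize);

        if (offset >= to)
            return new ArrayList<>();

        return new ArrayList<>(supplierData.subList(offset, to));
    }

    /** Demander side: keeps sending demands until the supplier has nothing more to send. */
    static List<String> rebalance(List<String> supplierData, int batchSize) {
        List<String> loc = new ArrayList<>();

        while (true) {
            // The "demand message" here is just the offset of the next missing entry.
            List<String> batch = supply(supplierData, loc.size(), batchSize);

            if (batch.isEmpty())
                break;

            loc.addAll(batch);
        }

        return loc;
    }
}
```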

Update counter structure

Currently three different implementations of update counter logic exist: for persistent mode, for in-memory mode and for MVCC mode.

In this article we only look at the first one, for persistent mode. Its distinguishing feature is the ability to track updates applied out of order.

Each update to a partition receives a sequential number for the purpose of ensuring data consistency between copies. A special structure called the partition update counter is used to assign and increment update counters.

Partition update counter has the following structure:

/** Low watermark. */
private final AtomicLong cntr = new AtomicLong();

The low watermark (LWM), or update counter, is used to track sequential updates. It is only incremented when the corresponding update is recorded to durable storage and no updates with a lesser counter value are missing.

For example, LWM value=5 means that all updates with assigned counters 1, 2, 3, 4, 5 were applied to the WAL.


/** High watermark. */
protected final AtomicLong reserveCntr = new AtomicLong();

The high watermark (HWM), or reservation counter, is incremented for each pending update, which is not even guaranteed to succeed. The reservation counter is assigned during the tx prepare phase.
For example, LWM=5 and HWM=9 means that four updates numbered 6, 7, 8, 9 are in flight.

/** Updates applied out of order. */
private SortedSet<Range> seq = new TreeSet<>();

This field is used for recording updates applied out of order. This is possible because updates with a higher counter can be applied to the WAL before updates with a lower counter, causing gaps in the update sequence.

/**
 * Update counter task. Update from start value by delta value.
 */
private static class Range implements Comparable<Range> {
    /** */
    private long start;

    /** */
    private long delta;

    /** {@inheritDoc} */
    @Override public int compareTo(@NotNull Range r) {
        return Long.compare(this.start, r.start);
    }
}

A range represents a sequence of updates; for example, (5, 3) means three updates numbered 6, 7, 8. We will use this notation again later.

A range of out-of-order updates is held in the sequence only while an update with a lower counter is missing. For example, LWM=5, HWM=9, seq=(6,2) means that updates 7, 8 were applied out of order and updates 6, 9 are still not applied.

There is an important invariant that follows from the statements above:

HWM >= LWM
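The interplay of LWM, HWM and the out-of-order sequence can be sketched with a simplified counter. This is not the Ignite implementation: the class and method names are hypothetical, a TreeMap keyed by range start stands in for the sorted set of Range objects, and coarse synchronization replaces the atomics.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical simplified partition update counter. */
final class UpdateCounterSketch {
    private long lwm;
    private long hwm;

    /** Out-of-order ranges in (start, delta) notation: start -> delta. */
    private final TreeMap<Long, Long> outOfOrder = new TreeMap<>();

    UpdateCounterSketch(long initial) {
        lwm = hwm = initial;
    }

    /** Reserves delta counters and returns the start of the reserved range. */
    synchronized long reserve(long delta) {
        long start = hwm;

        hwm += delta;

        return start;
    }

    /** Marks the range (start, delta) as applied to durable storage. */
    synchronized void applied(long start, long delta) {
        if (start < lwm)
            return; // Already covered by the LWM.

        if (start > lwm) {
            outOfOrder.put(start, delta); // A lower update is missing: remember the range.

            return;
        }

        lwm += delta;

        // Absorb ranges that have become adjacent to the LWM.
        Map.Entry<Long, Long> next;

        while ((next = outOfOrder.firstEntry()) != null && next.getKey() == lwm) {
            lwm += next.getValue();

            outOfOrder.pollFirstEntry();
        }

        // Counters may arrive without a local reservation; keep HWM >= LWM.
        hwm = Math.max(hwm, lwm);
    }

    synchronized long lwm() { return lwm; }

    synchronized long hwm() { return hwm; }

    synchronized int gaps() { return outOfOrder.size(); }
}
```

Starting from LWM=5 and reserving ranges (5,1), (6,2) and (8,1) gives HWM=9. Applying (6,2) first leaves LWM=5 with seq=(6,2), which matches the example above. Applying (5,1) then moves LWM to 8, and applying (8,1) moves it to 9, where it equals HWM.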

Update counter flow

The flow is closely bound to the transaction protocol. Counters are modified during the stages of the two-phase commit (2PC) protocol used by Ignite to ensure distributed consistency.

The 2PC protocol variation used in Ignite consists of map, optional remap, lock, prepare and commit phases.

We will use an example to illustrate the flow.

Suppose we have 3 server nodes and 1 client node in topology.

Two threads start two pessimistic (TODO javadoc ref) transactions concurrently updating keys belonging to the single partition p. Backups count is equal to 2 so total number of owners is 3.

Tx1 enlists and updates keys k1, k2, k3, Tx2 enlists and updates keys k4, k5.

P(k1) = P(k2) = P(k3) = P(k4) = P(k5) = p.

F(p) = s1(Primary), s2(Backup1), s3(Backup2)

Ignite client = startClient();

IgniteCache<Integer, Integer> cache = client.cache(name);

Thread 1:

try (Transaction tx1 = client.transactions().txStart()) {
    cache.putAll(Map.of(k1, v1, k2, v2, k3, v3));

    tx1.commit();
}

Thread 2:

try (Transaction tx2 = client.transactions().txStart()) {
    cache.putAll(Map.of(k4, v4, k5, v5));

    tx2.commit();
}

Figure 1 below shows the flow for the case of backup reordering, where a transaction with higher update counters is applied on a backup before a transaction with lower update counters.

The near map is a mapping of partitions to primary nodes. In the example each transaction will have only one node, s1, in the near map.

When the near map is ready, a lock request is issued for each key enlisted in the transaction, acquiring an exclusive lock on that key (for optimistic transactions locks are acquired during commit, but the principles are the same).

It may happen that, by the time the request is processed on the primary node, topology version N + 1 is already in effect or the topology is in the process of changing to that version.

If the next version is not compatible with the previous one (a compatible version has the same partition assignment), a response is sent to the near node indicating that a more recent topology version is actual and the transaction should retry on the next version. This process is called remap.

If the lock is successfully acquired, the prepare phase begins. This also globally locks the topology version: a new topology cannot be set until all transactions started on the current topology are finished. This is handled by the PME protocol.

Each locked key is assigned an update sequence number generated from the next value of the HWM counter field. If multiple keys are locked in the same partition, a range is reserved.

Then a DHT map is calculated for each primary partition. The DHT map is a mapping of partitions to backup nodes. In the example each transaction will have 2 nodes in the DHT map: s2 and s3.

When the DHT map is ready, the primary node sends prepare requests, containing the enlisted keys and their update counters, to the corresponding backup nodes.

Thus, update counters are synchronized between primaries and backups: they are assigned on primaries and then transferred to backups.

If all backup nodes answer positively, the transaction goes into the PREPARED state. In this state it is ready for commit.

During commit all transactional changes are first applied to the WAL and then the LWM is increased. At this point the LWM catches up with the HWM.

Because transaction commits can be reordered, some updates may be applied out of order, as shown by the example above. The out-of-order updates sequence is used for tracking them.

Let's imagine that out-of-order updates are not tracked and the LWM doesn't exist. If the primary node fails before applying the update from Tx1, we end up with update counter=5 and will lose the missed updates after the node rejoins the topology, because rebalancing will not start due to equal counters.
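The difference between a naive counter and the LWM in this scenario can be shown with a small sketch. It assumes that Tx1 was assigned counters 1, 2, 3 and Tx2 counters 4, 5, and it models the applied updates as a plain sorted set. The class and method names are hypothetical and the code is an illustration only.

```java
import java.util.TreeSet;

/** Hypothetical comparison of a naive update counter with the LWM. */
final class ReorderingSketch {
    /** Naive counter: the highest update number applied so far. */
    static long naiveCounter(TreeSet<Long> applied) {
        return applied.isEmpty() ? 0 : applied.last();
    }

    /** LWM: the end of the gap-free prefix 1..n of applied updates. */
    static long lowWatermark(TreeSet<Long> applied) {
        long lwm = 0;

        for (long cntr : applied) {
            if (cntr != lwm + 1)
                break; // Gap found: updates below cntr are missing.

            lwm = cntr;
        }

        return lwm;
    }
}
```

If only the updates of Tx2 (4 and 5) were applied, the naive counter already reports 5 and hides the missing updates 1, 2, 3, while the LWM stays at 0 and still signals that the copy is behind.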

Restoring consistency

The LWM value is used to calculate the missing data when an outdated node joins the topology. In this scenario counter based rebalancing is used to achieve consistency.

The joining node sends the LWM for each partition as part of the partition map exchange protocol. The coordinator collects all LWMs and determines the nodes with the highest LWM for each partition. These nodes are elected as data suppliers for the given partition.

If the coordinator detects a lagging node, it sends a partition map with the partition state changed to MOVING, triggering a rebalance on the joining node. The number of missing updates for each partition is calculated as the difference between the highest known LWM and the LWM of the joining node.
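The supplier election and the lag calculation for a single partition can be sketched as follows. The class and method names are hypothetical, nodes are identified by plain strings, and the real exchange carries these values inside PME messages rather than in a map.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of supplier election for one partition. */
final class SupplierElectionSketch {
    /** Nodes that reported the highest LWM are elected as suppliers. */
    static List<String> suppliers(Map<String, Long> lwmByNode) {
        long max = Collections.max(lwmByNode.values());

        List<String> res = new ArrayList<>();

        for (Map.Entry<String, Long> e : lwmByNode.entrySet()) {
            if (e.getValue() == max)
                res.add(e.getKey());
        }

        Collections.sort(res); // Stable order for readability.

        return res;
    }

    /** Number of updates the given node is missing for this partition. */
    static long missingUpdates(Map<String, Long> lwmByNode, String node) {
        return Collections.max(lwmByNode.values()) - lwmByNode.get(node);
    }
}
```

For LWMs s1=10, s2=10, s3=6 the suppliers are s1 and s2, and s3 is missing 4 updates, so its copy of the partition would be moved to the MOVING state and rebalanced.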

The simplified rebalancing flow for the join of a node that has an outdated partition with id=0 is illustrated in figure 2:

Update counters are used to detect the state in which one partition copy is behind the others.

It is only safe to compare LWMs because all updates with counters below the LWM are guaranteed to exist.

Thus, it is very important to track LWMs correctly, since their values are used to determine when rebalancing is necessary to restore the integrity of partitions [2].

As already mentioned, the LWM can safely be incremented only if the update is written to the WAL. Otherwise, a situation is possible in which the counter has increased but there is no corresponding durable update.

Crash recovery

There are no guarantees that updates with already assigned counters will ever be applied.

For example, the transaction will be rolled back if some mapped nodes exit during the prepare phase, or a node can crash during commit.
These are the reasons gaps appear in the update sequence. Several additional mechanisms are introduced to close such gaps.

Consider figure 3:

In the scenario in fig. 3, Tx2 is committed before Tx1 has had the chance to prepare, and the primary node fails.

All gaps that remain after all transactions have finished are closed during PME on node left. This happens in org.apache.ignite.internal.processors.cache.PartitionUpdateCounter#finalizeUpdateCounters.

Consider figure 4:

In the scenario in fig. 4, the transaction is prepared on only one backup and the primary fails. A special partition counters update request is used to sync counters with the remaining backup node.

Importance of atomic primaries switching

As shown in fig. 4, when the primary node fails, one of the backups must become the new primary according to the affinity assignment for the next topology. This happens during partition map exchange.

During the counter exchange phase of PME, a new primary adjusts its HWM to the maximum known LWM value for each cache partition. This guarantees monotonic growth of the HWM in the grid and valid generation of counters by new primaries.
It is important to ensure that switching the primary node for a partition is atomic across all grid nodes. Otherwise the HWM >= LWM invariant will break; this invariant ensures that, if at least one stable copy is available, all other copies can restore their integrity from it.

Atomic switching is one of the properties of the PME protocol; it makes it impossible for two different grid nodes to use different primary nodes for mapping.

The same issue exists when a node joins.





