This document describes the principles used to achieve consistency between data copies for transactional caches with persistence enabled. The information applies to Apache Ignite (AI) 2.8 and later releases.

Cluster topology

Cluster topology is an ordered set of client and server nodes at a certain point in time. Each stable topology is assigned a topology version number, a monotonically increasing pair of counters (majorVer, minorVer). The major version tracks grid node events such as join, left or fail; the minor version tracks internal events such as cache start or activation. Each node maintains a list of nodes in the topology, and this list is the same on all nodes for a given version. It is important to note that each node learns about topology changes eventually and independently of the other nodes, so at any point in time nodes can see different topology versions, but eventually they will all see the same one.
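
The ordering of topology versions can be illustrated with a small sketch. The class name TopologyVersionSketch and its field types are assumptions made for this example and are not Ignite's own classes; the sketch only shows that versions compare by the major counter first and the minor counter second.

```java
/** Hypothetical value class for the (majorVer, minorVer) pair. */
final class TopologyVersionSketch implements Comparable<TopologyVersionSketch> {
    /** Changes on node join, left or fail events. */
    final long majorVer;

    /** Changes on internal events such as cache start or activation. */
    final int minorVer;

    TopologyVersionSketch(long majorVer, int minorVer) {
        this.majorVer = majorVer;
        this.minorVer = minorVer;
    }

    /** Orders by major version, then by minor version. */
    @Override public int compareTo(TopologyVersionSketch o) {
        int cmp = Long.compare(majorVer, o.majorVer);

        return cmp != 0 ? cmp : Integer.compare(minorVer, o.minorVer);
    }
}
```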

Affinity function

The set of all possible cache keys is divided into partitions. A partition is an indivisible data set in a grid that contains a subset of the total set of keys. Each key uniquely determines the partition in which it will be stored.

Every change in the topology, such as adding or removing nodes, creates a new topology version. For each new topology version, it is necessary to calculate how data is distributed between the nodes in the topology. Nodes that are responsible for storing data are called affinity nodes. Each node owns a subset of partitions. If a new affinity node joins the cluster, the data is rebalanced to accommodate the new node.

The affinity function defines the distribution (sharding) of data on the nodes of the grid.

First, the affinity function uniquely defines a partition for a key: P(key) = partition.

This is a static property and cannot change at any point in time.

The default implementation of function P is modulo division: abs(key.hashCode) % partitionsCount.
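
The formula above can be sketched as follows. PartitionMapperSketch is a hypothetical helper written for this example, not an Ignite class. It assumes that the one value for which Math.abs stays negative (Integer.MIN_VALUE) is mapped to 0; that guard is this sketch's own choice.

```java
/** Hypothetical helper illustrating P(key) = abs(key.hashCode) % partitionsCount. */
final class PartitionMapperSketch {
    private final int partitionsCount;

    PartitionMapperSketch(int partitionsCount) {
        if (partitionsCount <= 0)
            throw new IllegalArgumentException("partitionsCount must be positive");

        this.partitionsCount = partitionsCount;
    }

    /** The result depends only on the key hash and the fixed partition count. */
    int partition(Object key) {
        int hash = key.hashCode();

        // Math.abs(Integer.MIN_VALUE) is still negative, so that value is mapped to 0 (assumption).
        int abs = hash == Integer.MIN_VALUE ? 0 : Math.abs(hash);

        return abs % partitionsCount;
    }
}
```

Because neither the key hash nor the partition count changes over time, the same key always maps to the same partition, which is the static property described above.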

Second, the affinity function defines which nodes a partition belongs to: F(topology, part) = list(nodes)

At different points in time, a partition can belong to different nodes. Therefore, this is a dynamic property of a partition.

The number of nodes in this list is equal to the number of copies of the partition. Recall that one of the fundamental properties of a grid is its high availability. With the loss of a certain number of nodes, complete data loss does not occur and the load is not interrupted. The need to store multiple copies is due to the implementation of this property.

The first node in the list is the so-called primary node, which has a special coordinating role for all operations on the partition. The remaining are backup nodes. In the event of a loss of a primary node, one of the backup nodes becomes the new primary.

Another important property of the affinity function is the uniform distribution of partitions over nodes.

Consider a possible partition distribution between nodes for a topology with three server nodes s1, s2, s3 and six partitions 0, 1, 2, 3, 4, 5 (partition numbering starts at zero in Ignite).

Function F could produce an assignment of nodes to each partition, which may look something like this (in the ideal world):

F(0) = s1, s2, s3

F(1) = s2, s3, s1

F(2) = s3, s1, s2

F(3) = s1, s2, s3

F(4) = s2, s3, s1

F(5) = s3, s1, s2

The first node in the list is the primary node, and the other nodes are backups. For a partitioned cache, only the nodes in the list prefix of length backups + 1 (the owners count) are used for data placement; the remaining nodes are ignored.

In a real scenario, the results might not be so ideal because of hash randomization, which causes some nodes to have slightly more partitions than others. The outcome depends on the total partition count: the more partitions there are, the more uniform the assignment is.

The default implementation for function F uses the rendezvous hashing algorithm https://en.wikipedia.org/wiki/Rendezvous_hashing

Due to the implementation of these properties of affinity function, linear scalability is achieved when adding new nodes to the grid.
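
The idea behind rendezvous hashing can be sketched in a few lines. This is an illustration only and not Ignite's implementation: the class name, the use of string node ids and the hash mixing in weight() are all assumptions of this example. Each node gets a pseudo-random weight per partition, the nodes are sorted by weight, and the first backups + 1 nodes become the owners.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of F(topology, part) based on rendezvous hashing. */
final class RendezvousSketch {
    /** Returns the owners of a partition, primary first. */
    static List<String> assign(List<String> nodes, int part, int backups) {
        List<String> sorted = new ArrayList<>(nodes);

        // Highest weight first; ties are broken by node id to keep the order deterministic.
        sorted.sort(Comparator
            .comparingLong((String n) -> weight(n, part)).reversed()
            .thenComparing(Comparator.<String>naturalOrder()));

        int owners = Math.min(backups + 1, sorted.size());

        return new ArrayList<>(sorted.subList(0, owners));
    }

    /** Weight of a (node, partition) pair; the mixing steps are an arbitrary choice. */
    static long weight(String nodeId, int part) {
        long h = 31L * nodeId.hashCode() + part;

        h ^= h >>> 33;
        h *= 0xff51afd7ed558ccdL;
        h ^= h >>> 33;
        h *= 0xc4ceb9fe1a85ec53L;
        h ^= h >>> 33;

        return h;
    }
}
```

Since the weight of a node for a partition does not depend on the other nodes, adding a node only inserts it into the existing order and never reorders the nodes that were already there. This is what keeps the amount of moved data small when the topology changes.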

Rebalancing

Rebalancing is the procedure for exchanging data between nodes to ensure uniform distribution of data (redistribution of data between nodes) according to the affinity assignment [1] and to ensure data consistency if copies diverge [2].

[1] Affinity based rebalancing

In the first case, the reason for data rebalancing is a change in the list of nodes that own the partition. If a new node joins the topology, some of the partitions are reassigned to it because the affinity function ensures uniform distribution of partitions between the nodes. If a node leaves the topology, rebalancing is triggered to restore the configured number of copies of each partition in the grid. This kind of rebalancing can only happen when the baseline is changed.

For example, consider adding a new node s4 to the topology. The assignment will look like:

F(0) = s1, s2, s3, s4

F(1) = s2, s3, s4, s1

F(2) = s3, s4, s1, s2

F(3) = s4, s1, s2, s3

F(4) = s1, s2, s3, s4

F(5) = s2, s3, s4, s1

Node s4 will own all 6 partitions and will load them during rebalancing.

[2] Update counter based rebalancing

In the second case, which is relevant when the baseline is not changed, rebalancing is used to restore the consistency of the data if a node was absent from the topology for some time. This can happen in case of a node crash. When the node rejoins the topology, it will have stale data, and the partition update counter is analyzed to determine how far the partition has “lagged” behind the other copies.

Depending on how far the partitions are lagging behind, rebalancing can use the write-ahead log as a data source for the update history. In this case, rebalancing can be performed without completely reloading the partition. This type of rebalancing is called “historical”.

Another scenario for the occurrence of [2] is the activation of the grid with nodes containing stale data.

Technically, rebalancing is the exchange of supply and demand messages between nodes. A demand message is sent from the node where the partition has stale data to the node that has the actual data. A supply message containing data is sent in response to the demand message. After the supply message is processed, the next demand message is sent, and so on, until there is nothing more to rebalance. We will look at this later in more detail.
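
The demand and supply exchange can be pictured as a simple loop. The sketch below is a single-process illustration with hypothetical names; real messages travel between nodes, and the batch size and the list-based data model are assumptions of this example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the demand/supply loop; not the Ignite protocol code. */
final class RebalanceLoopSketch {
    /** Supplier side: answers a demand with the next batch, or an empty batch when done. */
    static List<String> supply(List<String> actualData, int offset, int batchSize) {
        int to = Math.min(offset + batchSize, actualData.size());

        return new ArrayList<>(actualData.subList(offset, to));
    }

    /** Demander side: keeps sending demands until there is nothing more to rebalance. */
    static List<String> rebalance(List<String> actualData, int batchSize) {
        if (batchSize <= 0)
            throw new IllegalArgumentException("batchSize must be positive");

        List<String> received = new ArrayList<>();

        while (true) {
            // Demand message: "send me the entries after what I already have".
            List<String> batch = supply(actualData, received.size(), batchSize);

            if (batch.isEmpty())
                break; // Nothing more to supply.

            received.addAll(batch);
        }

        return received;
    }
}
```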

Update counter structure

Currently, different implementations of the update counter logic exist for persistent mode and for in-memory mode. In this article we will only look at the former. Its main difference from the latter is the ability to track updates that are applied out of order.

Each update to a partition receives a sequential number for ensuring data consistency between copies. A special structure called partition update counter is used to assign and increment update counters.

A partition update counter has the following structure:

Code Block
languagejava
/** Low watermark. */
private final AtomicLong cntr = new AtomicLong();

Low watermark (LWM), or update counter, is used to track sequential updates. It is incremented only when the corresponding update is recorded to durable storage and no missed updates exist with a lesser counter value.

For example, LWM value=5 means that all updates with assigned counters 1, 2, 3, 4, 5 were applied to WAL.

Code Block
languagejava
/** High watermark. */
private final AtomicLong reserveCntr = new AtomicLong();

High watermark (HWM), or reservation counter, is incremented for each pending update, which is not even guaranteed to succeed. The reservation counter is assigned during the tx prepare phase.

Code Block
languagejava
/** Updates applied out of order. */
private SortedSet<Range> seq = new TreeSet<>();

This field is used for recording the updates that are applied out of order. This is possible because updates with a higher counter could be applied to WAL before updates with a lower counter, causing gaps in the update sequence.


Code Block
languagejava
/**
* Update counter task. Update from start value by delta value.
*/
private static class Range implements Comparable<Range> {
    /** Counter value after which the range starts. */
    private long start;

    /** Number of updates in the range. */
    private long delta;

    /** {@inheritDoc} */
    @Override public int compareTo(@NotNull Range r) {
        return Long.compare(this.start, r.start);
    }

}


A range represents a sequence of updates, for example (5, 3) means three updates with numbers 6, 7, 8. We will use this notation again later.

An out-of-order update range is held in the sequence only while an update with a lower counter is missing. For example, LWM=5, HWM=9, seq=(6,2) means that updates 7 and 8 were applied out of order and updates 6 and 9 are not yet applied.

Note that there is an important invariant which can be derived from the statements above:

HWM >= LWM
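
How the three fields interact can be shown with a simplified sketch. UpdateCounterSketch is a hypothetical class written for this article and is not the Ignite implementation: it keeps the out-of-order ranges in a map from start to delta, and it models a primary copy where every applied range was reserved first.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified partition update counter. */
final class UpdateCounterSketch {
    /** Low watermark: every update up to this value is durably applied. */
    private long lwm;

    /** High watermark: last counter value handed out by reserve(). */
    private long hwm;

    /** Ranges applied out of order: start -> delta, same meaning as Range(start, delta). */
    private final TreeMap<Long, Long> outOfOrder = new TreeMap<>();

    UpdateCounterSketch(long initial) {
        lwm = initial;
        hwm = initial;
    }

    /** Prepare phase on the primary: reserves delta counters and returns the range start. */
    synchronized long reserve(long delta) {
        long start = hwm;

        hwm += delta;

        return start;
    }

    /** Commit: marks updates start+1..start+delta as written to the WAL. */
    synchronized void applied(long start, long delta) {
        if (start > lwm) {
            outOfOrder.put(start, delta); // A lower update is still missing.

            return;
        }

        lwm = Math.max(lwm, start + delta);

        // Absorb stored ranges that have become contiguous with LWM.
        Map.Entry<Long, Long> first = outOfOrder.firstEntry();

        while (first != null && first.getKey() <= lwm) {
            lwm = Math.max(lwm, first.getKey() + first.getValue());
            outOfOrder.pollFirstEntry();
            first = outOfOrder.firstEntry();
        }
    }

    synchronized long lwm() { return lwm; }

    synchronized long hwm() { return hwm; }

    synchronized Map<Long, Long> outOfOrder() { return new TreeMap<>(outOfOrder); }
}
```

Starting from an initial value of 5, reserving ranges of 1, 2 and 1 updates and applying only the middle range reproduces the state LWM=5, HWM=9, seq=(6,2) from the example above. Applying the first range afterwards moves LWM to 8, and applying the last one makes LWM equal to HWM.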

Update counter flow

The flow is closely bound to the transaction protocol. Counters are modified during the stages of the two-phase commit (2PC) protocol used by Ignite to ensure distributed consistency.

The 2PC protocol variation used in Ignite consists of map, optional remap, lock, prepare and commit phases.

We will use an example to illustrate the flow:

Suppose we have 3 server nodes and 1 client node in the topology.

Two threads start two pessimistic (https://www.gridgain.com/docs/latest/developers-guide/key-value-api/transactions#pessimistic-transactions) transactions concurrently, updating keys that belong to a single partition p. The backup count is 2, so the total number of owners is 3.

Tx1 enlists and updates keys k1, k2, k3; Tx2 enlists and updates keys k4, k5.

P(k1) = P(k2) = P(k3) = P(k4) = P(k5) = p.

F(k1) = F(k2) = F(k3) = F(k4) = F(k5) = s1(Primary), s2(Backup1), s3(Backup2)


Code Block
languagejava
Ignite client = startClient();

IgniteCache<Integer, Integer> cache = client.cache(name);


Thread 1:


Code Block
languagejava
try(Transaction tx1 = client.transactions().txStart()) {

      cache.putAll(Map.of(k1, v1, k2, v2, k3, v3));

      tx1.commit();

}


Thread 2:


Code Block
languagejava
try(Transaction tx2 = client.transactions().txStart()) {

      cache.putAll(Map.of(k4, v4, k5, v5));

      tx2.commit();

}


Figure 1 below shows the flow for the case of backup reordering, where the transaction with higher update counters is applied on the backup before the transaction with lower update counters.

[Figure 1]

The near map is a mapping of partitions to primary nodes. In the above example, each transaction will have only one node, s1, in the near map.

When the near map is ready for each key enlisted in the transaction, a lock request is issued that acquires an exclusive lock on each key (for optimistic transactions, locks are acquired during commit, but the principles are the same).

It may happen that on the primary node, at the time of request processing, topology version N + 1 is already in effect or the version is in the process of changing.

If the next version is not compatible with the previous version (compatible versions have the same partition assignment), a response is sent to the near node indicating that a more recent version of the topology is available and the transaction should retry on the next version. This process is called remap.

If the lock is successfully acquired, the prepare phase begins. This also globally locks the topology version: a new topology cannot be set until all transactions that started on the current topology have finished. This is handled by the PME protocol.

Each locked key is assigned an update sequence number generated from the next value of the HWM counter field. If multiple keys are locked in the same partition, a range is reserved.

For each primary partition, a DHT map is calculated. A DHT map is a mapping of partitions to backup nodes. In the example, each transaction will have two nodes in the DHT map: s2 and s3.

When the DHT map is ready, the primary node sends prepare requests containing enlisted keys and their update counters to the corresponding backup nodes.

Thus, update counters are synchronized between primaries and backups: they are assigned on primaries, then transferred to backups.

If all backup nodes return a positive response, then the transaction goes into the PREPARED state. In this state, the transaction is ready for commit.

During commit, all transactional changes are applied to the WAL first, and LWM is increased afterwards. At this point, LWM catches up with HWM.

Because transaction commits can reorder, it's possible to have some updates applied out of order, as shown in the example above. The out-of-order updates sequence is used for tracking them.

Let's imagine that out-of-order updates are not tracked and LWM doesn't exist. In this case, if the primary node fails before applying the update from Tx1, we will end up with update counter=5 and will lose the missed updates after the node rejoins the topology, because rebalancing will not start due to equal counters.

Restoring consistency

The LWM value is used to calculate missing data when an outdated node joins the topology. In this scenario, counter based rebalancing is used to achieve consistency.

The joining node sends LWM for each partition as part of the partition map exchange protocol. The coordinator collects all LWMs and determines nodes with highest LWM for each partition. These nodes are elected as data suppliers for a given partition.

If the coordinator detects a lagging node, it sends a partition map with the partition state changed to MOVING, triggering a rebalance on the joining node. The missing number of updates for each partition is calculated as the difference between the highest known LWM and LWM from the joining node.
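
The supplier election and the calculation of missing updates for a single partition can be sketched as follows. The class and method names are hypothetical and the map of node id to LWM is an assumed data model; the sketch only restates the two rules above in code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical coordinator-side logic for one partition. */
final class SupplierElectionSketch {
    /** Highest LWM reported for the partition. */
    static long maxLwm(Map<String, Long> lwmByNode) {
        return Collections.max(lwmByNode.values());
    }

    /** Nodes with the highest LWM are elected as data suppliers. */
    static List<String> suppliers(Map<String, Long> lwmByNode) {
        long max = maxLwm(lwmByNode);

        List<String> res = new ArrayList<>();

        for (Map.Entry<String, Long> e : lwmByNode.entrySet()) {
            if (e.getValue() == max)
                res.add(e.getKey());
        }

        Collections.sort(res); // Stable output order for the example.

        return res;
    }

    /** Number of updates the joining node is missing; 0 means no rebalancing is needed. */
    static long missingUpdates(Map<String, Long> lwmByNode, String joiningNode) {
        return maxLwm(lwmByNode) - lwmByNode.get(joiningNode);
    }
}
```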

Figure 2 illustrates the simplified rebalancing flow of a node join with an outdated partition id=0.

[Figure 2]

Update counters are used to determine the state when one partition copy is behind others.

It is safe to compare LWMs because all updates with counters below LWM are guaranteed to exist.

Thus, it is very important to correctly track LWMs since their values are used to calculate when rebalancing is necessary to restore the integrity of partitions [2].

As already mentioned, it is safe to increment LWM only if the update is written to the WAL. Otherwise, a situation is possible where the counter has been increased but there is no corresponding durable update.

Crash recovery

There are no guarantees that updates with already assigned counters will ever be applied. For example, if some mapped nodes exit during the prepare phase, the transaction will be rolled back; it is also possible for a node to crash during commit.

This causes gaps in the update sequence. Several additional mechanisms are introduced to close such gaps.

Consider figure 3:

[Figure 3]

In the scenario in figure 3, Tx2 is committed before Tx1 has had the chance to prepare, and the primary node fails.

As a part of the PME protocol, when a node leaves, all existing gaps are removed after the completion of all active transactions. This happens in org.apache.ignite.internal.processors.cache.PartitionUpdateCounter#finalizeUpdateCounters.
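
The effect of closing gaps can be illustrated with a sketch. It is not the code of finalizeUpdateCounters; the class name, the map from range start to delta and the Gap type are assumptions of this example. It walks the out-of-order ranges, records the skipped counters as gaps and moves LWM to the highest applied update.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;

/** Hypothetical sketch of gap closing for one partition copy. */
final class GapCloseSketch {
    /** Skipped counters from..to, both inclusive. */
    static final class Gap {
        final long from;
        final long to;

        Gap(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override public String toString() {
            return "[" + from + ", " + to + "]";
        }
    }

    /** Gaps that were closed. */
    final List<Gap> gaps = new ArrayList<>();

    /** LWM after all gaps are closed. */
    long lwm;

    /** Closes every gap between lwm and the out-of-order ranges (start -> delta). */
    static GapCloseSketch close(long lwm, SortedMap<Long, Long> outOfOrder) {
        GapCloseSketch res = new GapCloseSketch();

        res.lwm = lwm;

        for (Map.Entry<Long, Long> range : outOfOrder.entrySet()) {
            long start = range.getKey();

            if (start > res.lwm)
                res.gaps.add(new Gap(res.lwm + 1, start)); // Updates that will never arrive.

            res.lwm = Math.max(res.lwm, start + range.getValue());
        }

        return res;
    }
}
```

For LWM=5 and seq=(6,2), the sketch reports the single gap [6, 6] and moves LWM to 8.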

Consider figure 4:

[Figure 4]

In the scenario in figure 4, the transaction is prepared only on one backup and the primary node fails. A special partition counters update request is used to sync counters with the remaining backup node.

Importance of atomic primaries switching

As shown in figure 4, when the primary node fails, then one of the backups must become the new primary according to the affinity assignment for the next topology. This happens during the partition map exchange.

During PME, in the counter exchange phase, a new primary adjusts its HWM to the max LWM value for each cache partition. This guarantees monotonic growth of HWM in the grid and valid generation of counters by new primaries.
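
A minimal sketch of this adjustment, with hypothetical names and a plain collection of LWM values standing in for the counter exchange, could look like this:

```java
import java.util.Collection;
import java.util.Collections;

/** Hypothetical sketch of HWM adjustment on a new primary for one partition. */
final class PrimarySwitchSketch {
    /** High watermark used for generating new counters. */
    private long hwm;

    /** Counter exchange phase: align HWM with the highest LWM among partition owners. */
    void onBecomePrimary(Collection<Long> ownerLwms) {
        hwm = Collections.max(ownerLwms);
    }

    /** Reserves delta counters and returns the range start. */
    long reserve(long delta) {
        long start = hwm;

        hwm += delta;

        return start;
    }
}
```

After the adjustment, the first reserved range starts at the highest known LWM, so newly generated counters never collide with counters already applied on any copy.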

It is important to ensure atomicity when switching the primary node for a partition on all grid nodes. Otherwise, the HWM >= LWM invariant will break. This invariant ensures that if at least one stable copy is available, all other copies can restore their integrity from it.

Atomic switching is one of the PME protocol guarantees, making it impossible for two different grid nodes to use different primary nodes for mapping.

The same issue exists when a node joins. In the event of a node crash under load, local recovery from the WAL is started after restart. Some updates may have been skipped at that point. During WAL recovery, the correct LWM is determined because gaps are stored with the counter, and this LWM is then used for rebalancing. Rebalancing closes all gaps, and they can be cleared once the node has joined.