
The document describes principles used for achieving consistency between data copies.

Cluster topology

Cluster topology is an ordered set of client and server nodes at a certain point in time. Each stable topology is assigned a topology version number, a monotonically increasing pair of counters (majorVer, minorVer). Each node maintains a list of the nodes in the topology, and for a given version this list is the same on all nodes. It is important to note that each node receives information about a topology change eventually, that is, independently of other nodes and possibly at a different time.
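The ordering of topology versions can be illustrated with a minimal sketch. The class name and the isBehind helper are hypothetical and exist only for this illustration; the sketch assumes that versions are compared by the major counter first and by the minor counter second.

```java
/** Hypothetical illustration of a (majorVer, minorVer) topology version; not the actual Ignite class. */
final class TopologyVersionSketch implements Comparable<TopologyVersionSketch> {
    final long majorVer;
    final int minorVer;

    TopologyVersionSketch(long majorVer, int minorVer) {
        this.majorVer = majorVer;
        this.minorVer = minorVer;
    }

    /** Versions are ordered by the major counter first, then by the minor counter. */
    @Override public int compareTo(TopologyVersionSketch other) {
        int cmp = Long.compare(majorVer, other.majorVer);

        return cmp != 0 ? cmp : Integer.compare(minorVer, other.minorVer);
    }

    /** True if the other version is newer, i.e. this node has not yet learned about a topology change. */
    boolean isBehind(TopologyVersionSketch other) {
        return compareTo(other) < 0;
    }
}
```

Because nodes learn about topology changes at different times, a check like isBehind is the kind of comparison a node could use to detect that a peer is already on a newer version.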

Affinity function

A set of all possible cache keys is divided between partitions. A partition is an indivisible data set in a grid, a subset of a total set of keys. The key uniquely determines the partition in which it will be stored.

Every change in topology produces a new topology version. For each new version, it is necessary to calculate how the data is distributed among the nodes in the topology. The nodes that are responsible for storing data are called affinity nodes. Each node owns a subset of partitions. If a new affinity node has joined, part of the data must be rebalanced to the new node.

The distribution (sharding) of data over the nodes of the grid is defined by the affinity function.

First, the affinity function uniquely defines a partition for a key: P(key) = partition

This is a static property and cannot change in time.
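As a minimal sketch of the static mapping P(key), assume a simple hash-modulo scheme over a fixed number of partitions. The class and method names are hypothetical, and the real affinity function may mix the hash differently.

```java
/** Hypothetical sketch of P(key) = partition, assuming a plain hash-modulo scheme. */
final class KeyToPartitionSketch {
    /** Maps a key to a partition in [0, parts); the result depends only on the key and the partition count. */
    static int partition(Object key, int parts) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), parts);
    }
}
```

The result does not depend on the topology, which is why the mapping stays the same over time as long as the number of partitions is fixed.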

Second, the affinity function defines which nodes a partition belongs to: F(topology, part) = list(nodes)

At different points in time a partition can belong to different nodes. This is a dynamic property of a partition.

The number of nodes in this list is equal to the number of copies of the partition. Recall that one of the fundamental properties of a grid is its high availability: with the loss of a certain number of nodes, complete data loss does not occur and the load is not interrupted. Multiple copies are stored in order to provide this property.

The first node in the list is the so-called primary node, which has a special coordinating role for all operations on the partition. The remaining nodes are backup nodes. If the primary node is lost, one of the backup nodes becomes the new primary.

Todo PIC for some AF calculated state.

An important property of the affinity function is the uniform distribution of partitions over nodes.

The default implementation uses the rendezvous hashing algorithm https://en.wikipedia.org/wiki/Rendezvous_hashing
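The idea of rendezvous hashing for F(topology, part) can be sketched as follows. This is an illustration under stated assumptions, not the default implementation: the class name, the string node ids and the weight mixing function are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of rendezvous (highest random weight) hashing for partition ownership. */
final class RendezvousSketch {
    /** Mixes the node id hash and the partition number into a pseudo-random 64-bit weight. */
    static long weight(String nodeId, int part) {
        long z = nodeId.hashCode() * 0x9E3779B97F4A7C15L + part;
        z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
        z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;

        return z ^ (z >>> 31);
    }

    /** F(topology, part): owners of the partition; index 0 is the primary, the rest are backups. */
    static List<String> owners(List<String> topology, int part, int copies) {
        List<String> sorted = new ArrayList<>(topology);

        // Highest weight wins; the node id breaks ties so that every node computes the same order.
        sorted.sort(Comparator
            .comparingLong((String n) -> weight(n, part))
            .reversed()
            .thenComparing(Comparator.<String>naturalOrder()));

        return new ArrayList<>(sorted.subList(0, Math.min(copies, sorted.size())));
    }
}
```

Each (node, partition) weight is computed independently of the other nodes, so removing a node that does not own a partition leaves that partition's owner list unchanged. Only partitions owned by the changed node need to move.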

These properties make it possible to scale linearly when new nodes are added to the grid.

Rebalancing

Rebalancing is a procedure for exchanging data between nodes to ensure uniform distribution of data (redistribution of data between nodes) [1] and to ensure data consistency [2].

[1] Partition state based rebalancing
In the first case, the reason for rebalancing is a change in the list of nodes that own the partition. If a new node has joined the topology, some partitions will be reassigned to it because the affinity function distributes partitions uniformly among the nodes. If a node has left, rebalancing is triggered to restore the required number of copies of the partition in the grid.

[2] Counter based rebalancing.
In the second case, which is relevant only in persistence mode, rebalancing is used to restore data consistency if a node was absent from the topology for some time while the grid was under load.
When the node rejoins the topology, the partition update counter is analyzed to determine how far the partition has "lagged" behind the other copies.
Depending on how far the partition is lagging behind, the second type of rebalancing can use the write-ahead log as a data source. In this case, rebalancing can be performed without a complete reloading of the partition and is called "historical".
Another scenario in which [2] occurs is the activation of the grid, when some nodes may have stale data.
Rebalancing is an exchange of demand and supply messages between nodes. A demand message is sent from the node where the partition has stale data to a node with up-to-date data. A supply message is sent in response to a demand message. After a supply message is processed, the next demand message is sent, and so on until there is nothing more to supply.
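The demand/supply exchange can be sketched as a simple loop. This is a simplified illustration and not the actual message protocol: it assumes that updates are addressed by their counter, that the demander holds a contiguous prefix of the updates, and that the supplier answers in fixed-size batches. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;

/** Hypothetical sketch of the demand/supply message exchange between two copies of a partition. */
final class DemandSupplySketch {
    /** Supplier side: answers a demand with the next batch of update counters after the given one. */
    static List<Long> supply(SortedMap<Long, String> supplierData, long afterCntr, int batchSize) {
        List<Long> batch = new ArrayList<>();

        for (Long cntr : supplierData.tailMap(afterCntr + 1).keySet()) {
            if (batch.size() == batchSize)
                break;

            batch.add(cntr);
        }

        return batch;
    }

    /** Demander side: keeps demanding until nothing more is supplied. Returns the number of round trips. */
    static int rebalance(SortedMap<Long, String> supplierData, SortedMap<Long, String> demanderData, int batchSize) {
        long last = demanderData.isEmpty() ? 0 : demanderData.lastKey();
        int rounds = 0;

        while (true) {
            List<Long> batch = supply(supplierData, last, batchSize); // One demand and its supply response.
            rounds++;

            if (batch.isEmpty())
                return rounds; // Nothing more to supply.

            for (Long cntr : batch)
                demanderData.put(cntr, supplierData.get(cntr));

            last = batch.get(batch.size() - 1);
        }
    }
}
```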

Update counter structure and flow

Each update to a partition receives a sequential number for the purpose of ensuring data consistency between copies. A special structure, the partition update counter, is used to assign and increment update counters.


Partition update counter has the following structure:

/** Low watermark. */
private final AtomicLong cntr = new AtomicLong();

/** High watermark. */
protected final AtomicLong reserveCntr = new AtomicLong();

/** Queue of applied out of order counter updates. */
private TreeSet<Item> queue = new TreeSet<>();

The low watermark (LWM), or update counter, is used to track sequential updates. It is incremented only when the corresponding update has been recorded to durable storage and no updates with a lesser counter value are missing.

For example, an LWM value of 5 means that all updates with assigned counters 1, 2, 3, 4, 5 were written to the WAL.

The high watermark (HWM), or reservation counter, is incremented for each pending update, even though that update is not guaranteed to succeed. The reservation counter is assigned during the tx prepare phase.

Given all of the above, there is an important invariant: HWM >= LWM

The out-of-order update queue is used, literally, for recording updates applied out of order. This is possible because updates with a higher reservation counter can be applied before updates with a lesser reservation counter, causing gaps in the update sequence.
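The interplay of the three fields can be illustrated with a simplified, self-contained sketch. It is not the actual PartitionUpdateCounter code: the class and its method names are hypothetical, and the out-of-order queue is modeled as a sorted map from range start to range length.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of a partition update counter with LWM, HWM and an out-of-order queue. */
final class UpdateCounterSketch {
    private long lwm; // Low watermark: all updates up to lwm are applied.
    private long hwm; // High watermark: the highest reserved counter.
    private final TreeMap<Long, Long> outOfOrder = new TreeMap<>(); // start -> delta of ranges applied past a gap.

    /** Reserves 'delta' counters for a pending tx (prepare phase); returns the start of the reserved range. */
    synchronized long reserve(long delta) {
        long start = hwm;
        hwm += delta;

        return start;
    }

    /** Marks the range (start, start + delta] as applied (commit phase). */
    synchronized void update(long start, long delta) {
        if (start > lwm) { // An earlier range is still missing: remember this one and keep lwm as is.
            outOfOrder.put(start, delta);

            return;
        }

        lwm = Math.max(lwm, start + delta);

        // Close the gap: absorb queued ranges that are now contiguous with lwm.
        while (!outOfOrder.isEmpty() && outOfOrder.firstKey() <= lwm) {
            Map.Entry<Long, Long> e = outOfOrder.pollFirstEntry();
            lwm = Math.max(lwm, e.getKey() + e.getValue());
        }

        // Assumption of this sketch: a copy that never reserved locally still keeps HWM >= LWM.
        hwm = Math.max(hwm, lwm);
    }

    synchronized long lwm() { return lwm; }
    synchronized long hwm() { return hwm; }
    synchronized boolean hasGaps() { return !outOfOrder.isEmpty(); }
}
```

If one transaction reserves three counters and a second one reserves two, and the second commits first, LWM stays at 0 while the gap exists. Once the first transaction commits, LWM jumps to 5 and catches up with HWM.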

Consider the following example.

Suppose we have 3 server nodes and 1 client node in topology.

Two threads on the client start two transactions updating keys from a single partition.

Tx1 enlists and updates keys k1, k2, k3, Tx2 enlists and updates keys k4, k5:

Ignite client = ...

IgniteCache cache = client.cache(name);

Thread 1:

try (Transaction tx1 = client.transactions().txStart()) {
    cache.put(k1, v1);
    cache.put(k2, v2);
    cache.put(k3, v3);

    tx1.commit();
}

Thread 2:

try (Transaction tx2 = client.transactions().txStart()) {
    cache.put(k4, v4);
    cache.put(k5, v5);

    tx2.commit();
}

A few lines about how transactions work.

Transactions use the two-phase commit (2PC) protocol.

The first step (near map) on the near node is to calculate the map of this transaction, that is, the set of involved primary nodes. Since all keys in this example belong to a single partition, there will be only one node in the map. For this partition on this version of the topology, a lock request is sent that executes the lock (k) operation, whose semantics are similar to acquiring exclusive key locks.
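The near map calculation can be sketched as grouping the transaction keys by the primary node of their partition. The names below are hypothetical, the key-to-partition mapping is assumed to be a plain hash-modulo, and primaryOf stands in for the first node of the affinity assignment.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntFunction;

/** Hypothetical sketch of building the near map: primary node -> keys it has to lock. */
final class NearMapSketch {
    /** Groups tx keys by the primary node of the partition each key belongs to. */
    static <K> Map<String, List<K>> nearMap(List<K> keys, int parts, IntFunction<String> primaryOf) {
        Map<String, List<K>> map = new LinkedHashMap<>();

        for (K key : keys) {
            int part = Math.floorMod(key.hashCode(), parts); // Assumed P(key).

            map.computeIfAbsent(primaryOf.apply(part), n -> new ArrayList<>()).add(key);
        }

        return map;
    }
}
```

When all keys fall into the same partition, the resulting map has a single entry, so a single lock request is sent to one primary node.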

It may happen that by the time the request is received on the primary node, the distribution for topology version N + 1 is already in effect. In this case, a response is sent to the near node indicating that a more recent topology version is in use. This process is called remap.

If the lock is successfully acquired, the prepare phase begins. On the primary node, the update counter for the partition of the locked key is "reserved", that is, the hwm field is increased.

In this phase, the primary node builds a backup map (dht map) and sends prepare requests to the backup nodes, if any. These requests contain the update number for this key. Thus, update numbers are synchronized between primaries and backups: they are assigned on primaries and then transferred to backups.

If all the backup nodes answer positively, the transaction goes into the PREPARED state. In this state, it is ready for commit (COMMIT). During commit, all transactional changes are written to the WAL and the counter of applied updates is increased. At this point, lwm catches up with hwm.


Let's assume all locks are acquired and both transactions start to prepare almost simultaneously.

The diagram shows counter state updates for primary and backup during tx processing by two transactions with reordered prepare/commit.

TODO pic

HWM is incremented during the tx prepare phase on the primary node to ensure that the counter is monotonic and that the invariant HWM >= LWM holds.

Update counters are used to detect the state when one partition copy is behind the other(s).

It is only safe to compare LWMs, because all sequential updates up to LWM are guaranteed to be present.

Let's assume partition N on node1 has LWM=m and partition N on node2 has LWM=k, with m > k. This means the partition on node2 is lagging behind node1, and the number of missing updates is m-k.

Thus, it is very important to track update counters correctly, since their values are used to determine when rebalancing must be launched to restore the integrity of partitions [2].

One more important observation: LWM can safely be incremented only if the update has been written to the WAL. Otherwise, a situation is possible in which the counter has increased but there is no corresponding update.


Rebalancing

Let us consider in more detail how counters are used to restore consistency (rebalance [2]).

When a node joins, during PME it sends, for each partition, its lwm together with the depth of the update history it holds, so that the possibility of historical rebalance can be determined. History is defined by a pair of counters (earlies_checkpoint_counter, lwm). The counter guarantees that there are no missed updates inside this history.
At the same time, the node may have missed updates with a higher number.
On the coordinator, the lwm values are compared and the node (or nodes) with the highest lwm is selected. This information is sent to the joining node, which starts the rebalance procedure if it is lagging behind. The rebalance will be either full, with a preliminary clearing of the partition, or historical, if some node contains the necessary update history in the WAL.
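The choice between full and historical rebalance can be sketched as follows. This is a simplified illustration of the decision described above, not the actual PME code: the class, the Mode enum, and the field name earliestCheckpointCntr are hypothetical, and the sketch assumes that history covers the counters between the earliest checkpoint counter and lwm.

```java
import java.util.Map;

/** Hypothetical sketch of choosing the rebalance mode for one partition of a joining node. */
final class RebalanceChoiceSketch {
    enum Mode { NONE, HISTORICAL, FULL }

    /** What a node reports for one partition: its WAL history covers (earliestCheckpointCntr, lwm]. */
    static final class PartState {
        final long earliestCheckpointCntr;
        final long lwm;

        PartState(long earliestCheckpointCntr, long lwm) {
            this.earliestCheckpointCntr = earliestCheckpointCntr;
            this.lwm = lwm;
        }
    }

    /** Coordinator-side decision for the joining node, given the states reported by the other owners. */
    static Mode choose(PartState joining, Map<String, PartState> others) {
        long maxLwm = joining.lwm;

        for (PartState s : others.values())
            maxLwm = Math.max(maxLwm, s.lwm);

        if (joining.lwm == maxLwm)
            return Mode.NONE; // The joining copy is not lagging behind.

        // Historical rebalance needs an up-to-date owner whose WAL still reaches back to the joining node's lwm.
        for (PartState s : others.values()) {
            if (s.lwm == maxLwm && s.earliestCheckpointCntr <= joining.lwm)
                return Mode.HISTORICAL;
        }

        return Mode.FULL; // No suitable history: clear the partition and reload it completely.
    }
}
```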

TODO rebalance flow pic (fig.2).

Crash recovery

There is no guarantee that a reserved update will be completed. For example, if some nodes leave the topology during the DHT prepare phase, the transaction may be rolled back.
When a primary node crashes under load, permanent gaps in the updates may arise on backup nodes if transactions were reordered.

1.

Transactions that started later, with a higher counter value, were prepared before the transaction with a lower counter value could be prepared on at least one backup.
In this case, the gaps will be closed during the node-left PME, and likewise closed in the WAL using RollbackRecord if persistence is enabled.

This scenario is handled by org.apache.ignite.internal.processors.cache.PartitionUpdateCounter#finalizeUpdateCounters

2.

Transactions that started later, with a higher counter value, were prepared before the transaction with a lower counter value could be prepared on all backups; it was prepared only on some of them. This means some of the backups have never seen a counter update.

In this case, the gaps will be closed by a special kind of message, PartitionUpdateCountersRequest, on tx rollback, and likewise closed in the WAL using RollbackRecord if persistence is enabled.
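What "closing the gaps" means for the counter can be shown with a small sketch. It is an illustration only and does not reproduce finalizeUpdateCounters or the handling of PartitionUpdateCountersRequest: the class and method names are hypothetical, and the out-of-order updates are modeled as a sorted map from range start to range length.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of finding and closing gaps left by updates that will never arrive. */
final class GapCloseSketch {
    /**
     * Given lwm and the ranges applied out of order (start -> delta), returns the gaps
     * as {start, delta} pairs; each gap corresponds to counters that would be marked as rolled back.
     */
    static List<long[]> gaps(long lwm, TreeMap<Long, Long> outOfOrder) {
        List<long[]> res = new ArrayList<>();
        long cur = lwm;

        for (Map.Entry<Long, Long> e : outOfOrder.entrySet()) {
            if (e.getKey() > cur)
                res.add(new long[] {cur, e.getKey() - cur});

            cur = Math.max(cur, e.getKey() + e.getValue());
        }

        return res;
    }

    /** LWM after all gaps are closed: the end of the last applied range. */
    static long finalizedLwm(long lwm, TreeMap<Long, Long> outOfOrder) {
        long cur = lwm;

        for (Map.Entry<Long, Long> e : outOfOrder.entrySet())
            cur = Math.max(cur, e.getKey() + e.getValue());

        return cur;
    }
}
```

For example, with lwm=0 and a single out-of-order range that starts at 3 and has length 2, the only gap is the three counters after 0, and the finalized lwm is 5.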

TODO fig

Switching primaries

If the primary node has left the topology, one of the backups becomes the new primary. Thanks to the PME protocol, switching to the new primary is atomic, and on the new primary hwm becomes equal to the last known lwm for the partition (see Figure 2).
It is important to ensure the atomicity of switching primaries for a partition in node join or leave scenarios; otherwise the HWM >= LWM invariant will break. This invariant ensures that if at least one stable copy is available, all other copies can restore their integrity from it. Atomicity is achieved because transactions on the previous topology are completed before switching to the new one (as a phase of the PME protocol), so two primaries for one partition can never exist in the grid at the same time.

If a node crashes under load, local recovery from the WAL starts after it restarts. Some updates may be missing at that point. During WAL recovery the correct lwm is determined, because gaps are stored in the counter, and this lwm is then used for rebalance. Rebalance closes all gaps, so they can be cleared once the node has joined.


