
This document describes the principles used to achieve consistency between data copies.

First we cover some basic concepts, then move on to more complex topics.

Cluster topology

Cluster topology is an ordered set of client and server nodes at a certain point in time. Each stable topology is assigned a topology version number, a monotonically increasing pair of counters (majorVer, minorVer). Each node maintains a list of the nodes in the topology, and this list is the same on all nodes for a given version. It is important to note that each node learns about a topology change eventually, that is, independently of the other nodes and possibly at a different time.
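A minimal sketch of how such a version pair can be ordered, assuming the major counter is compared first and the minor counter second. The class name TopologyVersionSketch is hypothetical and is used for illustration only.

```java
/** Hypothetical illustration of a (majorVer, minorVer) topology version. */
final class TopologyVersionSketch implements Comparable<TopologyVersionSketch> {
    final long majorVer;
    final int minorVer;

    TopologyVersionSketch(long majorVer, int minorVer) {
        this.majorVer = majorVer;
        this.minorVer = minorVer;
    }

    /** Assumption: versions are ordered by major counter, then by minor counter. */
    @Override public int compareTo(TopologyVersionSketch other) {
        int cmp = Long.compare(majorVer, other.majorVer);

        return cmp != 0 ? cmp : Integer.compare(minorVer, other.minorVer);
    }
}
```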

Affinity function

A set of all possible cache keys is divided between partitions. A partition is an indivisible data set in a grid, a subset of a total set of keys. The key uniquely determines the partition in which it will be stored.

Every change in topology produces a new topology version. For each new version of the topology it is necessary to calculate how the data is distributed among the nodes in the topology. The nodes that are responsible for storing data are called affinity nodes. Each node owns a subset of partitions. If a new affinity node has joined, part of the data must be rebalanced to the new node.

The distribution (sharding) of data over the nodes of the grid is defined by the affinity function.

First, the affinity function uniquely defines a partition for a key: P(key) = partition.

This is a static property and does not change over time.

The default implementation of function P is modulo division: abs(key.hashCode) % partitionsCount.
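The formula above can be sketched as follows. This is an illustration under stated assumptions, not the actual implementation: the class and method names are hypothetical, and the explicit handling of Integer.MIN_VALUE is an addition, needed because Math.abs returns a negative value for that input.

```java
/** Hypothetical sketch of P(key) = abs(key.hashCode) % partitionsCount. */
final class PartitionFunctionSketch {
    /** Maps a key to a partition in the range [0, partitionsCount). */
    static int partition(Object key, int partitionsCount) {
        int hash = key.hashCode();

        // Math.abs(Integer.MIN_VALUE) is still negative, so that case is mapped to 0 explicitly.
        int nonNegative = hash == Integer.MIN_VALUE ? 0 : Math.abs(hash);

        return nonNegative % partitionsCount;
    }
}
```

Because the result depends only on the key and the partition count, the same key always lands in the same partition, regardless of the topology.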

Second, the affinity function defines which nodes a partition belongs to: F(part) = list(nodes)

At different points in time a partition can belong to different nodes. This is a dynamic property of a partition.

The number of nodes in this list is equal to the number of copies of the partition. Recall that one of the fundamental properties of a grid is its high availability: when a certain number of nodes is lost, the data is not lost completely and the load is not interrupted. Storing multiple copies of each partition is what provides this property.

The first node in the list is the so-called primary node, which has a special coordinating role for all operations on the partition. The remaining nodes are backup nodes. If the primary node is lost, one of the backup nodes becomes the new primary.

Another important property of the affinity function is the uniform distribution of partitions over nodes.

Consider a possible partition distribution for a topology with 3 server nodes and 6 partitions.

In an ideal world, function F could look something like this:

F(0) = s1, s2, s3

F(1) = s2, s3, s1

F(2) = s3, s1, s2

F(3) = s1, s2, s3

F(4) = s2, s3, s1

F(5) = s3, s1, s2

The first node in each list is the primary node, the others are backups. Only the list prefix whose length equals the owners count (the number of partition copies) is used; the remaining nodes are ignored.

In real life the result might not be so ideal, because hash randomization causes some nodes to have slightly more partitions than others. This depends on the total partition count: the more partitions there are, the more uniform the assignment.

The default implementation of function F uses the rendezvous hashing algorithm: https://en.wikipedia.org/wiki/Rendezvous_hashing
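The idea of rendezvous hashing can be sketched as below. This is a simplified illustration and not the default affinity function itself: node ids are plain strings, and the weight function is an arbitrary hash mix chosen for the example. Every (node, partition) pair gets a pseudo-random weight, nodes are ranked by weight, and the first ownersCount nodes own the partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical sketch of F(part) = list(nodes) based on rendezvous hashing. */
final class RendezvousSketch {
    /** Returns the owners of a partition: the primary first, then the backups. */
    static List<String> owners(int part, List<String> nodes, int ownersCount) {
        List<String> ranked = new ArrayList<>(nodes);

        ranked.sort((a, b) -> {
            // Higher weight wins; ties are broken by node id to keep the order deterministic.
            int cmp = Long.compare(weight(b, part), weight(a, part));

            return cmp != 0 ? cmp : a.compareTo(b);
        });

        return new ArrayList<>(ranked.subList(0, Math.min(ownersCount, ranked.size())));
    }

    /** Illustrative weight of a (node, partition) pair; the mixing constant is an assumption. */
    static long weight(String node, int part) {
        long h = Objects.hash(node, part);

        h *= 0x9E3779B97F4A7C15L;

        return h ^ (h >>> 32);
    }
}
```

Since a weight depends only on the node and the partition, a node that joins or leaves does not change the relative order of the other nodes, so only the partitions it gains or loses have to move.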

These properties of the affinity function are what make linear scalability possible when new nodes are added to the grid.

Rebalancing

Rebalancing is a procedure for exchanging data between nodes to ensure a uniform distribution of data according to the affinity assignment [1] and to restore data consistency if copies diverge [2].

[1] Partition state based rebalancing.
In the first case, the reason for rebalancing is a change in the list of nodes that own the partition. If a new node has joined the topology, some of the partitions are reassigned to it, because the affinity function distributes partitions uniformly among the nodes. If a node has left, rebalancing is triggered to restore the required number of copies of each partition in the grid.

[2] Counter based rebalancing.
In the second case, which is relevant only for persistence mode, rebalancing is used to restore data consistency if a node was absent from the topology for some time while the grid was under load.
When the node rejoins the topology, the partition update counter is analyzed to determine how far the partition has lagged behind the other copies.
Depending on how far the partition is behind, the second type of rebalancing can use the write-ahead log (WAL) as the source of the update history. In this case, rebalancing can be performed without reloading the whole partition and is called "historical".
Another scenario in which [2] occurs is the activation of the grid, where some nodes may have stale data.
Rebalancing is an exchange of supply and demand messages between nodes. A demand message is sent from a node where the partition has stale data to a node with up-to-date data. A supply message is sent in response to a demand message. After a supply message is processed, the next demand message is sent, and so on until there is nothing more to supply.
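The demand/supply exchange can be sketched as a simple loop. This is a toy model under strong assumptions: both nodes are in-memory lists, the stale copy is a prefix of the up-to-date copy, and an empty batch signals that nothing is left to supply. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the demand/supply message exchange. */
final class DemandSupplySketch {
    /** Supplier side: answers a demand with the next batch starting at the requested offset. */
    static List<String> supply(List<String> supplierData, int fromIdx, int batchSize) {
        int toIdx = Math.min(supplierData.size(), fromIdx + batchSize);

        return fromIdx >= toIdx ? new ArrayList<>() : new ArrayList<>(supplierData.subList(fromIdx, toIdx));
    }

    /** Demander side: keeps demanding until nothing more is supplied. Returns the number of demand messages. */
    static int rebalance(List<String> supplierData, List<String> demanderData, int batchSize) {
        int demands = 0;

        while (true) {
            demands++; // One demand message is sent...

            List<String> batch = supply(supplierData, demanderData.size(), batchSize); // ...and one supply message comes back.

            if (batch.isEmpty())
                return demands;

            demanderData.addAll(batch);
        }
    }
}
```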

Update counter structure and flow

Each update to a partition receives a sequential number in order to ensure data consistency between copies. A special structure called the partition update counter is used to assign and increment update counters.


Partition update counter has the following structure:

/** Low watermark. */
private final AtomicLong cntr = new AtomicLong();

/** High watermark. */
protected final AtomicLong reserveCntr = new AtomicLong();

/** Queue of applied out of order counter updates. */
private TreeSet<Item> oomQueue = new TreeSet<>();

The low watermark (LWM), or update counter, is used to track sequential updates. It is incremented only when the corresponding update has been recorded to durable storage and no updates with a lesser counter value are missing.

For example, an LWM value of 5 means that all updates with assigned counters 1, 2, 3, 4, 5 have been written to the WAL.

The high watermark (HWM), or reservation counter, is incremented for each pending update, even though that update is not guaranteed to succeed. The reservation counter is assigned during the tx prepare phase.

Given the above, there is an important invariant: HWM >= LWM

The out-of-order update queue is used, literally, for recording updates that were applied out of order. This is possible because updates with a higher reservation counter can be applied before updates with a lesser reservation counter, causing gaps in the update sequence.
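The interplay of the three fields can be sketched with a simplified counter. This is an illustration only: the class and method names are hypothetical, a TreeMap of start -> delta stands in for the queue of items, and durability is not modeled.

```java
import java.util.TreeMap;

/** Hypothetical, simplified model of a partition update counter. */
final class UpdateCounterSketch {
    private long lwm; // Low watermark: all updates up to this value are applied.
    private long hwm; // High watermark: all counters up to this value are reserved.

    /** Ranges applied out of order: start -> delta. */
    private final TreeMap<Long, Long> outOfOrder = new TreeMap<>();

    /** Reserves delta counters for a pending update and returns the start of the reserved range. */
    synchronized long reserve(long delta) {
        long start = hwm;

        hwm += delta;

        return start;
    }

    /** Records that the range (start, start + delta] has been applied. */
    synchronized void applied(long start, long delta) {
        if (start != lwm) {
            // Updates with lesser counters are still missing: remember the range, keep LWM as is.
            outOfOrder.put(start, delta);

            return;
        }

        lwm = start + delta;

        // Absorb queued ranges that have become contiguous with the LWM.
        Long next;

        while ((next = outOfOrder.remove(lwm)) != null)
            lwm += next;
    }

    synchronized long lwm() { return lwm; }

    synchronized long hwm() { return hwm; }

    synchronized boolean hasGaps() { return !outOfOrder.isEmpty(); }
}
```

For instance, if one transaction reserves 3 counters and another reserves 2, and the second is applied first, the LWM stays at 0 with one queued range; once the first is applied, the LWM jumps to 5 and equals the HWM.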

To explain how this is used to achieve consistency, let us consider an example.

Suppose we have 3 server nodes and 1 client node in the topology, and the topology version is N.

Two threads on the client start two transactions that update keys from a single partition.

Tx1 enlists and updates keys k1, k2, k3; Tx2 enlists and updates keys k4, k5:

Ignite client = ...

IgniteCache cache = client.cache(name);

Thread 1:

// Tx1 updates keys k1, k2, k3.
try (Transaction tx1 = client.transactions().txStart()) {
    cache.put(k1, v1);
    cache.put(k2, v2);
    cache.put(k3, v3);

    tx1.commit();
}

Thread 2:

// Tx2 updates keys k4, k5.
try (Transaction tx2 = client.transactions().txStart()) {
    cache.put(k4, v4);
    cache.put(k5, v5);

    tx2.commit();
}

To understand the counter flow more deeply, it is necessary to explain how distributed transactions work.

2PC

Transactions use the two-phase commit (2PC) protocol to consistently update all participating nodes.

The first step (near map) on the near (originating) node is to calculate the map of the transaction: for each partition p, the set of involved primary nodes is determined using the affinity function F(p).

For each such partition on this topology version, a lock request is sent to the primary node. It executes the lock(k) operation, whose semantics are similar to acquiring an exclusive lock on the given key (footnote: this is true for pessimistic transactions).

It may happen that, by the time the request is processed on the primary node, topology version N + 1 is already in effect or the switch to that version is in progress.

If the next version is not compatible with the previous one (compatible versions have the same partition assignment), a response is sent to the near node indicating that a more recent topology version is in effect and that the transaction should retry on the next version. This process is called remap.

If the lock is successfully acquired, the prepare phase begins.

First, each enlisted key is assigned an update sequence number, generated as the next value of the HWM counter field.

Then the primary node builds a backups map (dht map) and sends prepare requests, containing the enlisted keys and their update numbers, to the backup nodes, if any.

Thus, update numbers are synchronized between primaries and backups: they are assigned on primaries and then transferred to backups.

If all backup nodes respond positively, the transaction goes into the PREPARED state. In this state, it is ready to commit.

During commit, all transactional changes are written to the WAL, and only after that is the LWM increased. At this point, the LWM catches up with the HWM.

Because transaction commits can be reordered, some updates may be applied out of order. The oomQueue field is used for tracking such updates.

Figure 3 shows the transaction map for tx1:

TODO figure 3

Let's return to our example and assume that all locks are acquired and both transactions start to prepare almost simultaneously.

The diagram shows counter state updates on the primary and the backup while the two transactions are processed with reordered prepare/commit.

TODO pic

HWM is incremented during the tx prepare phase on the primary node to ensure that the counter is monotonic and that the invariant HWM >= LWM holds.

Let us consider in more detail how counters are used to restore consistency (counter based rebalancing [2]) when an outdated node joins the topology.

On joining, a node sends the LWM for each of its partitions as part of the partition map exchange (PME) protocol. The coordinator collects all LWMs and determines the nodes with the highest LWM, say maxLWM. These nodes are elected as data suppliers for the given partition.

If the coordinator detects a lagging node, it sends a partition map with the partition state changed to MOVING, which triggers rebalancing on the corresponding node. The updates missing on each node are those between its LWM and maxLWM.
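The coordinator's decision for a single partition can be sketched as follows. This is a simplified illustration with hypothetical names: the LWMs are passed in as a map from node id to LWM, and partition states and messages are not modeled.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical sketch of supplier election for one partition. */
final class CounterRebalanceSketch {
    /** Highest LWM reported for the partition. */
    static long maxLwm(Map<String, Long> lwmByNode) {
        return Collections.max(lwmByNode.values());
    }

    /** Nodes holding maxLWM are elected as data suppliers. */
    static Set<String> suppliers(Map<String, Long> lwmByNode) {
        long max = maxLwm(lwmByNode);
        Set<String> res = new TreeSet<>();

        for (Map.Entry<String, Long> e : lwmByNode.entrySet()) {
            if (e.getValue() == max)
                res.add(e.getKey());
        }

        return res;
    }

    /** For each lagging node: the number of updates between its LWM and maxLWM. */
    static Map<String, Long> missingUpdates(Map<String, Long> lwmByNode) {
        long max = maxLwm(lwmByNode);
        Map<String, Long> res = new TreeMap<>();

        for (Map.Entry<String, Long> e : lwmByNode.entrySet()) {
            if (e.getValue() < max)
                res.put(e.getKey(), max - e.getValue());
        }

        return res;
    }
}
```

With LWMs s1 = 10, s2 = 10, s3 = 7, nodes s1 and s2 become suppliers and s3 has 3 updates to catch up on.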

The simplified rebalancing flow is illustrated in figure 3.

TODO fig. 3

Update counters are used to detect that one partition copy is behind the other(s).

It is only safe to compare LWMs, because all sequential updates up to the LWM are guaranteed to be present.

Thus, it is very important to track LWMs correctly, since their values are used to decide when rebalancing must be launched to restore the integrity of partitions [2].

As already mentioned, the LWM can safely be incremented only after the update has been written to the WAL.

Otherwise, a situation is possible in which the counter has increased but there is no corresponding durable update.

Crash recovery

There is no guarantee that an update with a reserved counter will complete. For example, if some nodes leave during the dht prepare phase, the transaction may be rolled back.
When a primary node fails under load, transaction reordering may leave permanent gaps in the update sequence on backup nodes. Two scenarios are possible.

1.

Transactions that started later, with a higher counter value, were prepared before a transaction with a lower counter value could be prepared on at least one backup.
In this case, the gaps are closed during the PME triggered by the node leaving, and similarly closed in the WAL using a RollbackRecord if persistence is enabled.

This scenario is handled by org.apache.ignite.internal.processors.cache.PartitionUpdateCounter#finalizeUpdateCounters

2.

Transactions that started later, with a higher counter value, were prepared before a transaction with a lower counter value, which was prepared only on some backups, but not all. This means that some of the backups have never seen the counter update.

In this case, the gaps are closed by a special kind of message, PartitionUpdateCountersRequest, on tx rollback, and similarly closed in the WAL using a RollbackRecord if persistence is enabled.
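What "closing the gaps" means for the counter can be sketched as below. This only illustrates the idea and is not the body of finalizeUpdateCounters: the names are hypothetical, the out-of-order queue is modeled as a TreeMap of start -> delta, and WAL records are not written.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of closing permanent gaps in the update sequence. */
final class GapFinalizerSketch {
    /**
     * Moves the LWM past every range applied out of order and empties the queue.
     * Each closed gap is reported as {firstMissingCounter, lastMissingCounter}.
     */
    static long finalizeCounter(long lwm, TreeMap<Long, Long> outOfOrder, List<long[]> closedGaps) {
        for (Map.Entry<Long, Long> e : outOfOrder.entrySet()) {
            long start = e.getKey();
            long end = start + e.getValue();

            // Counters lwm + 1 .. start were reserved but will never be applied.
            if (start > lwm)
                closedGaps.add(new long[] {lwm + 1, start});

            lwm = Math.max(lwm, end);
        }

        outOfOrder.clear();

        return lwm;
    }
}
```

For example, with an LWM of 0 and one queued range that starts at 3 and covers 2 updates, the gap of counters 1..3 is closed and the LWM becomes 5.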

TODO fig

Switching primaries

If the primary node fails, one of the backups becomes the new primary, and the PME protocol ensures an atomic switch to the new primary, during which the HWM becomes equal to the last known LWM for the partition (see Figure 2).
It is important that switching the primary for a partition is atomic when nodes join or leave. Otherwise the HWM >= LWM invariant breaks, and it is this invariant that ensures that, as long as at least one stable copy is available, all other copies can restore their integrity from it. Atomicity is achieved because transactions on the previous topology are completed before switching to the new one (as a phase of the PME protocol), so two primaries for the same partition can never exist in the grid at the same time.

If a node crashes under load, local recovery from the WAL starts after it restarts. Some of the updates may have been skipped. During WAL recovery at startup, the correct LWM is determined because the gaps in the counter are stored, and this LWM is then used for rebalancing. Rebalancing closes all the gaps, and they can be cleared once the node has joined.


