
This document describes the principles used to achieve consistency between data copies in transactional caches with persistence enabled. The information is relevant for the AI 2.8 release or higher.

Cluster topology

Cluster topology is an ordered set of client and server nodes at a certain point in time. Each stable topology is assigned a topology version number - a monotonically increasing pair of counters (majorVer, minorVer). The major version is used for tracking grid node events such as join, left or fail; the minor version is used for tracking internal events such as cache start or activation. Each node maintains a list of nodes in the topology, and it is the same on all nodes for a given version. It is important to note that each node receives information about topology changes at different times, meaning that at any point in time nodes can see different topology versions, but eventually they will all see the same.

Affinity function

...

Every change in the topology - adding or removing nodes - creates a new topology version. For each new version of the topology, it is necessary to calculate the distribution of data between the nodes. Nodes that are responsible for storing data are called affinity nodes. Each node owns a subset of partitions. If a new affinity node joins the cluster, the data is rebalanced to accommodate the new node.

...

The first node in the list corresponds to the primary node, and the other nodes are backups. Only those nodes that belong to the list prefix with the length of backups + 1 are used for data placement; the others are ignored for a partitioned cache.
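As an illustration, the placement for a particular key can be inspected through the public Affinity API. The started client node and the cache name "tx-cache" below are assumptions made for the example, not part of the scenario described in this document.

Code Block
languagejava
// Illustrative only: look up the owner list for a key via the public Affinity API.
Ignite client = startClient();

Affinity<Integer> aff = client.affinity("tx-cache"); // example cache name

int part = aff.partition(1); // partition that key 1 maps to

// The first node in the returned collection is the primary, the rest are backups.
Collection<ClusterNode> owners = aff.mapPartitionToPrimaryAndBackups(part);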

...

Technically, rebalancing is the exchange of supply and demand messages between nodes. A demand message is sent from the node where the partition has stale data to the node that has the actual data. A supply message containing the data is sent in response to the demand message. After processing the supply message, the next demand message is sent, and so on, until there is nothing more to rebalance. We will look at this later in more detail.
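A minimal, self-contained sketch of this exchange is shown below. All names in it (Demand, Supply, Supplier, Demander) are illustrative and are not Ignite's internal demander/supplier classes; the sketch only models the request/response loop described above.

Code Block
languagejava
// Hypothetical model of the demand/supply loop, not Ignite internals.
import java.util.*;

record Demand(long fromCounter) {}
record Supply(List<Long> counters, boolean last) {}

class Supplier {
    /** Update counters owned by the node that has the actual data. */
    private final NavigableSet<Long> updates = new TreeSet<>(List.of(1L, 2L, 3L, 4L, 5L, 6L, 7L));

    /** Answer a demand with the next batch of updates above the requested counter. */
    Supply handle(Demand d, int batchSize) {
        List<Long> tail = new ArrayList<>(updates.tailSet(d.fromCounter(), false));
        boolean last = tail.size() <= batchSize;
        return new Supply(last ? tail : tail.subList(0, batchSize), last);
    }
}

class Demander {
    private long applied = 3; // stale copy: counters 1..3 already applied

    void rebalance(Supplier supplier) {
        Supply supply;
        do {
            supply = supplier.handle(new Demand(applied), 2); // demand the next batch
            for (long c : supply.counters())                  // apply supplied updates
                applied = c;
        } while (!supply.last());                             // stop when nothing is left
    }
}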

Update counter structure

Currently, different implementations of the update counter logic exist for persistent mode and for in-memory mode. In this article we will only look at the former. The main difference from the latter is the ability to track updates that are applied out of order.

Each update to a partition receives a sequential number, which is used to ensure data consistency between copies. A special structure called the partition update counter is used to assign and increment update counters.

A partition update counter has the following structure:

Code Block
languagejava
/** Low watermark. */
private final AtomicLong cntr = new AtomicLong();

The low watermark, or update counter, is used to track sequential updates. It is only incremented when the corresponding update is recorded to durable storage and no missed updates with a lesser counter value exist.

For example, an LWM value of 5 means that all updates with assigned counters 1, 2, 3, 4, 5 were applied to the WAL.

Code Block
languagejava
/** High watermark. */
private final AtomicLong reserveCntr = new AtomicLong();

The high watermark, or reservation counter, is incremented for each pending update, which is not even guaranteed to succeed. The reservation counter is assigned during the tx prepare phase.
Code Block
languagejava
/** Updates applied out of order. */
private SortedSet<Range> seq = new TreeSet<>();

This field is used for recording the updates that are applied out of order. This is possible because updates with a higher counter can be applied to the WAL before updates with a lower counter, causing gaps in the update sequence.


Code Block
languagejava
/**
 * Update counter task. Update from start value by delta value.
 */
private static class Range implements Comparable<Range> {
    /** */
    private long start;

    /** */
    private long delta;

    /** {@inheritDoc} */
    @Override public int compareTo(@NotNull Range r) {
        return Long.compare(this.start, r.start);
    }
}


A range represents a sequence of updates; for example, (5, 3) means three updates with numbers 6, 7, 8. We will use this notation again later.

An out-of-order update range is held in the sequence only if an update with a lower counter is still missing. For example, LWM=5, HWM=9, seq=(6,2) means that updates 7, 8 were applied out of order, while updates 6 and 9 are still not applied.
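To make the LWM/HWM/gap mechanics concrete, here is a minimal, self-contained sketch of the idea under the semantics described above. It is not the actual PartitionUpdateCounter implementation; the class and method names are invented for illustration.

Code Block
languagejava
// A minimal model of the counter (not the actual Ignite implementation):
// reserve() models HWM growth during tx prepare, update() models applying a
// committed range, possibly out of order, and advancing LWM once gaps close.
import java.util.TreeMap;

public class CounterSketch {
    /** Low watermark: highest counter up to which the update sequence has no gaps. */
    private long lwm;

    /** High watermark: counter reserved for pending (not yet applied) updates. */
    private long hwm;

    /** Out-of-order ranges, keyed by range start: start -> delta. */
    private final TreeMap<Long, Long> gaps = new TreeMap<>();

    /** Reserve a range of {@code delta} updates during tx prepare; returns the range start. */
    public synchronized long reserve(long delta) {
        long start = hwm;

        hwm += delta;

        return start;
    }

    /** Apply the range (start, delta); ranges may arrive out of order. */
    public synchronized void update(long start, long delta) {
        if (start > lwm)
            gaps.put(start, delta); // Applied out of order: remember the range.
        else
            lwm = Math.max(lwm, start + delta); // Sequential update: advance LWM.

        // Absorb recorded ranges that now start at or below LWM (their gaps are closed).
        while (!gaps.isEmpty() && gaps.firstKey() <= lwm) {
            var e = gaps.pollFirstEntry();

            lwm = Math.max(lwm, e.getKey() + e.getValue());
        }
    }

    public synchronized long lwm() { return lwm; }

    public synchronized long hwm() { return hwm; }
}

In this model, the state LWM=5, HWM=9, seq=(6,2) is reached by reserving up to counter 9, applying update(0, 5) and then applying update(6, 2) while the range starting at 5 is still missing.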

Note that there is an important invariant which can be derived from the statements above:

HWM >= LWM

Update counter flow

The flow is closely bound to the transaction protocol. Counters are modified during the stages of the two-phase commit (2PC) protocol used by Ignite to ensure distributed consistency.
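In terms of the CounterSketch model above (again, a sketch rather than the real implementation), the 2PC interaction with the counter looks roughly like this: a range is reserved on the primary during prepare and applied, possibly out of order, during commit.

Code Block
languagejava
// Hypothetical walk-through for one partition: Tx1 updates 3 keys, Tx2 updates 2 keys.
CounterSketch cntr = new CounterSketch();

long tx1Start = cntr.reserve(3); // prepare Tx1: HWM 0 -> 3, range (0,3) = updates 1, 2, 3
long tx2Start = cntr.reserve(2); // prepare Tx2: HWM 3 -> 5, range (3,2) = updates 4, 5

cntr.update(tx2Start, 2);        // Tx2 commits first: applied out of order, LWM stays 0
cntr.update(tx1Start, 3);        // Tx1 commits: the gap closes, LWM advances to 5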

...

F(k1) = F(k2) = F(k3) = F(k4) = F(k5) = s1(Primary), s2(Backup1), s3(Backup2)


Code Block
languagejava
Ignite client = startClient();

IgniteCache<Integer, Integer> cache = client.cache(name);


Thread 1:


Code Block
languagejava
try (Transaction tx1 = client.transactions().txStart()) {
    cache.putAll(Map.of(k1, v1, k2, v2, k3, v3));

    tx1.commit();
}


Thread 2:


Code Block
languagejava
try (Transaction tx2 = client.transactions().txStart()) {
    cache.putAll(Map.of(k4, v4, k5, v4));

    tx2.commit();
}


Figure 1 below shows the flow for the case of backup reordering, where a transaction with higher update counters is applied on the backup before a transaction with lower update counters.

...

Let's imagine that out-of-order updates are not tracked and LWM doesn't exist. In this case, if the primary node fails before applying the update from Tx1, we will end up with update counter=5 and will lose the missed updates after the node rejoins the topology, because rebalancing will not start due to equal counters.

...

There are no guarantees that updates with already assigned counters will ever be applied. For example, if some mapped nodes exit during the prepare phase, the transaction will be rolled back; it is also possible for a node to crash during commit.

...

In the scenario in Figure 3, Tx2 is committed before Tx1 has had a chance to prepare, and the primary node fails.

As a part of the PME protocol, when a node leaves, all existing gaps are removed after waiting for the completion of all active transactions. This happens in org.apache.ignite.internal.processors.cache.PartitionUpdateCounter#finalizeUpdateCounters.
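In terms of the CounterSketch model introduced earlier, finalization can be thought of as discarding the recorded out-of-order ranges and advancing LWM to HWM, since the transactions that owned the missing ranges can no longer complete. This is a sketch of the idea, not the actual finalizeUpdateCounters implementation:

Code Block
languagejava
// Sketch only: an addition to CounterSketch modelling counter finalization.
public synchronized void finalizeCounters() {
    gaps.clear(); // forget out-of-order ranges whose missing predecessors will never arrive
    lwm = hwm;    // the sequence is considered complete up to the reserved high watermark
}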

...

It is important to ensure atomicity when switching the primary node for a partition on all grid nodes; otherwise, the HWM >= LWM invariant will break. This invariant guarantees that if at least one stable copy is available, all other copies can restore their integrity from it.

...