...

[1] Affinity based rebalancing
In the first case, the reason for the rebalancing is a change in the list of nodes owning the partition. If a new node has joined the topology, then part of the partitions will be reassigned to it due to the affinity function's property of uniform distribution of partitions among the nodes. If a node has left, then rebalancing will be triggered to ensure the availability of the required number of copies of each partition in the grid.

[2] Updates counter based rebalancing.
In the second case, which is relevant only for the persistence mode, rebalancing is used to restore the consistency of the data if a node was absent from the topology for some time while the cluster was under load.
When the node re-joins the topology, it will have stale data, and the partition update counters will be analyzed to determine how far the partitions have "lagged" behind the other copies.
Depending on how far the partitions lag behind, the second type of rebalancing can use the write-ahead log as a source of the update history. In this case, the rebalancing can be performed without complete reloading of the partition and is called "historical".
Another scenario for the occurrence of [2] is the activation of the grid, where some nodes in the topology have stale data.
Rebalancing is the exchange of supply and demand messages between nodes, as sketched below. A demand message is sent from the node where a partition has stale data to a node with actual data. A supply message is sent in response to the demand message; after processing a supply message the next demand message is sent, and so on, until there is nothing more to rebalance.
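A self-contained conceptual sketch of this exchange for a single partition is shown below (illustrative names and message shapes, not Ignite's actual classes; updates are modeled as counter-to-payload entries, which is closest to the historical case):

import java.util.*;

/** Conceptual sketch of the demand/supply exchange driving rebalancing of one partition. */
public class DemandSupplySketch {
    /** What the demander sends: partition id and the counter it has applied up to. */
    record Demand(int partId, long fromCntr) {}

    /** What the supplier answers with: a batch of updates and a flag marking the last batch. */
    record Supply(int partId, NavigableMap<Long, String> batch, boolean last) {}

    public static void main(String[] args) {
        // Supplier has updates 1..10 for partition 0; the demander has applied up to 6.
        NavigableMap<Long, String> supplierUpdates = new TreeMap<>();
        for (long c = 1; c <= 10; c++)
            supplierUpdates.put(c, "update-" + c);

        long demanderCntr = 6;
        final int batchSize = 2;

        boolean done = false;
        while (!done) {
            Demand demand = new Demand(0, demanderCntr);

            // Supplier side: collect the next batch of updates after the requested counter.
            NavigableMap<Long, String> batch = new TreeMap<>();
            for (var e : supplierUpdates.tailMap(demand.fromCntr(), false).entrySet()) {
                batch.put(e.getKey(), e.getValue());
                if (batch.size() == batchSize)
                    break;
            }
            Supply supply = new Supply(0, batch,
                batch.isEmpty() || batch.lastKey().equals(supplierUpdates.lastKey()));

            // Demander side: apply the batch and advance the counter.
            if (!supply.batch().isEmpty())
                demanderCntr = supply.batch().lastKey();

            done = supply.last(); // nothing more to rebalance
        }

        System.out.println("Demander caught up to counter " + demanderCntr); // prints 10
    }
}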

Update counter structure

Currently three different implementations of update counter exist: for persistent mode, for in-memory mode and for MVCC mode.

In this article we only look at the first one.

Each update to a partition receives a sequential number for the purpose of ensuring data consistency between copies. A special structure called the partition update counter is used to assign and increment update counters.

...

/** Low watermark. */
private final AtomicLong cntr = new AtomicLong();

/** High watermark. */
protected final AtomicLong reserveCntr = new AtomicLong();

/** Queue of applied out of order counter updates. */
private SortedSet<Range> oomQueue = new TreeSet<>();

The low watermark, or update counter, is used to track sequential updates. It is only incremented when the corresponding update is recorded to durable storage and no missed updates with a lesser counter value exist.

For example, an LWM value of 5 means that all updates with assigned counters 1, 2, 3, 4, 5 were applied to the WAL.


/** High watermark. */
protected final AtomicLong reserveCntr = new AtomicLong();
The high watermark, or reservation counter, is incremented for each pending update, which is not even guaranteed to succeed. The reservation counter is assigned during the tx prepare phase.

Range

/** Updates applied out of order. */
private SortedSet<Range> oomQueue = new TreeSet<>();

The out-of-order update queue is used, literally, for recording updates applied out of order. This is possible due to the fact that updates with a higher reservation counter can be applied to the WAL before updates with a lower reservation counter, causing gaps in the update sequence.

/**
 * Update counter task. Update from start value by delta value.
 */
private static class Range implements Comparable<Range> {
    /** Start counter value. */
    private long start;

    /** Number of updates in the range. */
    private long delta;

    /** {@inheritDoc} */
    @Override public int compareTo(@NotNull Range r) {
        return Long.compare(this.start, r.start);
    }
}

A range represents a sequence of updates, for example (5, 3) means three updates with numbers 6, 7, 8.

There is an important invariant which can be derived from the statements above:

HWM >= LWM.
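A minimal sketch of how these three fields could interact is given below. This is not Ignite's actual implementation; the class and method names are illustrative, and the out-of-order queue is modeled as a simple map of start to delta:

import java.util.TreeMap;

/** Simplified sketch of a partition update counter (illustrative, not Ignite's code). */
public class CounterSketch {
    /** Low watermark: highest counter of the gap-free, durably applied prefix. */
    private long lwm;

    /** High watermark: counters reserved for pending updates (which may never commit). */
    private long hwm;

    /** Out-of-order ranges, keyed by their start value: start -> delta. */
    private final TreeMap<Long, Long> oomQueue = new TreeMap<>();

    /** Reserves {@code delta} counters during prepare and returns the range start. */
    public synchronized long reserve(long delta) {
        long start = hwm;

        hwm += delta;

        return start;
    }

    /** Applies a committed range covering counters {@code start + 1 .. start + delta}. */
    public synchronized void update(long start, long delta) {
        if (start > lwm) {
            // A gap exists below this range: remember it and apply it later.
            oomQueue.put(start, delta);

            return;
        }

        lwm = Math.max(lwm, start + delta);

        // Drain queued ranges that now touch the applied prefix.
        while (!oomQueue.isEmpty() && oomQueue.firstKey() <= lwm) {
            var e = oomQueue.pollFirstEntry();

            lwm = Math.max(lwm, e.getKey() + e.getValue());
        }
    }

    public synchronized long lwm() { return lwm; }
    public synchronized long hwm() { return hwm; }
}

Since LWM only ever advances over ranges that were previously reserved, it can never overtake HWM, which is exactly the invariant above.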

Update counter flow

The flow is closely bound to the transactions protocol. Counters are modified during the stages of the 2PC protocol used by Ignite to ensure distributed consistency.

...

We will use an example to understand the flow.

Suppose we have 3 server nodes and 1 client node in the topology, and the topology version is N.

Two threads on a single client start two pessimistic (TODO javadoc ref) transactions concurrently updating keys belonging to the single partition p. The backups count is equal to 2, so the total number of owners is 3.

Tx1 enlists and updates keys k1, k2, k3; Tx2 enlists and updates keys k4, k5.

FP(k1) = FP(k2) = FP(k3) = FP(k4) = FP(k5) = p

F(k1) = F(k2) = F(k3) = F(k4) = F(k5) = s1, s2, s3

TODO format as code

Ignite client = startClient();

...

try (Transaction tx2 = client.transactions().txStart()) {
    cache.putAll(Map.of(k4, v4, k5, v4));

    tx2.commit();
}
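For completeness, the first transaction could be started the same way from the other thread (a sketch; the values v1, v2, v3 are illustrative):

try (Transaction tx1 = client.transactions().txStart()) {
    cache.putAll(Map.of(k1, v1, k2, v2, k3, v3));

    tx1.commit();
}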

The figure below shows the flow for the case of backup reordering, where a tx with higher update counters is applied before a tx with lower update counters on the backup.

TODO fig

The first step (near map) on the near (originating) node is to calculate the near map of this transaction: for each partition p, a set of involved primary nodes, using the affinity function F(p). In our case it is the same single primary node for both transactions.
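One way to inspect these mappings at runtime is Ignite's public affinity API (a sketch; the cache name "cache" is an assumption; Affinity and ClusterNode come from org.apache.ignite.cache.affinity and org.apache.ignite.cluster):

Affinity<Object> aff = client.affinity("cache");

int p = aff.partition(k1);                  // FP(k1) = p
Collection<ClusterNode> owners =
    aff.mapPartitionToPrimaryAndBackups(p); // F(k1) = s1, s2, s3, primary node first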

For this partition on this topology version, a lock request is sent that executes the lock(k) operation, the semantics of which are similar to acquiring an exclusive lock on the given key (footnote: this is true for pessimistic transactions).

...

First, each enlisted key is assigned an update sequence number, generated as the next value of the HWM counter field. If multiple keys are prepared, the transaction reserves a range, as sketched below.
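In terms of the fields shown earlier, the reservation can be sketched as follows (nEnlistedKeys is an illustrative name):

// Reserve a counter range for the enlisted keys during prepare (a sketch).
long start = reserveCntr.getAndAdd(nEnlistedKeys); // HWM grows by the number of keys

// The enlisted keys receive update numbers start + 1 .. start + nEnlistedKeys.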

Then the primary node builds a backups map (dht map) and sends prepare requests to the backup nodes, if any, containing the enlisted keys and their update numbers.

...

If all backup nodes answered positively, then the transaction goes into the PREPARED state. In this state, it is ready for commit.

During commit, all transactional changes are applied to the WAL first, and only after that the LWM is increased. At this point, LWM catches up with HWM.

Because transaction commits can be reordered, it is possible to have some updates applied out of order. The field oomQueue is used for tracking such updates. Figure 3 shows the transaction map for tx1:

TODO figure 3

Let's return to our example and assume all locks are acquired and the transactions start to prepare almost simultaneously.

The diagram shows the counter state updates on the primary and backup during processing of the two transactions with reordered prepare/commit.

TODO pic

HWM is incremented during the tx prepare phase on the primary node to ensure the monotonicity of the counter and that the invariant HWM >= LWM holds.
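Using the simplified CounterSketch from the structure section, the reordered scenario of the example plays out roughly like this (illustrative):

CounterSketch cntr = new CounterSketch();

long tx1Start = cntr.reserve(3); // prepare tx1 (k1, k2, k3): counters 1..3, HWM = 3
long tx2Start = cntr.reserve(2); // prepare tx2 (k4, k5): counters 4..5, HWM = 5

cntr.update(tx2Start, 2);        // tx2 commits first: a gap exists, the range goes to oomQueue, LWM stays 0
cntr.update(tx1Start, 3);        // tx1 commits: LWM jumps to 5, the queue is drained

assert cntr.hwm() >= cntr.lwm(); // the invariant holds at every step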

...

The simplified rebalancing flow is illustrated in figure 3:

TODO fig. 3

Update counters are used to determine the state when one partition copy is behind the other(s).
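A conceptual sketch of this decision is given below (illustrative, not Ignite's actual code; the earliest counter still available in the WAL is an assumed parameter):

/** How a lagging partition copy should be rebalanced, based on update counters (a sketch). */
enum RebalanceMode { NONE, HISTORICAL, FULL }

static RebalanceMode pickMode(long demanderLwm, long ownerLwm, long earliestCntrInWal) {
    if (demanderLwm == ownerLwm)
        return RebalanceMode.NONE;       // the copy is up to date
    if (demanderLwm >= earliestCntrInWal)
        return RebalanceMode.HISTORICAL; // missing updates can be replayed from the WAL
    return RebalanceMode.FULL;           // too far behind: reload the whole partition
}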

...