The document describes the principles used to achieve consistency between data copies in transactional caches with persistence enabled. The information is relevant for the AI 2.8 release and higher.
Cluster topology
Cluster topology is an ordered set of client and server nodes at a certain point in time. Each stable topology is assigned a topology version number: a monotonically increasing pair of counters (majorVer, minorVer). The major version tracks grid node events such as a node join, leave, or failure; the minor version tracks internal events such as cache start or activation. Each node maintains a list of nodes in the topology, and this list is the same on all nodes for a given version. Note that each node receives information about topology changes at a different time, so at any point in time nodes can see different topology versions, but eventually they will all see the same one.
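The ordering of topology versions can be sketched as a comparable pair of counters. This is a minimal illustration with hypothetical names (Ignite's own implementation is the AffinityTopologyVersion class):

```java
// Hypothetical illustration of a topology version as an ordered pair of
// counters; names are illustrative, not the actual Ignite classes.
final class TopologyVersion implements Comparable<TopologyVersion> {
    final long majorVer; // incremented on node join, leave, or failure
    final int minorVer;  // incremented on internal events (cache start, activation)

    TopologyVersion(long majorVer, int minorVer) {
        this.majorVer = majorVer;
        this.minorVer = minorVer;
    }

    @Override public int compareTo(TopologyVersion o) {
        int res = Long.compare(majorVer, o.majorVer);
        return res != 0 ? res : Integer.compare(minorVer, o.minorVer);
    }
}
```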
...
Technically, rebalancing is an exchange of supply and demand messages between nodes. A demand message is sent from the node where a partition has stale data to a node that has the actual data. A supply message containing the data is sent in response to the demand message. After processing a supply message, the next demand message is sent, and so on, until there is nothing more to rebalance. We will look at this in more detail later.
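The demand/supply loop can be sketched as follows. This is a deliberately simplified model with invented message and supplier types; the real protocol batches data, runs asynchronously, and tracks counters per partition:

```java
import java.util.*;

// Simplified sketch of the rebalance demand/supply loop; the SupplyMessage
// and Supplier types are invented for illustration, not Ignite classes.
final class RebalanceSketch {
    record SupplyMessage(List<Map.Entry<Integer, byte[]>> rows, boolean last) {}

    interface Supplier { SupplyMessage demand(int partId, long fromCntr); }

    static long rebalance(int partId, long lwm, Supplier supplier, Map<Integer, byte[]> store) {
        while (true) {
            // Demand the next batch of entries newer than our local counter.
            SupplyMessage supply = supplier.demand(partId, lwm);

            for (Map.Entry<Integer, byte[]> row : supply.rows()) {
                store.put(row.getKey(), row.getValue());
                lwm++; // each applied row advances the local update counter
            }

            if (supply.last())
                return lwm; // nothing more to rebalance
        }
    }
}
```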
Update counter structure
Currently, different implementations of the update counter logic exist for persistent mode and for in-memory mode. In this article we will only look at the former. Its main difference from the latter is the ability to track updates that are applied out of order.
Each update to a partition receives a sequential number for ensuring data consistency between copies. A special structure called partition update counter is used to assign and increment update counters.
A partition update counter has the following structure:
```java
/** Low watermark. */
private final AtomicLong cntr = new AtomicLong();
```
Low watermark, or update counter, is used to track sequential updates. It is only incremented when the corresponding update is recorded to durable storage and no missed updates with a lesser counter value exist.
For example, an LWM value of 5 means that all updates with assigned counters 1, 2, 3, 4, 5 have been applied to the WAL.
```java
/** High watermark. */
private final AtomicLong reserveCntr = new AtomicLong();
```
High watermark, or reservation counter, is incremented for each pending update, which is not even guaranteed to succeed. The reservation counter is assigned during the tx prepare phase.
```java
/** Updates applied out of order. */
private SortedSet<Range> seq = new TreeSet<>();
```
This field is used for recording the updates that are applied out of order. This is needed because updates with a higher counter can be applied to the WAL before updates with a lower counter, causing gaps in the update sequence.
```java
/**
 * Update counter task. Update from start value by delta value.
 */
private static class Range implements Comparable<Range> {
    /** */
    private long start;

    /** */
    private long delta;

    /** {@inheritDoc} */
    @Override public int compareTo(@NotNull Range r) {
        return Long.compare(this.start, r.start);
    }
}
```
A range represents a sequence of updates; for example, (5, 3) means three updates with numbers 6, 7, 8. We will use this notation again later.
An out-of-order update range is held in the sequence only if an update with a lower counter is missing. For example, LWM=5, HWM=9, seq=(6,2) means updates 7 and 8 were applied out of order, and updates 6 and 9 are still not applied.
Note that there is an important invariant which can be derived from the statements above:
HWM >= LWM
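Taken together, LWM, HWM, and the out-of-order ranges behave as in the following simplified model. This is not the actual PartitionUpdateCounter implementation; gap bookkeeping here uses a plain TreeMap instead of the Range set, and the range notation (start, delta) follows the text above:

```java
import java.util.*;

// Simplified model of a partition update counter: reserve() bumps HWM,
// update(start, delta) applies a range, possibly out of order; LWM only
// advances when no gaps remain below it. Not the real Ignite class.
final class CounterSketch {
    private long lwm; // low watermark: highest applied counter with no gaps below
    private long hwm; // high watermark: reservation counter
    private final TreeMap<Long, Long> gaps = new TreeMap<>(); // start -> delta

    /** Reserves a counter range of the given size, e.g. during tx prepare. */
    long reserve(long delta) {
        long start = hwm;
        hwm += delta;
        return start;
    }

    /** Applies the range (start, delta), i.e. updates start+1 .. start+delta. */
    void update(long start, long delta) {
        if (start > lwm) {
            gaps.put(start, delta); // applied out of order, remember the range
        } else {
            lwm = Math.max(lwm, start + delta);

            // Close any remembered ranges that are now contiguous with LWM.
            for (Map.Entry<Long, Long> e = gaps.firstEntry();
                 e != null && e.getKey() <= lwm;
                 e = gaps.firstEntry()) {
                lwm = Math.max(lwm, e.getKey() + e.getValue());
                gaps.remove(e.getKey());
            }
        }
        assert hwm >= lwm; // the invariant stated above
    }

    long lwm() { return lwm; }
    long hwm() { return hwm; }
}
```

Replaying the example from the text: with LWM=5 and HWM=9, applying (6,2) out of order leaves LWM at 5 until update 6 arrives, after which LWM jumps over the closed gap to 8.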
Update counter flow
The flow is closely bound to the transaction protocol. Counters are modified during the stages of the two-phase commit (2PC) protocol used by Ignite to ensure distributed consistency.
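A minimal sketch of how the counter moves through the 2PC phases on the primary node (the method names are illustrative, not the Ignite transaction API; two plain longs stand in for the real update counter):

```java
// Illustration of counter movement through 2PC for a single partition:
// prepare reserves a range (HWM grows), commit applies it (LWM grows).
final class TwoPcCounterFlow {
    long lwm, hwm;

    /** Prepare phase: reserve counters for the tx's pending updates. */
    long prepare(int updates) {
        long start = hwm;
        hwm += updates;
        return start;
    }

    /** Commit phase: updates are logged to the WAL, then LWM advances. */
    void commit(long start, int updates) {
        if (start == lwm)
            lwm = start + updates; // in-order commit, no gap below
    }
}
```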
...
F(k1) = F(k2) = F(k3) = F(k4) = F(k5) = s1(Primary), s2(Backup1), s3(Backup2)
```java
Ignite client = startClient();

IgniteCache<Integer, Integer> cache = client.cache(name);
```
Thread 1:
```java
try (Transaction tx1 = client.transactions().txStart()) {
    cache.putAll(Map.of(k1, v1, k2, v2, k3, v3));

    tx1.commit();
}
```
Thread 2:
```java
try (Transaction tx2 = client.transactions().txStart()) {
    cache.putAll(Map.of(k4, v4, k5, v4));

    tx2.commit();
}
```
Figure 1 below shows the flow for the case of backup reordering, where a transaction with higher update counters is applied on the backup before a transaction with lower update counters.
...