...

On startup, the state is empty

StateChangelog.sqn = T0

StateBackend.lastMaterializedSqn = T0

list.add(a)

During normal processing, the operator updates its state and simultaneously adds the state changes to the StateChangelog. Logical time is not updated.

CP1 triggered

StateBackend calls StateChangelog.persistDurably(after = lastMaterializedSqn = T0).

StateChangelog.sqn = T1 (as a result of the call above)

StateChangelog returns a Future representing the completion of the write of (T0:T1 = a) and remembers it internally.
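
The sequence-number bookkeeping up to this point can be modelled in a few lines. This is a minimal sketch, not the actual implementation: the class `ChangelogModel`, its method names, and the synchronous return value (instead of a Future) are assumptions made for illustration. It also assumes each change is tagged with the sqn in effect when it was added.

```python
class ChangelogModel:
    """Toy in-memory stand-in for StateChangelog (hypothetical, for illustration)."""

    def __init__(self):
        self.sqn = 0        # logical time; 0 plays the role of T0
        self.changes = []   # (sqn at append time, change)

    def append(self, change):
        # Normal processing: record the change; logical time is not advanced.
        self.changes.append((self.sqn, change))

    def persist_durably(self, after):
        # Advance logical time, then return the range (after:upto) and its changes.
        # The real call would return a Future; this sketch returns the result directly.
        self.sqn += 1
        upto = self.sqn
        return after, upto, [c for s, c in self.changes if after <= s < upto]


log = ChangelogModel()
log.append("a")                      # list.add(a)
cp1 = log.persist_durably(after=0)   # CP1 triggered
print(cp1)                           # (0, 1, ['a'])
```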

list.add(b)

State materialization is triggered

StateBackend obtains the sqn from StateChangelog (T2)

StateChangelog.sqn = T2 (as a result of the call above)

StateBackend flushes RocksDB memtables to SST files, then starts the async phase, passing it the obtained sqn=T2 and the snapshot (~list of files).

Materialized state will be (T0:T2 = a,b)

list.add(c)

CP2 triggered

StateBackend calls StateChangelog.persistDurably(after = lastMaterializedSqn = T0).

StateChangelog.sqn = T3 (as a result of the call above)

StateChangelog finds the previous Future (T0:T1 = a) and combines it with a new one (T1:T3 = b,c).
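
How the earlier upload is reused rather than rewritten can be sketched as follows. This is an illustrative model under stated assumptions: `UploadingChangelog`, `advance`, and the tuple layout `(from_sqn, to_sqn, changes)` are hypothetical names, and uploads complete immediately instead of through Futures.

```python
class UploadingChangelog:
    """Hypothetical model: remembers uploaded ranges so later checkpoints can reuse them."""

    def __init__(self):
        self.sqn = 0
        self.pending = []   # (sqn, change) not yet uploaded
        self.uploads = []   # (from_sqn, to_sqn, changes), one entry per uploaded file

    def append(self, change):
        self.pending.append((self.sqn, change))

    def advance(self):
        # Used by checkpoints and by materialization to obtain a new sqn.
        self.sqn += 1
        return self.sqn

    def persist_durably(self, after):
        upto = self.advance()
        # Reuse every earlier upload that overlaps the requested range.
        result = [u for u in self.uploads if u[1] > after]
        if self.pending:
            # Upload only the changes that have not been written yet.
            new = (self.pending[0][0], upto, [c for _, c in self.pending])
            self.uploads.append(new)
            self.pending = []
            result.append(new)
        return result


log = UploadingChangelog()
log.append("a")
cp1 = log.persist_durably(after=0)   # CP1: uploads (T0:T1 = a)
log.append("b")
log.advance()                        # materialization obtains sqn T2
log.append("c")
cp2 = log.persist_durably(after=0)   # CP2: reuses (T0:T1), uploads (T1:T3)
print(cp2)                           # [(0, 1, ['a']), (1, 3, ['b', 'c'])]
```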

CP1-Future completes

It contains a shared StateHandle pointing to a file on S3.

This handle is included in TaskSnapshot and sent to JM.

CP2-Future completes

It contains a combined StateHandle pointing to two files on S3. 

StateHandle is included in TaskSnapshot and is sent to JM.

JM finalizes CP1, and then CP2

CP1 is subsumed but nothing is removed yet.

State materialization completes

StateBackend stores the consolidated state handle (T0:T2 = a,b)

StateBackend.lastMaterializedSqn = T2

StateChangelog.truncate(before = T2) // remove (T0:T1 = a) object reference (not any state that is already saved or is being saved)
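
The effect of truncation on the changelog's internal references can be sketched as below. The function and the `(from_sqn, to_sqn, changes)` tuples are hypothetical; the sketch only drops references and deletes no files, matching the note above that state already saved or being saved is untouched.

```python
def truncate(uploads, before):
    # Keep a reference to any range that still contains changes at or after `before`.
    return [u for u in uploads if u[1] > before]


uploads = [(0, 1, ["a"]), (1, 3, ["b", "c"])]   # ranges known after CP1 and CP2
remaining = truncate(uploads, before=2)
print(remaining)                                # [(1, 3, ['b', 'c'])]
```

The range (T1:T3) survives because it still holds "c", which is not covered by the materialized state.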

list.add(d)

CP3 is triggered

StateBackend calls StateChangelog.persistDurably(after = lastMaterializedSqn = T2).

StateChangelog.sqn = T4 (as a result of the call above)

StateChangelog searches for a reference matching the requested (T2:*) range. It finds the completed (T1:T3 = b,c). To be able to filter out (T1 = b) on recovery, the returned handle specifies the range (T2:T4). Files also specify the range for each changeset internally.

Found Future is combined with a new one for (T3:T4 = d).

CP3-Future completes

StateBackend combines its result with the consolidated handle and sends it to JM:

  • (T0:T2 = a,b) - consolidated changes
  • (T1:T3 = b,c) - previously saved changelog (“b” filtered on recovery)
  • (T3:T4 = d) - the newly saved changelog
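
The filtering of "b" on recovery can be illustrated with a short sketch. It assumes each change in a changelog file carries the sqn at which it was added (the per-changeset ranges mentioned above); the function `restore` and the data layout are hypothetical.

```python
def restore(materialized, materialized_sqn, changelog_files):
    # Start from the consolidated state, then replay only the changes
    # that are not already covered by it.
    state = list(materialized)
    for changes in changelog_files:
        for sqn, change in changes:
            if sqn >= materialized_sqn:
                state.append(change)
    return state


materialized = ["a", "b"]        # consolidated state up to T2
files = [
    [(1, "b"), (2, "c")],        # (T1:T3 = b,c)
    [(3, "d")],                  # (T3:T4 = d)
]
restored = restore(materialized, 2, files)
print(restored)                  # ['a', 'b', 'c', 'd']
```

Without the sqn check, "b" would be applied twice: once from the materialized state and once from the changelog.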

JM finalizes CP3

CP2 is subsumed. The state handle pointing to the file with (T0:T1 = a) is no longer referenced by any checkpoint, so it is removed.
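
Why the file is removed only now, and not when CP1 was subsumed, can be shown with simple reference counting. This is a sketch under assumptions: `SharedHandleRegistry` and the handle names are hypothetical, and the actual JM bookkeeping may differ.

```python
from collections import Counter


class SharedHandleRegistry:
    """Hypothetical JM-side registry counting checkpoint references per file."""

    def __init__(self):
        self.refs = Counter()
        self.deleted = []

    def register(self, handles):
        # A finalized checkpoint adds one reference to each of its handles.
        self.refs.update(handles)

    def subsume(self, handles):
        # A subsumed checkpoint releases its references; unreferenced files go away.
        for h in handles:
            self.refs[h] -= 1
            if self.refs[h] == 0:
                del self.refs[h]
                self.deleted.append(h)


cp1 = ["file(T0:T1)"]
cp2 = ["file(T0:T1)", "file(T1:T3)"]
cp3 = ["materialized(T0:T2)", "file(T1:T3)", "file(T3:T4)"]

registry = SharedHandleRegistry()
registry.register(cp1)
registry.register(cp2)
registry.subsume(cp1)        # CP1 subsumed: file(T0:T1) is still used by CP2
after_cp1 = list(registry.deleted)
registry.register(cp3)
registry.subsume(cp2)        # CP2 subsumed: file(T0:T1) has no references left
print(registry.deleted)      # ['file(T0:T1)']
```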

...