...
On startup, the state is empty | StateChangelog.sqn = T0; StateBackend.lastMaterializedSqn = T0. |
list.add(a) | During normal processing, the operator updates its state and simultaneously appends the state changes to the StateChangelog. Logical time (sqn) is not updated. |
CP1 triggered | StateBackend calls StateChangelog.persistDurably(after = lastMaterializedSqn = T0). As a result of this call, StateChangelog.sqn = T1. StateChangelog returns a Future representing the completion of the write of (T0:T1 = a) and remembers it internally. |
list.add(b) |
State materialization is triggered | StateBackend obtains the sqn from StateChangelog (T2); as a result of this call, StateChangelog.sqn = T2. StateBackend flushes the RocksDB memtables to SST files and starts the async phase, passing it the obtained sqn = T2 and the snapshot (~list of files). The materialized state will be (T0:T2 = a,b). |
list.add(c) |
CP2 triggered | StateBackend calls StateChangelog.persistDurably(after = lastMaterializedSqn = T0); materialization has not completed yet, so lastMaterializedSqn is still T0. As a result of this call, StateChangelog.sqn = T3. StateChangelog finds the previous Future (T0:T1 = a) and combines it with a new one (T1:T3 = b,c). |
CP1-Future completes | It contains a shared StateHandle pointing to a file on S3. This handle is included in the TaskSnapshot and sent to the JM. |
CP2-Future completes | It contains a combined StateHandle pointing to two files on S3, (T0:T1 = a) and (T1:T3 = b,c). The StateHandle is included in the TaskSnapshot and sent to the JM. |
JM finalizes CP1, and then CP2 | CP1 is subsumed, but nothing is removed yet: CP2 still references the file with (T0:T1 = a). |
State materialization completes | StateBackend stores the consolidated state handle (T0:T2 = a,b) and sets StateBackend.lastMaterializedSqn = T2. It then calls StateChangelog.truncate(before = T2), which removes the object reference to (T0:T1 = a), not any state that is already saved or is being saved. |
list.add(d) |
CP3 is triggered | StateBackend calls StateChangelog.persistDurably(after = lastMaterializedSqn = T2). As a result of this call, StateChangelog.sqn = T4. StateChangelog searches for references matching the requested (T2:*) range and finds the completed (T1:T3 = b,c). So that (T1 = b) can be filtered out on recovery, the returned handle specifies the range (T2:T4); the files also record the range of each changeset internally. The found Future is combined with a new one for (T3:T4 = d). |
CP3-Future completes | StateBackend combines its result (the changelog handle for the range (T2:T4)) with the consolidated handle (T0:T2 = a,b) and sends the combined handle to the JM. |
JM finalizes CP3 | CP2 is subsumed. The state handle pointing to the file with (T0:T1 = a) is no longer referenced, so that file is removed. |
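To make the sequence-number bookkeeping in the walkthrough concrete, here is a minimal, single-threaded Python sketch. It is not Flink code: the class `InMemoryChangelog`, its methods (`append`, `next_sqn`, `persist_durably`, `truncate`) and the `ChangelogFile`/`ChangelogHandle` types are hypothetical, "uploads" are synchronous instead of returning Futures, and files live in memory rather than on S3. As an assumption of the sketch, each change is tagged with the sqn that was current when it was appended, so a file covering (Tx:Ty) holds the changes with tags x ≤ tag < y.

```python
from dataclasses import dataclass
from typing import List, Tuple

Record = Tuple[int, str]  # (sqn at append time, change)


@dataclass(frozen=True)
class ChangelogFile:
    """Hypothetical uploaded file holding the changes tagged with sqn in [start, end)."""
    start: int
    end: int
    records: Tuple[Record, ...]


@dataclass(frozen=True)
class ChangelogHandle:
    """Hypothetical handle given to a checkpoint: sqn range to replay plus covering files."""
    start: int
    end: int
    files: Tuple[ChangelogFile, ...]


class InMemoryChangelog:
    """Hypothetical toy StateChangelog (not Flink's API): synchronous, in-memory files."""

    def __init__(self) -> None:
        self.sqn = 0                           # logical time, starts at T0
        self._pending: List[Record] = []       # appended but not yet uploaded
        self._uploaded_up_to = 0               # end of the last uploaded range
        self._files: List[ChangelogFile] = []  # remembered for reuse by later checkpoints

    def append(self, change: str) -> None:
        # Normal processing: record the change; logical time is not updated.
        self._pending.append((self.sqn, change))

    def next_sqn(self) -> int:
        # Handing out a sqn (to a checkpoint or to materialization) advances time.
        self.sqn += 1
        return self.sqn

    def persist_durably(self, after: int) -> ChangelogHandle:
        end = self.next_sqn()
        # Upload only the not-yet-uploaded part, i.e. (uploaded_up_to:end).
        if self._pending:
            self._files.append(
                ChangelogFile(self._uploaded_up_to, end, tuple(self._pending)))
            self._pending = []
        self._uploaded_up_to = end
        # Reuse every remembered file whose range overlaps (after:end).
        reused = tuple(f for f in self._files if f.end > after)
        return ChangelogHandle(after, end, reused)

    def truncate(self, before: int) -> None:
        # Forget references to files lying entirely before `before`; the uploaded
        # objects themselves are left for the JM to clean up later.
        self._files = [f for f in self._files if f.end > before]


# Replaying the table (timing of the async steps is collapsed in this toy model).
log = InMemoryChangelog()
last_materialized_sqn = 0
log.append("a")
cp1 = log.persist_durably(after=last_materialized_sqn)  # (T0:T1), one file
log.append("b")
materialization_sqn = log.next_sqn()                    # T2; snapshot holds a,b
log.append("c")
cp2 = log.persist_durably(after=last_materialized_sqn)  # (T0:T3), two files
last_materialized_sqn = materialization_sqn             # materialization completes
log.truncate(before=last_materialized_sqn)
log.append("d")
cp3 = log.persist_durably(after=last_materialized_sqn)  # (T2:T4)
print([(f.start, f.end) for f in cp3.files])  # [(1, 3), (3, 4)]
```

In this model, CP2 references two files, and after `truncate(before=2)` CP3 reuses the (T1:T3) file while its handle range (T2:T4) tells recovery to ignore b.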
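Recovery from CP3 can be sketched in the same spirit, assuming (hypothetically) that each changelog file stores `(sqn, change)` pairs and that the files are replayed in sqn order. The helper `restore` below is illustrative, not a Flink API: it starts from the materialized list state and replays only the changes whose sqn falls inside the handle's range, which is how (T1 = b) is skipped even though it sits in the reused (T1:T3) file.

```python
from typing import Iterable, List, Sequence, Tuple

Record = Tuple[int, str]  # hypothetical on-file format: (sqn tag, change)


def restore(materialized: Sequence[str],
            changelog_files: Iterable[Sequence[Record]],
            start: int, end: int) -> List[str]:
    """Rebuild a list state: materialized snapshot, then changes with start <= sqn < end."""
    state = list(materialized)
    for records in changelog_files:      # assumed to be ordered by sqn
        for sqn, change in records:
            # Skip changes outside the handle's range, e.g. b (sqn 1), which is
            # already part of the materialized snapshot.
            if start <= sqn < end:
                state.append(change)
    return state


# CP3: consolidated handle (T0:T2 = a,b) plus changelog range (T2:T4).
file_1_3 = [(1, "b"), (2, "c")]
file_3_4 = [(3, "d")]
restored = restore(["a", "b"], [file_1_3, file_3_4], start=2, end=4)
print(restored)  # ['a', 'b', 'c', 'd']
```

Filtering by range rather than by file keeps the reused (T1:T3) upload intact, so no file has to be rewritten when materialization moves the lower bound.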
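The cleanup in the last rows relies on the JM tracking which retained checkpoints still reference each shared file. The following reference-counting sketch uses a hypothetical `RefCountingRegistry` class (not Flink's actual registry API) and hypothetical file names: `log-0-1` for (T0:T1 = a), `log-1-3` for (T1:T3 = b,c), `log-3-4` for (T3:T4 = d), and `sst-0-2` for the materialized state. It shows why subsuming CP1 removes nothing while subsuming CP2 removes the (T0:T1 = a) file.

```python
from collections import Counter
from typing import Dict, List, Set


class RefCountingRegistry:
    """Hypothetical JM-side bookkeeping: count how many retained checkpoints use each file."""

    def __init__(self) -> None:
        self._refs: Counter = Counter()
        self._checkpoints: Dict[str, Set[str]] = {}
        self.deleted: List[str] = []  # stand-in for deleting the objects on S3

    def register(self, checkpoint: str, files: Set[str]) -> None:
        # Called when a checkpoint is finalized with the handles sent by the task.
        self._checkpoints[checkpoint] = set(files)
        self._refs.update(self._checkpoints[checkpoint])

    def subsume(self, checkpoint: str) -> None:
        # Drop the checkpoint; delete files that no retained checkpoint references.
        for f in sorted(self._checkpoints.pop(checkpoint)):
            self._refs[f] -= 1
            if self._refs[f] == 0:
                del self._refs[f]
                self.deleted.append(f)


registry = RefCountingRegistry()
registry.register("CP1", {"log-0-1"})
registry.register("CP2", {"log-0-1", "log-1-3"})
registry.subsume("CP1")
print(registry.deleted)  # []
registry.register("CP3", {"sst-0-2", "log-1-3", "log-3-4"})
registry.subsume("CP2")
print(registry.deleted)  # ['log-0-1']
```

Counting references per file, rather than per checkpoint, is what lets CP3 share `log-1-3` with CP2 without the file being deleted when CP2 is subsumed.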
...