...
From the experiments, the latency of writing 2.5GB to S3 (500 streams of 5MB objects) is:
| p50 | p90 | p95 | p99 | p999 |
|-----|-----|-----|-----|------|
| 459ms | 740ms | 833ms | 1039ms | 3202ms |
If everything below one second is considered acceptable at this scale, then tail latencies become the major problem. Below are some techniques to reduce them.
...
With a simple implementation, p999 latency decreases by 47% (the improvement is larger for smaller requests):
| p50 | p90 | p95 | p99 | p999 |
|-----|-----|-----|-----|------|
| 495ms | 733ms | 807ms | 937ms | 1689ms |
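One simple technique of this kind is request hedging: if an upload has not completed within a given delay, issue a duplicate request and take whichever attempt finishes first. The sketch below is illustrative only, assuming a hypothetical asynchronous `UploadClient`; it is not necessarily the implementation measured above:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Sketch of request hedging; UploadClient is a hypothetical async client. */
class HedgingUploader {
    interface UploadClient {
        CompletableFuture<String> uploadAsync(byte[] payload);
    }

    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    CompletableFuture<String> upload(UploadClient client, byte[] payload, long hedgeAfterMs) {
        CompletableFuture<String> result = new CompletableFuture<>();
        // Primary attempt: completes the result if it finishes first.
        client.uploadAsync(payload).thenAccept(result::complete);
        // Backup attempt: fired only if the primary is still running after the delay.
        scheduler.schedule(() -> {
            if (!result.isDone()) {
                client.uploadAsync(payload).thenAccept(result::complete);
            }
        }, hedgeAfterMs, TimeUnit.MILLISECONDS);
        return result; // first successful attempt wins; failures are ignored for brevity
    }
}
```

Setting the hedge delay near the observed p95 bounds the extra load to roughly 5% of requests, so only tail requests pay the duplication cost.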
Assuming 500 writers, the request latencies above translate into checkpoint latencies of p50=1000ms and p99=1700ms (a checkpoint completes only when the slowest writer finishes). For a 0.5GB state increment, checkpoint latencies would be p75=500ms and p99=1125ms (assuming 250 writers).
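The translation from request latency to checkpoint latency follows from order statistics, assuming independent, identically distributed request latencies with CDF F:

```latex
% Checkpoint latency is the maximum over N parallel uploads:
P(\text{checkpoint} \le t) = F(t)^N
% so checkpoint quantile q corresponds to per-request quantile q^{1/N}.
% For N = 500: checkpoint p50 needs F(t) = 0.5^{1/500} \approx 0.9986
% (between the per-request p99 at 937ms and p999 at 1689ms, hence ~1000ms),
% and checkpoint p99 needs F(t) = 0.99^{1/500} \approx 0.99998 (hence ~1700ms).
```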
...
- Materializing state changes (to remember the current logical timestamp)
- Performing a checkpoint (to provide the SQN for persistDurably)
```java
/** Scoped to a single entity (e.g. a SubTask or OperatorCoordinator). */
interface StateChangelogClient {
    StateChangelogWriter createWriter(OperatorID operatorID, KeyGroupRange keyGroupRange);
}

/** Scoped to a single writer (e.g. state backend). */
interface StateChangelogWriter {
    void append(int keyGroup, byte[] value);

    CompletableFuture<StateChangelogHandle> persistDurably(SQN after);

    void truncate(SQN before); // called upon checkpoint confirmation by JM

    /**
     * Close this log. No new appends will be possible.
     * Any appended but not persisted records will be lost.
     */
    void close();

    SQN lastAppendedSqn();
}

interface StateChangelogHandle extends KeyedStateHandle {
    /** Enumerate changes per key group. */
    CloseableIterable<byte[]> getStateChanges(int keyGroup);
}
```
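To make the contract concrete, here is a hypothetical call sequence against the interfaces above; the driving class and the `serialize()` helper are illustrative, not part of the proposal:

```java
import java.util.concurrent.CompletableFuture;

// Illustrative sequence of calls as a state backend might issue them.
class StateChangelogUsageSketch {
    void run(StateChangelogWriter writer) {
        // Normal processing: every state update is also appended to the changelog.
        writer.append(/* keyGroup */ 0, serialize("a"));

        // Materializing state changes: remember the current logical timestamp.
        SQN lastMaterializedSqn = writer.lastAppendedSqn();

        // Performing a checkpoint: persist everything appended after the last
        // materialization; the handle is later included in the TaskSnapshot.
        CompletableFuture<StateChangelogHandle> future =
            writer.persistDurably(/* after = */ lastMaterializedSqn);
        future.thenAccept(handle -> { /* include in TaskSnapshot, send to JM */ });

        // Upon checkpoint confirmation by the JM: drop no-longer-needed changes.
        writer.truncate(/* before = */ lastMaterializedSqn);
    }

    private static byte[] serialize(String value) {
        return value.getBytes();
    }
}
```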
...
This is an example layout inside a single file:
```
File: ABCD.bin (all subtasks of TM1 for checkpoint 2)
├── Log 0 ("group-by" operator)
│   ├── KeyGroup 0
│   │   ├── Changeset 1 (window-contents, timers)
│   │   ├── Changeset 2 (window-contents, timers)
│   │   └── Changeset 3 (window-contents, timers)
│   └── KeyGroup 1
│       ├── Changeset 1 (window-contents, timers)
│       ├── Changeset 2 (window-contents, timers)
│       └── Changeset 3 (window-contents, timers)
└── Log 1 ("aggregate" operator)
    └── KeyGroup 0
        ├── Changeset 1
        ├── Changeset 2
        └── Changeset 3
```
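As a rough illustration of how such a layout could be serialized, here is a hypothetical framing of one changeset entry; the field order and the `LayoutSketch`/`writeChangeset` names are assumptions, not the proposed on-disk format:

```java
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical framing of a single changeset entry within the file above.
class LayoutSketch {
    static void writeChangeset(DataOutputStream out, int logId, int keyGroup,
                               long sqnFrom, long sqnTo, byte[] changes) throws IOException {
        out.writeInt(logId);        // e.g. 0 = "group-by" operator, 1 = "aggregate"
        out.writeInt(keyGroup);     // key group the changeset belongs to
        out.writeLong(sqnFrom);     // start of the SQN range, used to filter on recovery
        out.writeLong(sqnTo);       // end of the SQN range
        out.writeInt(changes.length);
        out.write(changes);         // serialized state changes (window-contents, timers, ...)
    }
}
```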
...
Let's consider a job with one stateful operator having a list state in RocksDB. Assume max retained checkpoints: 1.
| Event | Description |
|---|---|
| On startup | The state is empty. StateChangelog.sqn = T0. StateBackend.lastMaterializedSqn = T0. |
| list.add(a) | During normal processing, the operator updates its state and simultaneously adds state changes to the StateChangelog. Logical time is not updated. |
| CP1 triggered | StateBackend calls StateChangelog.persistDurably(after = lastMaterializedSqn = T0). StateChangelog.sqn = T1. StateChangelog returns a Future representing the completion of the write of (T0:T1 = a) and remembers it internally. |
| list.add(b) | |
| State materialization is triggered | StateBackend obtains the SQN from the StateChangelog (T2). StateChangelog.sqn = T2. StateBackend flushes RocksDB memtables to SST files and starts the async phase, passing it the obtained sqn=T2 and a snapshot (~list of files). Materialized state will be (T0:T2 = a,b). |
| list.add(c) | |
| CP2 triggered | StateBackend calls StateChangelog.persistDurably(after = lastMaterializedSqn = T0). StateChangelog.sqn = T3. StateChangelog finds the previous Future (T0:T1 = a) and combines it with a new one (T1:T3 = b,c). |
| CP1-Future completes | It contains a shared StateHandle pointing to a file on S3. The handle is included in the TaskSnapshot and sent to the JM. |
| CP2-Future completes | It contains a combined StateHandle pointing to two files on S3. The StateHandle is included in the TaskSnapshot and sent to the JM. |
| JM finalizes CP1, then CP2 | CP1 is subsumed, but nothing is removed yet. |
| State materialization completes | StateBackend stores the consolidated state handle (T0:T2 = a,b). StateBackend.lastMaterializedSqn = T2. StateChangelog.truncate(before = T2) removes the (T0:T1 = a) reference. |
| list.add(d) | |
| CP3 triggered | StateBackend calls StateChangelog.persistDurably(after = lastMaterializedSqn = T2). StateChangelog.sqn = T4. StateChangelog searches for a reference matching the requested (T2:*) range and finds the completed (T1:T3 = b,c). To be able to filter out (T1 = b) on recovery, the returned handle specifies the range (T2:T4). Files also specify the range of each changeset internally. The found Future is combined with a new one for (T3:T4 = d). |
| CP3-Future completes | StateBackend combines its result with the consolidated handle and sends to the JM: the materialized part (T0:T2 = a,b) plus the non-materialized changes (T2:T4 = c,d). |
| JM finalizes CP3 | CP2 is subsumed; the state handle pointing to the file with (T0:T1 = a) is no longer referenced and is removed. |
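The range filtering in the CP3 step can be made concrete with a small sketch; the `Change` record and plain long SQNs are illustrative, assuming each change in a file carries its own SQN:

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of recovery-time filtering: a handle covering (T2:T4) skips changes
// outside its range even though the file also contains (T1:T2 = b).
class RecoveryFilterSketch {
    record Change(long sqn, byte[] payload) {}

    /** Keep only the changes in the half-open range (after, until]. */
    static List<Change> filter(List<Change> changesInFile, long after, long until) {
        return changesInFile.stream()
            .filter(c -> c.sqn() > after && c.sqn() <= until)
            .collect(Collectors.toList());
    }
}
```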
Compatibility, Deprecation, and Migration Plan
...
Besides the DFS-based approach, some other options were considered. Below are intentionally very rough estimates of the different solutions:
| | Kafka-based unmanaged | Bookkeeper-based unmanaged | Bookkeeper managed | DFS-based | Custom persistence |
|---|---|---|---|---|---|
| 1. Latency | Good | Good | Good | Bad (0.1 - 1s) | Good |
| 2. Scalability | Good | Unknown | Unknown | Medium | Good |
| 3. Ease of deployment | Good (many users already have it) | Bad | Good | Good | Good |
| 4. Stateless or no new stateful component | Yes (existing) | No | No | Yes (stateless) | No |
| 5. Development time | Good | Medium | Bad | Good | Very bad |
| 6. Other issues | Can truncate changes | | | High variability (workload, provider, …) | |
With the highest priorities for the initial version being (3), (4), and (5), DFS was chosen.
...