Status
Current state: Under Discussion
...
Motivation
Low end-to-end latency is a highly demanded property in many Flink setups. With exactly-once, this latency depends on the checkpoint interval/duration, which in turn is defined by the slowest node (usually the one doing a full, non-incremental snapshot). In large setups with many nodes, the probability of at least one node being slow gets higher, making almost every checkpoint slow.
This FLIP proposes a mechanism to deal with this by materializing and uploading state continuously and only uploading the changed part during the checkpoint itself. It differs from other approaches in that 1) checkpoints are always incremental; and 2) it works for any state backend.
Public Interfaces
- No changes to the public APIs (except for configuration)
- Configuration
- Toggle to enable
- StateChangelog: persist delay, retry policy, file prefix
- State backends: materialization interval, file prefix
- Monitoring
- StateChangelog: DFS request size, latency, number of “Logs” per request, errors
- State backends: DFS request size, latency, errors
Proposed Changes
General design
The idea behind getting faster checkpoints is similar to a database write-ahead log:
...
StateBackend maintains a logical timestamp to draw a boundary between the consolidated and unconsolidated changes.
Performing a checkpoint
On checkpoint, StateBackend requests StateChangelog to persist durably all the changes made after the latest (known) materialization (identified by a logical timestamp). The result is a regular (Keyed)StateHandle. StateChangelog guarantees that this handle only provides access to changes that are:
...
For savepoint, the call is just proxied to the corresponding method of the underlying StateBackend.
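To make the checkpoint-time contract above concrete, here is a minimal sketch (simplified stand-in types, not Flink's actual classes) of how a checkpoint's keyed state decomposes into the last materialized snapshot plus the changelog delta persisted after the materialization's logical timestamp:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch only: a checkpoint's keyed state is reported as the last
 * materialized snapshot (if any) plus the changelog handle covering
 * everything appended after the materialization's logical timestamp.
 * Handle is a simplified stand-in for (Keyed)StateHandle.
 */
public class CheckpointSketch {
    record Handle(String name) {}

    /** Assemble the handles reported to the JM for one checkpoint. */
    static List<Handle> snapshot(Handle lastMaterialized, Handle changelogDelta) {
        List<Handle> parts = new ArrayList<>();
        if (lastMaterialized != null) {
            parts.add(lastMaterialized); // consolidated state up to the boundary
        }
        parts.add(changelogDelta); // changes after the boundary, persisted durably
        return parts;
    }

    public static void main(String[] args) {
        // First checkpoint: no materialization has completed yet.
        System.out.println(snapshot(null, new Handle("changelog-T0-T1")));
        // Later checkpoint: materialized base plus changelog delta.
        System.out.println(snapshot(new Handle("sst-T0-T2"), new Handle("changelog-T2-T4")));
    }
}
```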
Recovery and rescaling
On JobMaster, state assignment doesn’t need to be changed because regular handles are used (that’s also true for rescaling).
On StateBackend, the previously materialized state is restored first (if any), and then the changelog is applied. The changelog doesn’t have to be filtered for already materialized changes - that’s ensured during state handle creation. However, irrelevant records do have to be filtered out on upscaling. For that, state changes must be complemented with the key group (non-keyed state is not supported in the initial version).
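The key-group filtering on upscaling can be sketched like this (illustrative types, not the actual Flink classes):

```java
import java.util.List;
import java.util.stream.Collectors;

/**
 * Sketch (not actual Flink code): on restore, the previously materialized
 * state is loaded first, then changelog entries are replayed. On upscaling,
 * a handle may cover key groups outside the new subtask's range, so entries
 * are filtered by their key group before being applied.
 */
public class RestoreSketch {
    /** A recorded state change, tagged with the key group it belongs to. */
    record StateChange(int keyGroup, byte[] payload) {}

    /** Keep only the changes that fall into this subtask's key-group range. */
    static List<StateChange> filterForRange(List<StateChange> changes,
                                            int rangeStart, int rangeEnd) {
        return changes.stream()
                .filter(c -> c.keyGroup() >= rangeStart && c.keyGroup() <= rangeEnd)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<StateChange> log = List.of(
                new StateChange(0, new byte[]{1}),
                new StateChange(5, new byte[]{2}),
                new StateChange(9, new byte[]{3}));
        // After upscaling, this subtask owns key groups [0, 4] only.
        List<StateChange> relevant = filterForRange(log, 0, 4);
        System.out.println(relevant.size());
    }
}
```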
Cleanup
Cleanup when subsuming old checkpoint
Existing SharedStateRegistry mechanisms suffice here: once a checkpoint is subsumed, reference counters for its state objects are decremented; and once they are zeroed, state objects are destroyed.
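As a rough illustration of this reference-counting scheme (a sketch, not the actual SharedStateRegistry code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch of SharedStateRegistry-style cleanup: each checkpoint that uses a
 * state object increments its reference count; subsuming a checkpoint
 * decrements it; at zero the object is deleted. Strings stand in for
 * state handles / DFS files.
 */
public class RefCountSketch {
    private final Map<String, Integer> refCounts = new HashMap<>();
    private final List<String> deleted = new ArrayList<>();

    /** A new checkpoint references this state object. */
    public void register(String handle) {
        refCounts.merge(handle, 1, Integer::sum);
    }

    /** A checkpoint referencing this state object was subsumed. */
    public void unregister(String handle) {
        int remaining = refCounts.merge(handle, -1, Integer::sum);
        if (remaining == 0) {
            refCounts.remove(handle);
            deleted.add(handle); // here the real registry would delete the DFS file
        }
    }

    public List<String> deleted() {
        return deleted;
    }

    public static void main(String[] args) {
        RefCountSketch registry = new RefCountSketch();
        registry.register("file-a");   // referenced by CP1
        registry.register("file-a");   // also referenced by CP2
        registry.unregister("file-a"); // CP1 subsumed: still referenced
        registry.unregister("file-a"); // CP2 subsumed: now deleted
        System.out.println(registry.deleted());
    }
}
```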
Preventing excessive metadata growth
There can be a case when the data rate is low and checkpointing is frequent. For example, 1 event per checkpoint and 10 checkpoints per second. That means there will be 10 new handles per second per backend instance. If not consolidated, these can pile up quickly and overwhelm the JobMaster. There are several ways to deal with it:
...
The 2nd option is much simpler and, with a DFS-based StateChangelog, should be enough.
Cleanup on shutdown
During a normal shutdown, StateBackend asks StateChangelog to delete any state changes which weren’t requested to be persisted durably. StateBackend also removes any uploaded materialized files that are not included in any checkpoint.
Cleanup after recovery
There is a chance that the above step didn’t complete, e.g. due to a crash. Orphaned files on DFS can include materialized and non-materialized changes (depending on StateChangelog implementation).
...
Note that this is an existing issue, so we intend to implement the cleanup in a subsequent version. Existing (non-changelog) StateBackends will also benefit from this mechanism.
StateBackend
Tracking the changes
This can be done in the respective “State” implementations: MapState, ValueState, ListState, etc. For that, a wrapping layer can be added around KeyedStateBackend (and probably some supporting classes, such as RegisteredKeyValueStateBackendMetaInfo).
These state changes are sent to StateChangelog but not necessarily durably persisted yet.
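A minimal sketch of such a wrapping layer (hypothetical names; the real wrappers would sit around KeyedStateBackend and the State implementations):

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch (hypothetical names, not the actual Flink classes): each State
 * implementation is wrapped so that every mutation is both applied to the
 * delegate and appended to the changelog. Durability is deferred until
 * a durable persist is requested at checkpoint time.
 */
public class ChangelogListStateSketch {
    interface ListStateLike<T> {
        void add(T value);
        List<T> get();
    }

    /** In-memory stand-in for the real backend's list state. */
    static class HeapListState<T> implements ListStateLike<T> {
        private final List<T> values = new ArrayList<>();
        public void add(T value) { values.add(value); }
        public List<T> get() { return values; }
    }

    /** Wrapper that mirrors every change into a changelog buffer. */
    static class ChangelogListState<T> implements ListStateLike<T> {
        private final ListStateLike<T> delegate;
        private final List<T> changelog; // appended, but not yet durable

        ChangelogListState(ListStateLike<T> delegate, List<T> changelog) {
            this.delegate = delegate;
            this.changelog = changelog;
        }
        public void add(T value) {
            changelog.add(value); // record the change first
            delegate.add(value);  // then apply it to the real state
        }
        public List<T> get() { return delegate.get(); }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        ListStateLike<String> state =
                new ChangelogListState<>(new HeapListState<>(), log);
        state.add("a");
        state.add("b");
        System.out.println(state.get() + " / log=" + log);
    }
}
```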
Materializing state changes
As discussed in “Preventing excessive metadata growth”, materialization must be triggered periodically (with at most one materialization in progress at a time).
...
Note: for Incremental RocksDB, materialization does not necessarily trigger the compaction.
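The “at most one at a time” requirement above can be sketched with a simple compare-and-set guard (illustrative only, not Flink's actual scheduling code):

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Sketch: the periodic materialization trigger only starts a new
 * materialization if the previous one has finished, so at most one
 * is in progress at any time. Names here are illustrative.
 */
public class MaterializationTrigger {
    private final AtomicBoolean inProgress = new AtomicBoolean(false);

    /** Returns true if a materialization was actually started. */
    public boolean tryTrigger() {
        // compareAndSet wins only if no materialization is running
        return inProgress.compareAndSet(false, true);
    }

    /** Called by the async phase when it completes (or fails). */
    public void onFinished() {
        inProgress.set(false);
    }

    public static void main(String[] args) {
        MaterializationTrigger trigger = new MaterializationTrigger();
        System.out.println(trigger.tryTrigger()); // true: starts
        System.out.println(trigger.tryTrigger()); // false: still running
        trigger.onFinished();
        System.out.println(trigger.tryTrigger()); // true: can start again
    }
}
```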
StateChangelog
At least for the first version, a DFS-based solution was chosen (see the Alternatives section for a discussion of other approaches). In this approach, StateChangelog is a stateless layer which bundles writes and translates them to write requests to DFS.
...
But because it is still quite complex, and “local grouping” will likely suffice, we decided to start with a non-distributed StateChangelog in the first version.
DFS write latency
From the experiments, the latency of writing 2.5GB to S3 (500 streams of 5MB objects) is:
...
If everything below one second is considered acceptable at this scale, then tail latencies become the major problem. Below are some techniques to reduce them.
Request hedging
The idea is to aggressively retry a small portion of requests that take much longer than others (see https://www2.cs.duke.edu/courses/cps296.4/fall13/838-CloudPapers/dean_longtail.pdf).
...
Assuming 500 writers, these request latencies translate to checkpoint latencies of p50=1000ms and p99=1700ms. For a 0.5GB state increment, checkpoint latencies would be p75=500ms and p99=1125ms (assuming 250 writers).
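A sketch of the hedging idea (the DFS client is simulated here; in practice the cutoff would be derived from observed latency percentiles such as p95):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/**
 * Sketch of request hedging: if the primary upload has not completed
 * within a cutoff, a duplicate request is issued and whichever finishes
 * first wins. The "upload" here is simulated, not a real DFS call.
 */
public class HedgingSketch {
    @SuppressWarnings("unchecked")
    static <T> T hedged(Supplier<T> request, long cutoffMillis) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            CompletableFuture<T> primary = CompletableFuture.supplyAsync(request, pool);
            try {
                // Fast path: primary finished before the hedging cutoff.
                return primary.get(cutoffMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException slow) {
                // Hedge: race the slow primary against a fresh attempt.
                CompletableFuture<T> backup = CompletableFuture.supplyAsync(request, pool);
                return (T) CompletableFuture.anyOf(primary, backup).get();
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulated upload: the first attempt is slow, retries are fast.
        final int[] calls = {0};
        Supplier<String> upload = () -> {
            int n;
            synchronized (calls) { n = ++calls[0]; }
            try {
                Thread.sleep(n == 1 ? 500 : 10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "done-" + n;
        };
        System.out.println(hedged(upload, 50)); // backup wins: prints "done-2"
    }
}
```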
Other techniques
Additionally, these techniques can be used:
...
- Unique prefixes (“folders”)
- DNS load balancing for S3
API
Note: this is an internal API and may change in the future.
...
/** Scoped to a single entity (e.g. a SubTask or OperatorCoordinator). */
interface StateChangelogClient {
    StateChangelogWriter createWriter(OperatorID operatorID, KeyGroupRange keyGroupRange);
}

/** Scoped to a single writer (e.g. state backend). */
interface StateChangelogWriter {
    void append(int keyGroup, byte[] value);

    CompletableFuture<StateChangelogHandle> persistDurably(SQN after);

    void truncate(SQN before); // called upon checkpoint confirmation by JM

    /** Close this log. No new appends will be possible. Any appended but not persisted records will be lost. */
    void close();

    SQN lastAppendedSqn();
}

interface StateChangelogHandle extends KeyedStateHandle {
    /** Enumerate changes per key group. */
    CloseableIterable<byte[]> getStateChanges(int keyGroup);
}
Implementation notes
- Because different backends may request to persist changes durably at different times (even for the same checkpoint), StateChangelog waits for some time after the first request and only then batches the changes and sends them to DFS. Besides that, it sends a request as soon as it has received persist requests from all the backends registered with it. Additionally, a size threshold may be used.
- Upon receiving a request (from StateBackend) to persist durably starting from some SQN, changes before that SQN can NOT be dropped - the JM can still decline the checkpoint.
- Request hedging can be implemented in the StateChangelog, in the Flink FS layer, in a custom per-provider FS layer, or via custom per-provider configuration. Implementing it in StateChangelog gives a good trade-off between efficiency, portability, and effort. It also doesn’t affect other parts of the system.
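The batching rule from the first note above can be sketched as a pure decision function (illustrative only; the actual thresholds are configuration):

```java
/**
 * Sketch of the flush decision described in the implementation notes:
 * a batch is sent to DFS when (a) all registered writers have requested
 * persistence, or (b) the accumulated size crosses a threshold, or
 * (c) the persist-delay timer since the first request has elapsed.
 */
public class BatchingSketch {
    static boolean shouldFlush(int pendingRequests, int registeredWriters,
                               long batchBytes, long sizeThreshold,
                               boolean delayElapsed) {
        if (pendingRequests == registeredWriters) {
            return true; // every registered backend has asked to persist
        }
        if (batchBytes >= sizeThreshold) {
            return true; // batch is large enough to be worth a DFS request
        }
        return delayElapsed; // otherwise, flush only when the timer fires
    }

    public static void main(String[] args) {
        System.out.println(shouldFlush(2, 4, 1_000, 1 << 20, false)); // keep waiting
        System.out.println(shouldFlush(4, 4, 1_000, 1 << 20, false)); // all writers ready
        System.out.println(shouldFlush(1, 4, 2 << 20, 1 << 20, false)); // size threshold hit
    }
}
```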
File contents layout
This is an example layout inside a single file:
...
There will likely be a single changeset per key group, but there is no such guarantee (each time StateBackend requests lastAppendedSqn, a new changeset is started).
Example
Let's consider a job with one stateful operator having a list state in RocksDB. Assume max retained checkpoints: 1.
On startup, the state is empty.
  StateChangelog.sqn = T0
  StateBackend.lastMaterializedSqn = T0

list.add(a)
  During normal processing, the operator updates its state and simultaneously adds the state change to the StateChangelog. Logical time is not updated.

CP1 triggered
  StateBackend calls StateChangelog.persistDurably(after = lastMaterializedSqn = T0).
  StateChangelog.sqn = T1
  StateChangelog returns a Future representing the completion of the write of (T0:T1 = a) and remembers it internally.

list.add(b)

State materialization is triggered
  StateBackend obtains the sqn from StateChangelog (T2); StateChangelog.sqn = T2.
  StateBackend flushes RocksDB memtables to SST files and starts the async phase, passing it the obtained sqn=T2 and the snapshot (~list of files). The materialized state will be (T0:T2 = a,b).

list.add(c)

CP2 triggered
  StateBackend calls StateChangelog.persistDurably(after = lastMaterializedSqn = T0).
  StateChangelog.sqn = T3
  StateChangelog finds the previous Future (T0:T1 = a) and combines it with a new one (T1:T3 = b,c).

CP1-Future completes
  It contains a shared StateHandle pointing to a file on S3. This handle is included in the TaskSnapshot and sent to the JM.

CP2-Future completes
  It contains a combined StateHandle pointing to two files on S3. The StateHandle is included in the TaskSnapshot and sent to the JM.

JM finalizes CP1, and then CP2
  CP1 is subsumed, but nothing is removed yet.

State materialization completes
  StateBackend stores the consolidated state handle (T0:T2 = a,b).
  StateBackend.lastMaterializedSqn = T2
  StateChangelog.truncate(before = T2) // removes the (T0:T1 = a) reference

list.add(d)

CP3 triggered
  StateBackend calls StateChangelog.persistDurably(after = lastMaterializedSqn = T2).
  StateChangelog.sqn = T4
  StateChangelog searches for a reference matching the requested (T2:*) range and finds the completed (T1:T3 = b,c). To be able to filter out (T1 = b) on recovery, the returned handle specifies the range (T2:T4). Files also specify the range of each changeset internally. The found Future is combined with a new one for (T3:T4 = d).

CP3-Future completes
  StateBackend combines its result with the consolidated handle and sends both to the JM.

JM finalizes CP3
  CP2 is subsumed; the state handle pointing to the file with (T0:T1 = a) is no longer referenced and is removed.
Compatibility, Deprecation, and Migration Plan
- It should be possible to load the existing state as consolidated state without any changes (but not vice versa)
- Only keyed state in the initial version
- Probably, only RocksDB in the initial version
- Nothing is deprecated and no migration needed
- Changeset of a checkpoint must fit in memory
Test Plan
Existing ITCases with the feature enabled.
Rejected Alternatives
StateChangelog implementations
Besides DFS-based, some other options were considered. These are intentionally very rough estimates of different solutions:
...
With the highest priorities for the initial version being (3), (4), and (5), DFS was chosen.
Distributed StateChangelog
Distributed StateChangelog was rejected in the first version as probably unnecessary.
...