Status

Current state: Under Discussion

...

Table of Contents

Motivation

Low end-to-end latency is a much-demanded property in many Flink setups. With exactly-once, this latency depends on checkpoint interval/duration which in turn is defined by the slowest node (usually the one doing a full non-incremental snapshot). In large setups with many nodes, the probability of at least one node being slow gets higher, making almost every checkpoint slow.

This FLIP proposes a mechanism to deal with this by continuously materializing and uploading state, so that only the changed part needs to be uploaded during the checkpoint itself. It differs from other approaches in that 1) checkpoints are always incremental; and 2) it works for any state backend.

Public Interfaces

  • No changes to the public APIs (except for configuration)
  • Configuration
    • Toggle to enable
    • StateChangelog: persist delay, retry policy, file prefix
    • State backends: materialization interval, file prefix
  • Monitoring
    • StateChangelog: DFS request size, latency, number of “Logs” per request, errors
    • State backends: DFS request size, latency, errors

Proposed Changes

General design

The idea for getting faster checkpoints is similar to a database write-ahead log:

...

StateBackend maintains a logical timestamp to draw a boundary between the consolidated and unconsolidated changes.

Performing a checkpoint

On checkpoint, StateBackend requests StateChangelog to persist durably all the changes made after the latest (known) materialization (identified by a logical timestamp). The result is a regular (Keyed)StateHandle. StateChangelog guarantees that this handle only provides access to changes that are:

...

For savepoint, the call is just proxied to the corresponding method of the underlying StateBackend.
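
As an illustration, a checkpoint could then be assembled from the last materialized handle plus the handle returned by StateChangelog. The following is a minimal sketch using the StateChangelogWriter/StateChangelogHandle interfaces defined in the API section below; the surrounding class and the field names (materializedHandle, lastMaterializedSqn) are assumptions, and imports are omitted as in the API block below.

Code Block
languagejava
// Sketch only: assembling a checkpoint from the materialized part plus the persisted delta.
class ChangelogSnapshotSketch {

    CompletableFuture<List<KeyedStateHandle>> snapshot(
            StateChangelogWriter changelog,
            KeyedStateHandle materializedHandle, // result of the last completed materialization
            SQN lastMaterializedSqn) {           // logical time of that materialization

        // Durably persist everything appended after the last materialization.
        return changelog
                .persistDurably(lastMaterializedSqn)
                // The checkpoint is the materialized snapshot plus the changelog delta.
                .thenApply(delta -> Arrays.asList(materializedHandle, delta));
    }
}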

Recovery and rescaling

On JobMaster, state assignment doesn’t need to be changed because regular handles are used (that’s also true for rescaling).

On the StateBackend, the previously materialized state is restored first (if any), and then the changelog is applied. The changelog doesn’t have to be filtered for already materialized changes - that’s ensured during state handle creation. However, irrelevant records do have to be filtered out on upscaling. For that, state changes must be complemented with their keygroup (non-keyed state is not supported in the initial version).
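
A minimal restore sketch, assuming the StateChangelogHandle interface from the API section below and Flink’s KeyGroupRange/KeyedStateHandle; restoreMaterialized and applyChange are hypothetical backend-specific helpers, resource closing and imports are omitted:

Code Block
languagejava
// Sketch only: restore order and keygroup filtering on rescaling.
class ChangelogRestoreSketch {

    void restore(Collection<KeyedStateHandle> materialized,
                 Collection<StateChangelogHandle> changelogs,
                 KeyGroupRange myRange) {
        // 1. Restore the previously materialized state first (if any).
        for (KeyedStateHandle handle : materialized) {
            restoreMaterialized(handle);
        }
        // 2. Replay the changelog. Already-materialized changes were excluded when the
        //    handle was created; only keygroups outside myRange must be skipped here,
        //    which matters after upscaling when a handle may cover a wider range.
        for (StateChangelogHandle changelog : changelogs) {
            for (int kg = myRange.getStartKeyGroup(); kg <= myRange.getEndKeyGroup(); kg++) {
                for (byte[] change : changelog.getStateChanges(kg)) {
                    applyChange(kg, change);
                }
            }
        }
    }

    void restoreMaterialized(KeyedStateHandle handle) { /* backend-specific */ }

    void applyChange(int keyGroup, byte[] serializedChange) { /* backend-specific */ }
}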

Cleanup

Cleanup when subsuming old checkpoint

Existing SharedStateRegistry mechanisms suffice here: once a checkpoint is subsumed, reference counters for its state objects are decremented; and once they are zeroed, state objects are destroyed.

Preventing excessive metadata growth

There can be a case where the data rate is low but checkpointing is frequent: for example, 1 event per checkpoint and 10 checkpoints per second. That means there will be 10 new handles per second per backend instance. If not consolidated, these can pile up quickly and overwhelm the JobMaster. There are several ways to deal with it:

...

The second option is much simpler and, with a DFS-based StateChangelog, should be enough.

Cleanup on shutdown

During a normal shutdown, StateBackend asks StateChangelog to delete any state changes which weren’t requested to be persisted durably. StateBackend also removes any uploaded materialized files that are not included in any checkpoint.

Cleanup after recovery

There is a chance that the above step didn’t complete, e.g. due to a crash. Orphaned files on DFS can include materialized and non-materialized changes (depending on StateChangelog implementation).

...

Note that this is an existing issue, so we intend to implement the cleanup in a subsequent version. Existing (non-changelog) StateBackends will also benefit from this mechanism.

StateBackend

Tracking the changes

This can be done in the respective “State” implementations: MapState, ValueState, ListState, etc. For that, a wrapping layer can be added around KeyedStateBackend (and probably some supporting classes, such as RegisteredKeyValueStateBackendMetaInfo).

These state changes are sent to StateChangelog but not necessarily durably persisted yet.
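
For illustration, such a wrapper might look roughly like the following (a sketch, not the proposed implementation: the class name, the IntSupplier for the current keygroup, and the serialization helpers are all assumptions; imports are omitted as in the API block below):

Code Block
languagejava
// Sketch only: a ValueState wrapper that forwards to the delegate and records each change.
class ChangelogValueState<T> implements ValueState<T> {

    private final ValueState<T> delegate;
    private final StateChangelogWriter changelog;
    private final IntSupplier currentKeyGroup; // keygroup of the currently set key

    ChangelogValueState(ValueState<T> delegate, StateChangelogWriter changelog, IntSupplier currentKeyGroup) {
        this.delegate = delegate;
        this.changelog = changelog;
        this.currentKeyGroup = currentKeyGroup;
    }

    @Override
    public T value() throws IOException {
        return delegate.value(); // reads produce no changelog entries
    }

    @Override
    public void update(T value) throws IOException {
        delegate.update(value);
        // Recorded in the changelog, but not necessarily durably persisted yet.
        changelog.append(currentKeyGroup.getAsInt(), serializeUpdate(value));
    }

    @Override
    public void clear() {
        delegate.clear();
        changelog.append(currentKeyGroup.getAsInt(), serializeClear());
    }

    private byte[] serializeUpdate(T value) { /* hypothetical: encode the new value */ return new byte[0]; }

    private byte[] serializeClear() { /* hypothetical: encode a "clear" marker */ return new byte[0]; }
}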

Materializing state changes

As discussed in “Preventing excessive metadata growth”, materialization must be triggered periodically (with at most one materialization in flight at a time).

...

Note: for Incremental RocksDB, materialization does not necessarily trigger the compaction.
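
A sketch of how such periodic materialization could be scheduled with at most one run in flight (materializeAsync() stands for the backend-specific flush-and-upload step and is an assumption):

Code Block
languagejava
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch only: periodic materialization, skipping a tick if the previous run is still in flight.
class MaterializationTriggerSketch {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean inProgress = new AtomicBoolean(false);

    void start(long intervalMs) {
        scheduler.scheduleWithFixedDelay(this::maybeMaterialize, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    private void maybeMaterialize() {
        // At most one materialization at a time: skip this tick if one is still running.
        if (!inProgress.compareAndSet(false, true)) {
            return;
        }
        materializeAsync().whenComplete((ignored, error) -> inProgress.set(false));
    }

    private CompletableFuture<Void> materializeAsync() {
        // In reality: obtain the current SQN, flush the backend (e.g. RocksDB memtables
        // to SST files), and upload the result asynchronously.
        return CompletableFuture.completedFuture(null);
    }
}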

StateChangelog

At least for the first version, a DFS-based solution was chosen (see the Alternatives section for a discussion of other approaches). In this approach, StateChangelog is a stateless layer which bundles writes and translates them to write requests to DFS. 

...

But because it is still quite complex, and “local grouping” will probably suffice, we decided to start with a non-distributed StateChangelog in the first version.

DFS write latency

From the experiments, the latency of writing 2.5 GB to S3 (500 streams of 5 MB objects each) is:

...

If everything below one second is considered acceptable at this scale, then tail latencies become the major problem. Below are some techniques to reduce them.

Request hedging

The idea is to aggressively retry a small portion of requests that take much longer than others (see https://www2.cs.duke.edu/courses/cps296.4/fall13/838-CloudPapers/dean_longtail.pdf).
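
A self-contained sketch of the idea (the delay value, cancellation of the losing attempt, and failure handling are simplified):

Code Block
languagejava
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Sketch only: if the primary request has not completed within hedgeDelayMs,
// a second ("hedged") attempt is issued; whichever attempt finishes first wins.
class RequestHedgingSketch {

    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    <T> CompletableFuture<T> hedged(Supplier<CompletableFuture<T>> request, long hedgeDelayMs) {
        CompletableFuture<T> result = new CompletableFuture<>();
        CompletableFuture<T> primary = request.get();
        primary.whenComplete((value, error) -> completeOnce(result, value, error));
        timer.schedule(() -> {
            if (!primary.isDone()) {
                // The primary looks like a straggler: fire a backup attempt.
                request.get().whenComplete((value, error) -> completeOnce(result, value, error));
            }
        }, hedgeDelayMs, TimeUnit.MILLISECONDS);
        return result;
    }

    private <T> void completeOnce(CompletableFuture<T> result, T value, Throwable error) {
        // Simplification: the first completion (success or failure) wins; a real
        // implementation would only fail once all attempts have failed.
        if (error == null) {
            result.complete(value);
        } else {
            result.completeExceptionally(error);
        }
    }
}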

...

Assuming 500 writers, these request latencies translate to checkpoint latencies of p50=1000ms and p99=1700ms. For a 0.5 GB state increment (assuming 250 writers), checkpoint latencies would be p75=500ms and p99=1125ms.
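
As an illustration of how per-request latencies translate into checkpoint latencies (a back-of-the-envelope model only, assuming n independent, identically distributed request latencies with distribution function F; a checkpoint completes only when its slowest writer does):

\[ P(\text{checkpoint} \le t) = P\big(\max_{1 \le i \le n} X_i \le t\big) = F(t)^n \]

With n = 500 writers, the checkpoint p50 therefore corresponds to the per-request quantile 0.5^{1/500} ≈ 0.9986, i.e. even a median checkpoint is governed by roughly the p99.9 of individual requests, which is why hedging the slowest requests has such a large effect.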

Other techniques

Additionally, these techniques can be used:

...

  1. Unique prefixes (“folders”)
  2. DNS load balancing for S3

API

Note: this is an internal API and may change in the future.

...

Code Block
languagejava
/** Scoped to a single entity (e.g. a SubTask or OperatorCoordinator). */
interface StateChangelogClient {
    StateChangelogWriter createWriter(OperatorID operatorID, KeyGroupRange keyGroupRange);
}

/** Scoped to a single writer (e.g. state backend). */
interface StateChangelogWriter {

  /** Add a change for the given key group. Appended changes are not necessarily durably persisted yet. */
  void append(int keyGroup, byte[] value);

  /** Durably persist all changes appended after the given SQN and return a handle to them. */
  CompletableFuture<StateChangelogHandle> persistDurably(SQN after);

  void truncate(SQN before); // called upon checkpoint confirmation by JM

  /** Close this log. No new appends will be possible. Any appended but not persisted records will be lost. */
  void close();

  /** The logical timestamp of the latest appended change. */
  SQN lastAppendedSqn();
}

interface StateChangelogHandle extends KeyedStateHandle {

  /** Enumerate changes per key group. */
  CloseableIterable<byte[]> getStateChanges(int keyGroup);
}
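
To illustrate how a backend might use this API, here is a hypothetical usage sketch (all identifiers not defined above - client, operatorID, keyGroupRange, lastMaterializedSqn, and the keygroup/change arguments - are assumptions; imports are omitted as above):

Code Block
languagejava
// Sketch only: typical lifecycle of a StateChangelogWriter from a backend's point of view.
void usageExample(StateChangelogClient client,
                  OperatorID operatorID,
                  KeyGroupRange keyGroupRange,
                  SQN lastMaterializedSqn,
                  int keyGroup,
                  byte[] serializedChange) {

    StateChangelogWriter writer = client.createWriter(operatorID, keyGroupRange);

    // Normal processing: record each state change, tagged with its keygroup.
    writer.append(keyGroup, serializedChange);

    // Checkpoint: durably persist everything appended after the last materialization.
    writer.persistDurably(lastMaterializedSqn)
          .thenAccept(handle -> { /* include the handle in the TaskSnapshot sent to JM */ });

    // Once a newer materialization is part of a confirmed checkpoint, older changes
    // are no longer needed for new checkpoints:
    // writer.truncate(newerMaterializedSqn);
}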

Implementation notes

  • Because many backends may request to persist changes durably at different times (even for the same checkpoint), StateChangelog waits for some time after the first request and only then batches the changes and sends them to DFS. Besides that, it sends a request as soon as it has received persist requests from all the backends registered with it. Additionally, a size threshold may be used (a sketch of this batching logic follows this list).
  • Upon receiving a request (from StateBackend) to persist durably starting from some SQN, changes before that SQN can NOT be dropped - JM can still decline the checkpoint
  • Request hedging can be implemented in the StateChangelog, in the Flink FS layer, in a custom per-provider FS layer, or via custom per-provider configuration. Implementing it in StateChangelog gives a good trade-off between efficiency, portability, and effort. It also doesn’t affect other parts of the system.
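
A sketch of the batching logic from the first note above (the writer identifiers, the delay handling, and uploadBatch are assumptions; the size threshold is left out):

Code Block
languagejava
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch only: batch persist requests from multiple backends into one DFS write.
class PersistBatchingSketch {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Set<String> registeredWriters = new HashSet<>(); // all writers attached to this changelog
    private final Set<String> pendingWriters = new HashSet<>();    // writers that already asked to persist
    private final long persistDelayMs;
    private boolean flushScheduled;

    PersistBatchingSketch(long persistDelayMs) {
        this.persistDelayMs = persistDelayMs;
    }

    synchronized void register(String writerId) {
        registeredWriters.add(writerId);
    }

    synchronized void onPersistRequest(String writerId) {
        pendingWriters.add(writerId);
        if (pendingWriters.containsAll(registeredWriters)) {
            // All registered backends have asked to persist: no reason to wait any longer.
            flush();
        } else if (!flushScheduled) {
            // First request in this batch: start the delay timer.
            flushScheduled = true;
            scheduler.schedule(this::flush, persistDelayMs, TimeUnit.MILLISECONDS);
        }
    }

    private synchronized void flush() {
        if (pendingWriters.isEmpty()) {
            return; // batch was already flushed early
        }
        uploadBatch(pendingWriters); // one DFS request covering all pending writers
        pendingWriters.clear();
        flushScheduled = false;
    }

    private void uploadBatch(Set<String> writers) { /* DFS-specific in reality */ }
}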

File contents layout

This is an example layout inside a single file:

...

There will likely be a single changeset per keygroup, but there is no such guarantee (each time StateBackend requests lastAppendedSqn, a new changeset is started).

Example

Let's consider a job with one stateful operator having a list state in RocksDB. Assume max retained checkpoints: 1.

On startup, the state is empty

StateChangelog.sqn = T0

StateBackend.lastMaterializedSqn = T0

list.add(a)

During normal processing, operator updates its state and simultaneously adds state changes to the StateChangelog. Logical time is not updated.

CP1 triggered

StateBackend calls StateChangelog.persistDurably(after = lastMaterializedSqn = T0).

StateChangelog.sqn = T1

StateChangelog returns a Future representing the completion of write of (T0:T1 = a) and remembers it internally.

list.add(b)

State materialization is triggered

StateBackend obtains the sqn from StateChangelog (T2)

StateChangelog.sqn = T2

StateBackend flushes RocksDB memtables to SST files; starts the async phase passing it obtained sqn=T2 and snapshot (~list of files).

Materialized state will be (T0:T2 = a,b)

list.add(c)

CP2 triggered

StateBackend calls StateChangelog.persistDurably(after = lastMaterializedSqn = T0).

StateChangelog.sqn = T3

StateChangelog finds the previous Future (T0:T1 = a) and combines it with a new one (T1:T3 = b,c).

CP1-Future completes

It contains a shared StateHandle pointing to a file on S3.

This handle is included in TaskSnapshot and sent to JM.

CP2-Future completes

It contains a combined StateHandle pointing to two files on S3. 

StateHandle is included in TaskSnapshot and is sent to JM.

JM finalizes CP1, and then CP2

CP1 is subsumed but nothing is removed yet.

State materialization completes

StateBackend stores the consolidated state handle (T0:T2 = a,b)

StateBackend.lastMaterializedSqn = T2

StateChangelog.truncate(before = T2) // remove (T0:T1 = a) reference

list.add(d)

CP3 is triggered

StateBackend calls StateChangelog.persistDurably(after = lastMaterializedSqn = T2).

StateChangelog.sqn = T4

StateChangelog searches for a reference matching the requested (T2:*) range and finds the completed (T1:T3 = b,c). To be able to filter out (T1 = b) on recovery, the returned handle specifies the range (T2:T4). Files also specify the range for each changeset internally.

Found Future is combined with a new one for (T3:T4 = d).
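
For illustration, the recovery-time filtering implied by these ranges could look as follows (a sketch with hypothetical types; in reality the per-changeset range is read from the file itself, and SQNs are shown here as plain longs):

Code Block
languagejava
import java.util.ArrayList;
import java.util.List;

// Sketch only: drop changesets that precede the range declared by the handle,
// e.g. the (T1 = b) changeset when the handle declares (T2:T4).
class ChangesetRangeFilterSketch {

    static class Changeset {
        long fromSqn;   // logical time at which this changeset starts
        byte[] changes; // serialized changes
    }

    List<Changeset> relevantChangesets(List<Changeset> readFromFile, long handleFromSqn) {
        List<Changeset> relevant = new ArrayList<>();
        for (Changeset changeset : readFromFile) {
            // Changesets before the handle's "from" SQN are already covered by the
            // materialized state and must not be re-applied on recovery.
            if (changeset.fromSqn >= handleFromSqn) {
                relevant.add(changeset);
            }
        }
        return relevant;
    }
}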

CP3-Future completes

StateBackend combines its result with the consolidated handle and sends to JM:

  • (T0:T2 = a,b) - consolidated changes
  • (T1:T3 = b,c) - previously saved changelog (“b” filtered on recovery)
  • (T3:T4 = d) - the newly saved changelog

JM finalizes CP3

CP2 is subsumed; the state handle pointing to the file with (T0:T1 = a) is not referenced anymore and is therefore removed.

Compatibility, Deprecation, and Migration Plan

  • It should be possible to load the existing state as a consolidated state without any changes (but not vice-versa)
  • Only keyed state in the initial version
  • Probably, only RocksDB in the initial version
  • Nothing is deprecated and no migration needed
  • Changeset of a checkpoint must fit in memory

Test Plan

Existing ITCases with the feature enabled.

Rejected Alternatives

StateChangelog implementations

Besides DFS-based, some other options were considered. These are intentionally very rough estimates of different solutions:

...

With the highest priorities for the initial version being (3), (4), and (5), DFS was chosen.

Distributed StateChangelog

Distributed StateChangelog was rejected in the first version as probably unnecessary.

...