...

From the experiments, the latency of writing 2.5GB to S3 (500 streams of 5MB objects) is:

p50      p90      p95      p99       p999
459ms    740ms    833ms    1039ms    3202ms


If everything below one second is considered acceptable at this scale, then tail latencies become the major problem. Below are some techniques to reduce them.

...

With a simple implementation, p999 latency decreases by 47% (the improvement is even bigger for smaller requests); one such technique is sketched after the table:

p50      p90      p95      p99      p999
495ms    733ms    807ms    937ms    1689ms
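One common technique of this kind is request hedging: if an upload has not completed within a timeout, issue a backup request and take whichever attempt finishes first. Below is a minimal sketch; the class and method names are illustrative and not part of this proposal.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Illustrative sketch of hedged requests; not part of the proposed API. */
final class HedgedRequests {

  private static final ScheduledExecutorService TIMER =
      Executors.newSingleThreadScheduledExecutor();

  /** Starts a backup attempt if the first one has not completed within timeoutMs. */
  static <T> CompletableFuture<T> hedged(Supplier<CompletableFuture<T>> attempt, long timeoutMs) {
    CompletableFuture<T> result = new CompletableFuture<>();
    attempt.get().whenComplete((value, error) -> complete(result, value, error));
    TIMER.schedule(() -> {
      if (!result.isDone()) { // the first attempt is slow: hedge with a second one
        attempt.get().whenComplete((value, error) -> complete(result, value, error));
      }
    }, timeoutMs, TimeUnit.MILLISECONDS);
    return result;
  }

  private static <T> void complete(CompletableFuture<T> result, T value, Throwable error) {
    if (error == null) {
      result.complete(value);
    } else {
      result.completeExceptionally(error);
    }
  }
}

A production version would additionally cancel the losing attempt and fall back to the backup immediately if the first attempt fails before the timeout.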


Assuming 500 writers, these request latencies translate to checkpoint latencies of p50=1000ms and p99=1700ms (a checkpoint completes only when its slowest writer finishes, so checkpoint latency is effectively the maximum over all writers' request latencies). For a 0.5GB state increment, checkpoint latencies would be p75=500ms and p99=1125ms (assuming 250 writers).

...

  1. Materializing state changes - remember the current logical timestamp
  2. Performing a checkpoint (to provide the SQN for persistDurably)
/** Scoped to a single entity (e.g. a SubTask or OperatorCoordinator). */
interface StateChangelogClient {

  StateChangelogWriter createWriter(OperatorID operatorID, KeyGroupRange keyGroupRange);
}

/** Scoped to a single writer (e.g. state backend). */
interface StateChangelogWriter {

  /** Append a state change to the log; the change is not yet durable. */
  void append(int keyGroup, byte[] value);

  /** Durably persist all changes appended after the given SQN. */
  CompletableFuture<StateChangelogHandle> persistDurably(SQN after);

  /** Drop references to changes below the given SQN; called upon checkpoint confirmation by JM. */
  void truncate(SQN before);

  /** Close this log. No new appends will be possible. Any appended but not persisted records will be lost. */
  void close();

  /** The logical timestamp (SQN) of the most recently appended change. */
  SQN lastAppendedSqn();
}

interface StateChangelogHandle extends KeyedStateHandle {

  /** Enumerate changes per key group. */
  CloseableIterable<byte[]> getStateChanges(int keyGroup);
}
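For illustration, the intended call sequence could look roughly as follows; the surrounding variables (client, operatorID, keyGroupRange, serializedChange, lastMaterializedSqn) and reportToJobManager are hypothetical, not part of the proposal.

// Hypothetical usage of the interfaces above; surrounding names are illustrative.
StateChangelogWriter writer = client.createWriter(operatorID, keyGroupRange);

// During normal processing: record each state change alongside the state update.
writer.append(keyGroup, serializedChange);

// On checkpoint: durably persist everything appended after the last materialization.
CompletableFuture<StateChangelogHandle> future = writer.persistDurably(lastMaterializedSqn);
future.thenAccept(handle -> reportToJobManager(handle)); // handle goes into TaskSnapshot

// Once changes below an SQN are covered by materialized state:
writer.truncate(lastMaterializedSqn);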

...

This is an example layout inside a single file:

+----------------------------------------------------------------------------------------------+
| File: ABCD.bin (all subtasks of TM1 for checkpoint 2)                                        |
| +------------------------------------------------------+   +-------------------------------+ |
| | Log 0 ("group-by" operator)                          |   | Log 1 ("aggregate" operator)  | |
| | +--------------------------------------------------+ |   | +---------------------------+ | |
| | | +---------------------+  +---------------------+ | |   | | +---------------------+   | | |
| | | | KeyGroup 0          |  | KeyGroup 1          | | |   | | | KeyGroup 0          |   | | |
| | | | +-----------------+ |  | +-----------------+ | | |   | | | +-----------------+ |   | | |
| | | | | Changeset 1     | |  | | Changeset 1     | | | |   | | | | Changeset 1     | |   | | |
| | | | | window-contents | |  | | window-contents | | | |   | | | |                 | |   | | |
| | | | | timers          | |  | | timers          | | | |   | | | |                 | |   | | |
| | | | +-----------------+ |  | +-----------------+ | | |   | | | +-----------------+ |   | | |
| | | | +-----------------+ |  | +-----------------+ | | |   | | | +-----------------+ |   | | |
| | | | | Changeset 2     | |  | | Changeset 2     | | | |   | | | | Changeset 2     | |   | | |
| | | | | window-contents | |  | | window-contents | | | |   | | | |                 | |   | | |
| | | | | timers          | |  | | timers          | | | |   | | | |                 | |   | | |
| | | | +-----------------+ |  | +-----------------+ | | |   | | | +-----------------+ |   | | |
| | | | +-----------------+ |  | +-----------------+ | | |   | | | +-----------------+ |   | | |
| | | | | Changeset 3     | |  | | Changeset 3     | | | |   | | | | Changeset 3     | |   | | |
| | | | | window-contents | |  | | window-contents | | | |   | | | |                 | |   | | |
| | | | | timers          | |  | | timers          | | | |   | | | |                 | |   | | |
| | | | +-----------------+ |  | +-----------------+ | | |   | | | +-----------------+ |   | | |
| | | +---------------------+  +---------------------+ | |   | | +---------------------+   | | |
| | +--------------------------------------------------+ |   | +---------------------------+ | |
| +------------------------------------------------------+   +-------------------------------+ |
+----------------------------------------------------------------------------------------------+
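A possible way to serialize this nesting is to write logs one after another, each with its key groups and their changesets, length-prefixed so a reader can skip to the parts it needs. The sketch below is purely illustrative and not a proposed file format; all types are hypothetical.

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;

/** Illustrative only: serializes the Log -> KeyGroup -> Changeset nesting shown above. */
final class ChangelogFileWriter {

  record Changeset(byte[] bytes) {}
  record KeyGroup(int id, List<Changeset> changesets) {}
  record Log(int id, List<KeyGroup> keyGroups) {}

  static void write(OutputStream stream, List<Log> logs) throws IOException {
    DataOutputStream out = new DataOutputStream(stream);
    out.writeInt(logs.size());
    for (Log log : logs) {                    // e.g. Log 0 ("group-by"), Log 1 ("aggregate")
      out.writeInt(log.id());
      out.writeInt(log.keyGroups().size());
      for (KeyGroup kg : log.keyGroups()) {
        out.writeInt(kg.id());
        out.writeInt(kg.changesets().size());
        for (Changeset cs : kg.changesets()) {
          out.writeInt(cs.bytes().length);    // length prefix allows skipping
          out.write(cs.bytes());
        }
      }
    }
    out.flush();
  }
}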

...

Let's consider a job with one stateful operator having a list state in RocksDB. Assume max retained checkpoints: 1.

On startup, the state is empty

StateChangelog.sqn = T0

StateBackend.lastMaterializedSqn = T0

list.add(a)

During normal processing, the operator updates its state and simultaneously appends the state changes to the StateChangelog. Logical time is not advanced.
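A changelog-aware state object might implement this dual write roughly as follows; ChangelogListState and its serialization are hypothetical sketches, not the proposed implementation.

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.state.ListState;

/** Hypothetical sketch: a list state that logs every change to the StateChangelog. */
class ChangelogListState<T> {

  private final ListState<T> delegate;          // the actual (e.g. RocksDB-backed) state
  private final StateChangelogWriter changelog; // obtained via StateChangelogClient.createWriter
  private final int keyGroup;                   // derived from the current key in a real backend

  ChangelogListState(ListState<T> delegate, StateChangelogWriter changelog, int keyGroup) {
    this.delegate = delegate;
    this.changelog = changelog;
    this.keyGroup = keyGroup;
  }

  void add(T value) throws Exception {
    delegate.add(value);                          // update the state as usual
    changelog.append(keyGroup, serialize(value)); // log the change; logical time is not advanced
  }

  private byte[] serialize(T value) {
    // placeholder: a real implementation would use the state's TypeSerializer
    return value.toString().getBytes(StandardCharsets.UTF_8);
  }
}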

CP1 triggered

StateBackend calls StateChangelog.persistDurably(after = lastMaterializedSqn = T0).

StateChangelog.sqn = T1

StateChangelog returns a Future representing the completion of the write of (T0:T1 = a) and remembers it internally.

list.add(b)

State materialization is triggered

StateBackend obtains the sqn from StateChangelog (T2)

StateChangelog.sqn = T2

StateBackend flushes RocksDB memtables to SST files and starts the async phase, passing it the obtained sqn=T2 and the snapshot (~ a list of files).

Materialized state will be (T0:T2 = a,b)

list.add(c)

CP2 triggered

StateBackend calls StateChangelog.persistDurably(after = lastMaterializedSqn = T0).

StateChangelog.sqn = T3

StateChangelog finds the previous Future (T0:T1 = a) and combines it with a new one (T1:T3 = b,c).

CP1-Future completes

It contains a shared StateHandle pointing to a file on S3.

This handle is included in TaskSnapshot and sent to JM.

CP2-Future completes

It contains a combined StateHandle pointing to two files on S3. 

The StateHandle is included in TaskSnapshot and sent to JM.

JM finalizes CP1, and then CP2

CP1 is subsumed but nothing is removed yet.

State materialization completes

StateBackend stores the consolidated state handle (T0:T2 = a,b)

StateBackend.lastMaterializedSqn = T2

StateChangelog.truncate(before = T2) // remove (T0:T1 = a) reference

list.add(d)

CP3 is triggered

StateBackend calls StateChangelog.persistDurably(after = lastMaterializedSqn = T2).

StateChangelog.sqn = T4

StateChangelog searches for a reference matching the requested (T2:*) range and finds the completed (T1:T3 = b,c). To be able to filter out (T1 = b) on recovery, the returned handle specifies the range (T2:T4); the files also specify the range of each changeset internally (see the filtering sketch below).

The found Future is combined with a new one for (T3:T4 = d).
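On recovery, the specified range makes it possible to skip changes that are already covered by materialized state. Below is a sketch of the idea, assuming each stored changeset carries the SQN it was appended at as a long; the types here are hypothetical, not part of the API above.

import java.util.function.Consumer;

/** Hypothetical sketch of recovery-time filtering by SQN range. */
final class ChangelogReplay {

  /** A stored changeset together with the SQN it was appended at (hypothetical type). */
  record Changeset(long sqn, byte[] changes) {}

  /** Replays only changesets inside (from:to]; e.g. for (T2:T4) this drops b but keeps c and d. */
  static void replay(Iterable<Changeset> stored, long from, long to, Consumer<byte[]> apply) {
    for (Changeset changeset : stored) {
      if (changeset.sqn() > from && changeset.sqn() <= to) {
        apply.accept(changeset.changes());
      }
    }
  }
}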

CP3-Future completes

StateBackend combines its result with the consolidated handle and sends the following to JM:

  • (T0:T2 = a,b) - consolidated changes
  • (T1:T3 = b,c) - previously saved changelog (“b” filtered on recovery)
  • (T3:T4 = d) - the newly saved changelog

JM finalizes CP3

CP2 is subsumed; the state handle pointing to the file with (T0:T1 = a) is no longer referenced and is therefore removed.

Compatibility, Deprecation, and Migration Plan

...

Besides the DFS-based approach, some other options were considered. The table below gives intentionally rough estimates of the different solutions:


                                           | Kafka-based unmanaged          | Bookkeeper-based unmanaged | Bookkeeper managed | Custom persistence | DFS-based
1. Latency                                 | Good                           | Good                       | Good               | Good               | Bad (0.1 - 1s)
2. Scalability                             | Good                           | Unknown                    | Unknown            | Good               | Medium
3. Ease of deployment                      | Good (many users already have) | Bad                        | Good               | Good               | Good
4. Stateless or no new stateful component  | Yes (existing)                 | No                         | No                 | No                 | Yes (stateless)
5. Development time                        | Good                           | Medium                     | Bad                | Very bad           | Good
6. Other issues                            | Can truncate changes           |                            |                    |                    | High variability (workload, provider, …)

With the highest priorities for the initial version being (3), (4), and (5), the DFS-based option was chosen.

...