...

  1. All changes from a single StateBackend for a single checkpoint
  2. All backends in a single subtask (OperatorChain)
  3. All subtasks in a single TM (the same job)

In the future we may also decide to batch requests from multiple TMs (see Future directions).

To satisfy the requirements discussed in the previous sections:

  1. Each StateBackend instance is associated with a unique LogID upon startup to isolate instances from one another (see Performing a checkpoint)
  2. StateChangelog maintains a mapping between logical timestamps and state changes to be able to include only changes after the materialization (see Performing a checkpoint) 
  3. keygroup is passed and written along with the changes (see Rescaling)
  4. StateHandle key used in SharedStateRegistry should only be based on the file name and not on backend UUID or keygroup (multiple handles can refer to the same file and making keys unique will effectively disable sharing)
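Requirement 4 can be illustrated with a small sketch: the registry key is derived from the file name alone, so handles produced by different backends or key groups for the same file collapse to one entry. Class and method names below are made up for illustration and are not the actual Flink API:

```java
import java.net.URI;
import java.nio.file.Paths;

/** Illustrative sketch: derive a SharedStateRegistry key from the file name only. */
public class ChangelogRegistryKey {

    /**
     * Two handles referring to the same underlying file must produce the same key,
     * regardless of which backend (UUID) or key group produced them.
     */
    public static String keyFor(String fileUri) {
        String path = URI.create(fileUri).getPath();       // strip scheme and authority
        return Paths.get(path).getFileName().toString();   // file name only
    }

    public static void main(String[] args) {
        // Handles from different backends/key groups pointing at the same file share a key:
        String k1 = keyFor("s3://bucket/checkpoints/job1/ABCD.bin");
        String k2 = keyFor("s3://bucket/checkpoints/job1/ABCD.bin");
        System.out.println(k1.equals(k2) && k1.equals("ABCD.bin")); // true
    }
}
```

Including the backend UUID or key group in the key would make every handle unique and effectively disable sharing.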

One way to scale grouping further is a hierarchical topology of distributed StateChangelog nodes. But because that is still quite complex and “local grouping” will probably suffice, we decided to start with a non-distributed StateChangelog in the first version (see Future directions for the possible distributed design).

DFS write latency


From the experiments, the latency of writing 2.5Gb to S3 (500 streams of 5Mb objects) is:

p50      p90      p95      p99       p999
459ms    740ms    833ms    1039ms    3202ms


If everything below a second is considered acceptable at this scale, then tail latencies become the major problem. Below are some techniques to reduce them.

Request hedging

The idea is to aggressively retry a small portion of requests that take much longer than others (see https://www2.cs.duke.edu/courses/cps296.4/fall13/838-CloudPapers/dean_longtail.pdf).
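As a sketch, hedging can be implemented by starting one backup copy of a request after a delay (e.g. the observed p95 latency) and completing with whichever attempt finishes first. The class and method names below are illustrative, not Flink's actual implementation, and error handling is omitted:

```java
import java.util.concurrent.*;
import java.util.function.Supplier;

/** Illustrative request-hedging sketch: issue a backup request after a delay, take the first result. */
public class HedgedRequest {

    // Daemon threads so the sketch does not keep the JVM alive.
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newScheduledThreadPool(2, r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            });

    /** Runs the request; if it has not finished within hedgeAfterMs, starts one backup copy. */
    public static <T> CompletableFuture<T> hedge(Supplier<T> request, long hedgeAfterMs) {
        CompletableFuture<T> result = new CompletableFuture<>();
        CompletableFuture.supplyAsync(request)
                .whenComplete((v, e) -> { if (e == null) result.complete(v); });
        SCHEDULER.schedule(() -> {
            if (!result.isDone()) { // primary is slow: hedge with a second attempt
                CompletableFuture.supplyAsync(request)
                        .whenComplete((v, e) -> { if (e == null) result.complete(v); });
            }
        }, hedgeAfterMs, TimeUnit.MILLISECONDS);
        return result;
    }

    public static void main(String[] args) throws Exception {
        // Simulated upload taking ~50ms; the backup would start at 100ms if needed.
        CompletableFuture<String> f = hedge(() -> {
            try { Thread.sleep(50); } catch (InterruptedException ignored) {}
            return "uploaded";
        }, 100);
        System.out.println(f.get(5, TimeUnit.SECONDS)); // uploaded
    }
}
```

The trade-off is a small amount of duplicated work in exchange for cutting the tail: only the few requests that exceed the hedge delay are retried.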


With a simple implementation, p999 latency decreases by 47% (with smaller requests the improvement is bigger):

p50      p90      p95      p99      p999
495ms    733ms    807ms    937ms    1689ms

Assuming 500 writers, these request latencies translate to checkpoint latencies of p50=1000ms and p99=1700ms. For a 0.5Gb state increment, checkpoint latencies would be p75=500ms and p99=1125ms (assuming 250 writers).

Other techniques

Additionally, these techniques can be used:

  1. Adjusting aggregation: change target request size (or total request count); this is a tradeoff between median and tail latencies. Implies distributed implementation; automatic adjustment can be unstable.
  2. Multipart upload (for S3, files bigger than 5Mb). Can be useful only with aggressive aggregation; otherwise, requests are likely to be smaller than 5Mb.
  3. Use multiple buckets (in S3, throttling is applied at the bucket level until re-balance). Probably makes sense only for very large setups.

These are already implemented or can be configured:

  1. Unique prefixes (“folders”)
  2. DNS load balancing for S3

API

Note: this is an internal API and may change in the future.

To enable StateChangelog to flush intermediate data, StateBackend should be able to append changes as they happen and only request to persist them durably on the checkpoint. Therefore, StateChangelog must be aware of the backend logical timestamp (SQN in code).

Furthermore, to avoid too fine-grained SQN-to-change mappings and to allow more efficient batching, SQN should be generated by StateChangelog and exposed to the backend via the getLastAppendedSqn() method. It is used in two places:

  1. Materializing state changes - to remember the current logical timestamp
  2. Performing a checkpoint - to provide the SQN for persistDurably
Code Block
languagejava
/** Scoped to a single entity (e.g. a SubTask or OperatorCoordinator). */
interface StateChangelogClient {
    StateChangelogWriter createWriter(OperatorID operatorID, KeyGroupRange keyGroupRange);
}

/** Scoped to a single writer (e.g. state backend). */
interface StateChangelogWriter {

  void append(int keyGroup, byte[] value);

  CompletableFuture<StateChangelogHandle> persistDurably(SQN after);

  void truncate(SQN before); // called upon checkpoint confirmation by JM

  /** Close this log. No new appends will be possible. Any appended but not persisted records will be lost. */
  void close();

  SQN lastAppendedSqn();
}

interface StateChangelogHandle extends KeyedStateHandle {

  /** Enumerate changes per key group. */
  CloseableIterable<byte[]> getStateChanges(int keyGroup);
}
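The call sequence implied by this API (append during processing, read the last SQN when materializing, persist on checkpoint, truncate on confirmation) can be illustrated with a minimal in-memory sketch. SQN is simplified to a plain long and persistence to copying a list, so this is not the actual implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Minimal in-memory sketch of the StateChangelogWriter call sequence (SQN as a plain long). */
public class InMemoryChangelog {
    private final NavigableMap<Long, byte[]> changes = new TreeMap<>();
    private long nextSqn = 0;

    public void append(int keyGroup, byte[] value) { changes.put(nextSqn++, value); }

    public long lastAppendedSqn() { return nextSqn - 1; }

    /** Returns all changes strictly after the given SQN, i.e. those not yet materialized. */
    public List<byte[]> persistDurably(long after) {
        return new ArrayList<>(changes.tailMap(after, false).values());
    }

    /** Drops changes up to and including the given SQN once JM confirms the checkpoint. */
    public void truncate(long before) { changes.headMap(before, true).clear(); }

    public static void main(String[] args) {
        InMemoryChangelog log = new InMemoryChangelog();
        log.append(0, new byte[]{1});
        long materialized = log.lastAppendedSqn();  // state materialized up to here
        log.append(0, new byte[]{2});               // change made after materialization
        // Checkpoint: only changes after the materialized SQN need to be persisted.
        System.out.println(log.persistDurably(materialized).size()); // 1
        log.truncate(materialized);                 // JM confirmed: older changes can go
    }
}
```

Note that truncate only runs after confirmation, which matches the implementation note below: changes before a requested SQN cannot be dropped eagerly because the JM may still decline the checkpoint.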

Implementation notes

  • Because many backends may request to persist changes durably at different times (even for the same checkpoint), StateChangelog waits for some time after the first request and only then batches the changes and sends them to DFS. Besides that, it sends a request as soon as it gets persist requests from all the backends registered with it. Additionally, size threshold may be used.
  • Upon receiving a request (from StateBackend) to persist durably starting from some SQN, changes before that SQN can NOT be dropped - JM can still decline the checkpoint
  • Request hedging can be implemented in StateChangelog, in the Flink FS layer, in a custom per-provider FS layer, or via per-provider configuration. Implementing it in StateChangelog gives a good trade-off between efficiency, portability, and effort. It also doesn’t affect other parts of the system.
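The first implementation note describes three upload triggers: a timeout after the first persist request, all registered backends having requested persistence, and an optional size threshold. A sketch of that trigger check is below; the class name, field names, and thresholds are made up for illustration:

```java
/** Illustrative trigger deciding when batched persist requests should be uploaded to DFS. */
public class UploadTrigger {
    private final int registeredBackends;
    private final long maxWaitMs;      // wait at most this long after the first persist request
    private final long sizeThreshold;  // or flush early once this many bytes have accumulated

    private int requestsReceived = 0;
    private long bytesAccumulated = 0;
    private long firstRequestAt = -1;

    public UploadTrigger(int registeredBackends, long maxWaitMs, long sizeThreshold) {
        this.registeredBackends = registeredBackends;
        this.maxWaitMs = maxWaitMs;
        this.sizeThreshold = sizeThreshold;
    }

    public void onPersistRequest(long bytes, long nowMs) {
        if (firstRequestAt < 0) firstRequestAt = nowMs;
        requestsReceived++;
        bytesAccumulated += bytes;
    }

    /** Upload as soon as any of the three conditions holds. */
    public boolean shouldUpload(long nowMs) {
        return requestsReceived > 0
                && (requestsReceived >= registeredBackends      // all backends have asked
                    || bytesAccumulated >= sizeThreshold        // enough data accumulated
                    || nowMs - firstRequestAt >= maxWaitMs);    // waited long enough
    }

    public static void main(String[] args) {
        UploadTrigger t = new UploadTrigger(3, 100, 5_000_000);
        t.onPersistRequest(1_000, 0);
        System.out.println(t.shouldUpload(50));   // false: 1 of 3 backends, little data, 50ms
        t.onPersistRequest(1_000, 60);
        t.onPersistRequest(1_000, 70);
        System.out.println(t.shouldUpload(70));   // true: all 3 backends have requested
    }
}
```

Waiting briefly after the first request lets one DFS write carry the changes of several backends for the same checkpoint, which is the whole point of the batching.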

File contents layout

This is an example layout inside a single file:

Code Block
languagetext
+----------------------------------------------------------------------------------------------+
| File: ABCD.bin (all subtasks of TM1 for checkpoint 2)                                        |
| +------------------------------------------------------+   +-------------------------------+ |
| | Log 0 ("group-by" operator)                          |   | Log 1 ("aggregate" operator)  | |
| | +--------------------------------------------------+ |   | +---------------------------+ | |
| | | +---------------------+  +---------------------+ | |   | | +---------------------+   | | |
| | | | KeyGroup 0          |  | KeyGroup 1          | | |   | | | KeyGroup 0          |   | | |
| | | | +-----------------+ |  | +-----------------+ | | |   | | | +-----------------+ |   | | |
| | | | | Changeset 1     | |  | | Changeset 1     | | | |   | | | | Changeset 1     | |   | | |
| | | | | window-contents | |  | | window-contents | | | |   | | | |                 | |   | | |
| | | | | timers          | |  | | timers          | | | |   | | | |                 | |   | | |
| | | | +-----------------+ |  | +-----------------+ | | |   | | | +-----------------+ |   | | |
| | | | +-----------------+ |  | +-----------------+ | | |   | | | +-----------------+ |   | | |
| | | | | Changeset 2     | |  | | Changeset 2     | | | |   | | | | Changeset 2     | |   | | |
| | | | | window-contents | |  | | window-contents | | | |   | | | |                 | |   | | |
| | | | | timers          | |  | | timers          | | | |   | | | |                 | |   | | |
| | | | +-----------------+ |  | +-----------------+ | | |   | | | +-----------------+ |   | | |
| | | | +-----------------+ |  | +-----------------+ | | |   | | | +-----------------+ |   | | |
| | | | | Changeset 3     | |  | | Changeset 3     | | | |   | | | | Changeset 3     | |   | | |
| | | | | window-contents | |  | | window-contents | | | |   | | | |                 | |   | | |
| | | | | timers          | |  | | timers          | | | |   | | | |                 | |   | | |
| | | | +-----------------+ |  | +-----------------+ | | |   | | | +-----------------+ |   | | |
| | | +---------------------+  +---------------------+ | |   | | +---------------------+   | | |
| | +--------------------------------------------------+ |   | +---------------------------+ | |
| +------------------------------------------------------+   +-------------------------------+ |
+----------------------------------------------------------------------------------------------+
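One way to make such a file addressable when restoring is to record, per log and key group, the byte ranges of its change sets. The index structure below is only an illustration of that idea, not the actual on-disk format:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative index over a changelog file: (logId, keyGroup) -> change-set byte ranges. */
public class ChangelogFileIndex {

    /** Offset and length of one change set inside the file. */
    public static final class Range {
        public final long offset, length;
        public Range(long offset, long length) { this.offset = offset; this.length = length; }
    }

    private final Map<String, List<Range>> index = new HashMap<>();

    private static String key(int logId, int keyGroup) { return logId + "/" + keyGroup; }

    public void add(int logId, int keyGroup, long offset, long length) {
        index.computeIfAbsent(key(logId, keyGroup), k -> new ArrayList<>())
             .add(new Range(offset, length));
    }

    /** Ranges to read for one key group of one log, e.g. to serve getStateChanges(keyGroup). */
    public List<Range> rangesFor(int logId, int keyGroup) {
        return index.getOrDefault(key(logId, keyGroup), List.of());
    }

    public static void main(String[] args) {
        ChangelogFileIndex idx = new ChangelogFileIndex();
        idx.add(0, 0, 0, 128);    // Log 0, KeyGroup 0, Changeset 1
        idx.add(0, 0, 128, 64);   // Log 0, KeyGroup 0, Changeset 2
        idx.add(1, 0, 192, 32);   // Log 1, KeyGroup 0, Changeset 1
        System.out.println(idx.rangesFor(0, 0).size()); // 2
    }
}
```

With such an index, a handle for one key group reads only its own ranges instead of scanning the whole file, which matters during rescaling when a subtask needs a subset of key groups.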

...

Existing ITCases with the feature enabled.


Future directions

The chosen approach makes it possible to later implement a distributed StateChangelog, which may look like this:

(Figure: distributed StateChangelog topology)

A number of questions arise: which node should data be sent to? Who decides to rescale the “active” set of nodes, and how? How is this decision communicated?

One possible approach is to organize distributed StateChangelog nodes into a hierarchy/DAG: each node proxies requests up to the root unless it has accumulated enough data to send straight away to DFS:

(Figure: hierarchical topology of StateChangelog nodes)

It has the following advantages:

  • No need for load-balancing or rescaling
  • No centralized coordination
  • Making a decision is relatively easy (no additional metrics needed)
  • Decisions are dynamic
  • The topology is static (which helps in case of a potential single-task failover)
  • Each node needs only a single connection to send the data

The disadvantages are:

  • Additional round-trips - can be reduced by choosing a higher branching factor
  • Unnecessary traffic through the intermediate nodes - can be avoided by making a decision upfront for the next N checkpoints on each node and communicating it downstream. In the figure above, TM0’s address would be propagated to TM2 and TM7; TM1 would send its own address to its descendants.

Rejected Alternatives

StateChangelog implementations

Besides DFS-based, some other options were considered. These are intentionally very rough estimates of the different solutions:

                                          | Kafka-based unmanaged          | Bookkeeper-based unmanaged | Bookkeeper managed | DFS-based                                | Custom persistence
1. Latency                                | Good                           | Good                       | Good               | Bad (.1 - 1s)                            | Good
2. Scalability                            | Good                           | Unknown                    | Unknown            | Medium                                   | Good
3. Ease of deployment                     | Good (many users already have) | Bad                        | Good               | Good                                     | Good
4. Stateless or no new stateful component | Yes (existing)                 | No                         | No                 | Yes (stateless)                          | No
5. Development time                       | Good                           | Medium                     | Bad                | Good                                     | Very bad
6. Other issues                           | Can truncate changes           |                            |                    | High variability (workload, provider, …) |

With the highest priorities for the initial version being (3), (4), and (5), DFS was chosen.


Distributed StateChangelog

Distributed StateChangelog was rejected in the first version as probably unnecessary (see Future directions for the possible design).
