...
- All changes from a single StateBackend for a single checkpoint
- All backends in a single subtask (OperatorChain)
- All subtasks in a single TM (the same job)
- Multiple TMs (a local StateBackend proxies requests to a StateBackend on some remote TM); in the future we may decide to batch requests from multiple TMs this way (see Future directions)
To satisfy the requirements discussed in the previous sections:
- Each StateBackend instance is associated with a unique LogID upon startup to isolate instances from one another (see Performing a checkpoint)
- StateChangelog maintains a mapping between logical timestamps and state changes to be able to include only changes after the materialization (see Performing a checkpoint)
- keygroup is passed and written along with the changes (see Rescaling)
- The StateHandle key used in SharedStateRegistry should be based only on the file name, not on the backend UUID or keygroup (multiple handles can refer to the same file, and making keys unique would effectively disable sharing)
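To illustrate the last point, here is a toy model of reference counting keyed by file name (hypothetical names; not the actual SharedStateRegistry API): handles from different backends or key groups that point at the same file all contribute to a single count.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of sharing state files via file-name-based keys (illustrative only). */
public class FileRefCounts {
    private final Map<String, Integer> refCounts = new HashMap<>();

    /** Register a handle; keying by file name lets all referring handles share one count. */
    public int register(String fileName) {
        return refCounts.merge(fileName, 1, Integer::sum);
    }

    /** Unregister a handle; the file may be deleted only when the count drops to zero. */
    public int unregister(String fileName) {
        return refCounts.merge(fileName, -1, Integer::sum);
    }
}
```

If the key also contained the backend UUID or keygroup, the two registrations below would land under distinct keys and the file could never be shared.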
The chosen direction allows implementing a distributed StateChangelog, which may look like this:
However, a number of questions arise: which node should the data be sent to? Who decides to rescale the “active” set of nodes, and how? How is this decision communicated? And so on.
One way to address them is a hierarchical topology (see the Rejected Alternatives section).
However, because it is still quite complex and “local grouping” will probably suffice, we decided to start with a non-distributed StateChangelog in the first version.
DFS write latency
From the experiments, the latency of writing 2.5Gb to S3 (500 streams of 5Mb objects) is:
| p50 | p90 | p95 | p99 | p999 |
|---|---|---|---|---|
| 459ms | 740ms | 833ms | 1039ms | 3202ms |
If everything below one second is considered acceptable at this scale, then tail latencies become the major problem. Below are some techniques to reduce them.
Request hedging
The idea is to aggressively retry a small portion of requests that take much longer than others (see https://www2.cs.duke.edu/courses/cps296.4/fall13/838-CloudPapers/dean_longtail.pdf).
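A minimal sketch of the technique (hypothetical names; error handling and latency tracking are omitted): if the primary request has not finished within a threshold (for example the observed p95 latency), a duplicate request is fired and whichever finishes first wins.

```java
import java.util.concurrent.*;
import java.util.function.Supplier;

/** Sketch of request hedging; illustrative only, not the Flink implementation. */
public class HedgingSketch {
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newScheduledThreadPool(2, r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            });

    static <T> CompletableFuture<T> hedge(Supplier<T> request, long hedgeAfterMillis) {
        CompletableFuture<T> result = new CompletableFuture<>();
        CompletableFuture.supplyAsync(request)
                .thenAccept(result::complete); // primary attempt
        SCHEDULER.schedule(() -> {
            if (!result.isDone()) { // primary is straggling: send a backup request
                CompletableFuture.supplyAsync(request).thenAccept(result::complete);
            }
        }, hedgeAfterMillis, TimeUnit.MILLISECONDS);
        return result;
    }

    public static void main(String[] args) {
        // Simulated upload; in reality this would be a DFS/S3 write.
        System.out.println(hedge(() -> "uploaded", 50).join());
    }
}
```

Only a small fraction of requests (those past the threshold) are duplicated, so the extra load stays low while the tail shrinks.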
With a simple implementation, p999 latency decreases by 47% (with smaller requests the improvement is bigger):
| p50 | p90 | p95 | p99 | p999 |
|---|---|---|---|---|
| 495ms | 733ms | 807ms | 937ms | 1689ms |
Assuming 500 writers, these request latencies translate to checkpoint latencies of p50=1000ms and p99=1700ms. For a 0.5Gb state increment, checkpoint latencies would be p75=500ms and p99=1125ms (assuming 250 writers).
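The arithmetic behind these numbers can be checked with a back-of-the-envelope calculation (assuming independent request latencies): a checkpoint finishes only when the slowest of N parallel writers finishes, so P(checkpoint ≤ t) = P(request ≤ t)^N, and hitting a checkpoint quantile q requires the request quantile q^(1/N).

```java
/** Which request-latency quantile must hold for a whole checkpoint quantile to hold. */
public class CheckpointLatencyMath {
    static double requestQuantile(double checkpointQuantile, int writers) {
        // Solve p^writers = checkpointQuantile for the per-request quantile p.
        return Math.pow(checkpointQuantile, 1.0 / writers);
    }

    public static void main(String[] args) {
        // Checkpoint p50 with 500 writers needs roughly the request p99.9 (~1s above);
        // checkpoint p99 needs a quantile beyond the request p999 (~1.7s above).
        System.out.println(requestQuantile(0.50, 500)); // ~0.99861
        System.out.println(requestQuantile(0.99, 500)); // ~0.99998
    }
}
```

This is why tail latencies, not medians, dominate checkpoint duration at high parallelism.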
Other techniques
Additionally, these techniques can be used:
- Adjusting aggregation: change target request size (or total request count); this is a tradeoff between median and tail latencies. Implies distributed implementation; automatic adjustment can be unstable.
- Multipart upload (for S3, files bigger than 5Mb). Can be useful only with aggressive aggregation; otherwise, requests are likely to be smaller than 5Mb.
- Use multiple buckets (in S3, throttling is applied at the bucket level until re-balance). Probably makes sense only for very large setups.
These are already implemented or can be configured:
- Unique prefixes (“folders”)
- DNS load balancing for S3
API
Note: this is an internal API and may change in the future.
To enable StateChangelog to flush intermediate data, StateBackend should be able to append changes as they happen and only request that they be persisted durably on a checkpoint. Therefore, StateChangelog must be aware of the backend’s logical timestamp (SQN in code).
Furthermore, to avoid too fine-grained SQN-to-change mappings and to enable more efficient batching, the SQN should be generated by StateChangelog and exposed to the backend via the getLastAppendedSqn() method. There are two usages of it:
- Materializing state changes - Remember the current logical timestamp
- Performing a checkpoint (to provide SQN for persistDurably)
/** Scoped to a single entity (e.g. a SubTask or OperatorCoordinator). */
interface StateChangelogClient {
    StateChangelogWriter createWriter(OperatorID operatorID, KeyGroupRange keyGroupRange);
}

/** Scoped to a single writer (e.g. state backend). */
interface StateChangelogWriter {
    void append(int keyGroup, byte[] value);

    CompletableFuture<StateChangelogHandle> persistDurably(SQN after);

    void truncate(SQN before); // called upon checkpoint confirmation by JM

    /** Close this log. No new appends will be possible. Any appended but not persisted records will be lost. */
    void close();

    SQN lastAppendedSqn();
}

interface StateChangelogHandle extends KeyedStateHandle {
    /** Enumerate changes per key group. */
    CloseableIterable<byte[]> getStateChanges(int keyGroup);
}
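To make the SQN semantics concrete, here is a toy in-memory model of a writer (illustrative only, not the Flink API; SQNs are plain longs and byte[] stands for a serialized state change): append() buffers changes under increasing SQNs, persistDurably(after) captures everything appended after the given SQN, and truncate(before) drops confirmed changes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

/** Toy in-memory model of the StateChangelogWriter SQN contract (illustrative only). */
public class InMemoryChangelogWriter {
    private final TreeMap<Long, byte[]> changes = new TreeMap<>(); // SQN -> change

    private long nextSqn = 0;

    public long append(byte[] value) {
        changes.put(nextSqn, value);
        return nextSqn++;
    }

    public long lastAppendedSqn() {
        return nextSqn - 1;
    }

    /** Capture changes with SQN strictly greater than 'after' (e.g. after materialization). */
    public CompletableFuture<List<byte[]>> persistDurably(long after) {
        return CompletableFuture.completedFuture(
                new ArrayList<>(changes.tailMap(after, false).values()));
    }

    /** Called upon checkpoint confirmation by the JM: earlier changes are no longer needed. */
    public void truncate(long before) {
        changes.headMap(before, false).clear();
    }
}
```

A backend would remember lastAppendedSqn() when materializing state, then pass that SQN to persistDurably() on the next checkpoint so only the delta is uploaded.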
Implementation notes
- Because many backends may request to persist changes durably at different times (even for the same checkpoint), StateChangelog waits for some time after the first request and only then batches the changes and sends them to DFS. Besides that, it sends a request as soon as it has received persist requests from all the backends registered with it. Additionally, a size threshold may be used.
- Upon receiving a request (from StateBackend) to persist durably starting from some SQN, changes before that SQN can NOT be dropped - the JM can still decline the checkpoint.
- Request hedging can be implemented in StateChangelog, in the Flink FS layer, in a custom per-provider FS layer, or via per-provider configuration. Implementing it in StateChangelog gives a good trade-off between efficiency, portability, and effort. It also doesn’t affect other parts of the system.
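The batching trigger from the first note might be sketched as follows (hypothetical names and thresholds; the delay timer started by the first persist request is left out): the batch is flushed immediately once every registered writer has asked to persist, or once a size threshold is exceeded.

```java
/** Sketch of the flush-decision policy for batching persist requests (illustrative only). */
public class BatchPolicy {
    private final int registeredWriters;
    private final long sizeThresholdBytes;
    private int pendingPersistRequests;
    private long batchedBytes;

    public BatchPolicy(int registeredWriters, long sizeThresholdBytes) {
        this.registeredWriters = registeredWriters;
        this.sizeThresholdBytes = sizeThresholdBytes;
    }

    /** Track the size of appended changes. */
    public void onAppend(long bytes) {
        batchedBytes += bytes;
    }

    /** Returns true if the batch should be sent to DFS now; false means keep
     *  batching until the delay after the first persist request elapses. */
    public boolean onPersistRequest() {
        pendingPersistRequests++;
        return pendingPersistRequests >= registeredWriters
                || batchedBytes >= sizeThresholdBytes;
    }
}
```

The delay bounds how long an early requester waits for stragglers, while the all-writers and size conditions let the batch go out early when waiting longer cannot help.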
File contents layout
This is an example layout inside a single file:
File: ABCD.bin (all subtasks of TM1 for checkpoint 2)
├── Log 0 ("group-by" operator)
│   ├── KeyGroup 0
│   │   ├── Changeset 1 (window-contents, timers)
│   │   ├── Changeset 2 (window-contents, timers)
│   │   └── Changeset 3 (window-contents, timers)
│   └── KeyGroup 1
│       ├── Changeset 1 (window-contents, timers)
│       ├── Changeset 2 (window-contents, timers)
│       └── Changeset 3 (window-contents, timers)
└── Log 1 ("aggregate" operator)
    └── KeyGroup 0
        ├── Changeset 1
        ├── Changeset 2
        └── Changeset 3
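One possible (hypothetical) framing for such a file writes each change as a (keyGroup, length, payload) record; that is already enough for a getStateChanges(keyGroup)-style reader to filter one key group on restore or rescaling:

```java
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical per-change record framing for a changelog file (illustrative only). */
public class ChangelogFraming {
    /** Serialize (keyGroup, change) pairs into a single byte stream. */
    static byte[] write(List<Map.Entry<Integer, byte[]>> changes) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            for (Map.Entry<Integer, byte[]> e : changes) {
                out.writeInt(e.getKey());          // key group
                out.writeInt(e.getValue().length); // change size
                out.write(e.getValue());           // serialized state change
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Read back only the changes belonging to one key group. */
    static List<byte[]> readKeyGroup(byte[] file, int keyGroup) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(file));
            List<byte[]> result = new ArrayList<>();
            while (in.available() > 0) {
                int kg = in.readInt();
                byte[] payload = new byte[in.readInt()];
                in.readFully(payload);
                if (kg == keyGroup) {
                    result.add(payload);
                }
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A real layout would additionally group records by log and key group with an offset index, as in the figure, so a reader can seek instead of scanning.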
...
Existing ITCases with the feature enabled.
Future directions
The chosen approach allows implementing a distributed StateChangelog, which may look like this:
A number of questions arise: which node should the data be sent to? Who decides to rescale the “active” set of nodes, and how? How is this decision communicated?
One possible approach is to organize distributed StateChangelog nodes into a hierarchy/DAG, as described under Distributed StateChangelog in the Rejected Alternatives section.
Rejected Alternatives
StateChangelog implementations
Besides DFS-based, some other options were considered. These are intentionally very rough estimates of different solutions:
| | Kafka-based unmanaged | Bookkeeper-based unmanaged | Bookkeeper managed | DFS-based | Custom persistence |
|---|---|---|---|---|---|
| 1. Latency | Good | Good | Good | Bad (.1 - 1s) | Good |
| 2. Scalability | Good | Unknown | Unknown | Medium | Good |
| 3. Ease of deployment | Good (many users already have) | Bad | Good | Good | Good |
| 4. Stateless or no new stateful component | Yes (existing) | No | No | Yes (stateless) | No |
| 5. Development time | Good | Medium | Bad | Good | Very bad |
| 6. Other issues | Can truncate changes | | | | High variability (workload, provider, …) |
With the highest priorities for the initial version being (3), (4), (5) DFS was chosen.
Distributed StateChangelog
Distributed StateChangelog was rejected in the first version as probably unnecessary.
One possible approach is to organize distributed StateChangelog nodes into a hierarchy/DAG: each node proxies requests up to the root unless it has accumulated enough data to send straight away to DFS:
It has the following advantages:
- No need for load-balancing or rescaling
- No centralized coordination
- Making a decision is relatively easy (no additional metrics needed)
- Decisions are dynamic
- The topology is static (which helps in case of a potential single-task failover)
- Each node needs only a single connection to send the data
The disadvantages are:
- Additional round-trips - can be reduced by choosing a higher branching factor
- Unnecessary traffic through the intermediate nodes - can be avoided by making a decision upfront for the next N checkpoints on each node and communicating it downstream. In the figure above, TM0’s address would be propagated to TM2 and TM7; TM1 would send its own address to its descendants.
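The round-trips/branching-factor trade-off can be seen with a quick calculation (illustrative only; no such code exists in Flink): the depth of a k-ary proxy tree over n TMs bounds the extra hops, and a higher branching factor k keeps the tree shallow.

```java
/** Depth of a k-ary proxy tree over n nodes, bounding the extra round-trips per request. */
public class ProxyTreeDepth {
    static int depth(int nodes, int branchingFactor) {
        int depth = 0;
        long reachable = 1; // nodes covered by a tree of the current depth
        while (reachable < nodes) {
            reachable *= branchingFactor;
            depth++;
        }
        return depth;
    }

    public static void main(String[] args) {
        System.out.println(depth(500, 2));  // 9 extra hops
        System.out.println(depth(500, 16)); // 3 extra hops
    }
}
```

The price of a larger branching factor is more data funneled through each intermediate node, which is exactly the second disadvantage above.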