Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Figure 1: Checkpoint barriers overtake in-flight records. Colored buffers need to be persisted together with the operator state.

Preliminary work

We first evaluated with a POC when to persist. We had two options:

  • Persist in an adhoc fashion on checkpoint. While keeping the data volume to a minimum this approach could potentially overload external systems during checkpointing.
  • Persist continuously to avoid such overload on the cost of storing much more data and potentially decreasing throughput.

In the following experiments, we had a simple topology (source → map → map → sleepy map → measure map), where each channel was a random shuffle. The sleepy map slept on average 0, 0.01, and 0.1 ms per record to induce backpressure. We compared a POC for adhoc spilling, a POC for continuous spilling, and the base commit in the master (1.11-SNAPSHOT). For each approach and sleep time, we measured 3 times and took the median while persisting on HDFS in an EMR cluster with 1 master and 5 slaves, each having 32 cores and 256 GB RAM, with a total parallelism of 160.


Image AddedImage AddedImage Added


We can see that for the baseline the checkpoint duration somewhat correlates with the end-to-end latency as expected. For both POCs, the checkpoint duration, however, remained stable and even decreases for higher backpressure. Surprisingly, the checkpointing times for continuous spilling are higher than adhoc for lower sleep times. We suspect that we have additional backpressure for continuously write large amount of data. With increasing backpressure and thus decreasing data volume, continuous spilling reaches sub-seconds checkpointing times.

Nevertheless, continuous spilling seems to have rather big impact of overall throughput. While adhoc POC showed 10% performance decrease, continuous POC is clearly bottlenecked for higher volume. Primarily for that reason, we decided to go with the adhoc approach in this FLIP. While sub-seconds checkpointing times of continuous spilling surely would be a nice asset, the primary goal of decoupling checkpointing times from backpressure is also reached with adhoc spilling.


Proposed Changes

In this FLIP, we

  • Change the handling of checkpoint barriers, to allow them to overtake other records,
  • Enhance the checkpoint format to persist the inflight data
  • Recover from the new checkpoint while also allowing new checkpoints to be taken during recovering to guarantee progress, and
  • Propose a solution of how the operators can be rescaled in the future

...

Note, that during downscaling the size limit of “max inflight data” might be temporarily exceeded during recovery (see recovery section).

Persistence

For persistence, existing mechanisms will be reused. The benefits are:

Including channel state into snapshot:

  1. Don’t duplicate re-distribution code and avoid inconsistencies between channel and operator state distribution (especially keyed)
  2. Simplicity: having state not split into channel and operator parts
  3. Being able to use existing snapshotting mechanism (below)

Reading/writing using the existing mechanisms (versus custom storage):

  1. Don’t duplicate code for provider-agnostic storage, configuration, local recovery
  2. Ease deployment by having single storage for operator and channel state
  3. Avoid inconsistencies between operator and channel state (e.g. different retention times)
  4. Possibility to reuse incremental checkpoints; other alternatives avoid doubly-storing buffers differently, but probably at the cost of more complex cleanup

Disadvantages

  1. Less flexibility
  2. Risk of break snapshotting
  3. Increased checkpoints size

Components

  1. Checkpoint Metadata
    1. Channel StateHandle extends StreamStateHandle, contains subtask index and information to place it to the correct subpartition/inputchannel
    2. TaskStateSnapshot, PendingCheckpoint, and CompletedCheckpoint - contains a collection of ChannelStateHandle
  2. Writer. Writes network buffers using a provided CheckpointStreamFactory and returns state handles
  3. Reader. Reads data from state handle and returns buffers (loads and deserializes)
  4. (de)Serializer
  5. StateAssignmentOperation (existing class) - reads snapshot metadata and distributes state handles across subtasks; need to add channel states distribution

TaskStateManagerImpl (existing class) - holds state assigned to task

Key decisions

  1. snapshot channel state when a checkpoint arrives (as opposed to continuously writing all output buffers): POC showed it's more efficient and has lower latency
  2. use the existing checkpointing mechanism (see Persistence)
  3. restore upstream/downstream parts of a channel in the corresponding tasks (as opposed to restoring the whole channel in the downstream task): less coupling and easier to implement if use the existing checkpointing mechanism

...

The first, state-based storage will not satisfy all requirements. Nevertheless, assuming that the in-flight data is rather small, we should still quickly come to the point where new output is produced.

Persistence

For persistence, existing mechanisms will be reused. The benefits are:

Including channel state into snapshot:

  1. Don’t duplicate re-distribution code and avoid inconsistencies between channel and operator state distribution (especially keyed)
  2. Simplicity: having state not split into channel and operator parts
  3. Being able to use existing snapshotting mechanism (below)

Reading/writing using the existing mechanisms (versus custom storage):

...

.

...

Disadvantages

  1. Less flexibility
  2. Risk of break snapshotting
  3. Increased checkpoints size

Components

  1. Checkpoint Metadata
    1. Channel StateHandle extends StreamStateHandle, contains subtask index and information to place it to the correct subpartition/inputchannel
    2. TaskStateSnapshot, PendingCheckpoint, and CompletedCheckpoint - contains a collection of ChannelStateHandle
  2. Writer. Writes network buffers using a provided CheckpointStreamFactory and returns state handles
  3. Reader. Reads data from state handle and returns buffers (loads and deserializes)
  4. (de)Serializer
  5. StateAssignmentOperation (existing class) - reads snapshot metadata and distributes state handles across subtasks; need to add channel states distribution

TaskStateManagerImpl (existing class) - holds state assigned to task


Efficiency

The following optimizations won't necessarily be implemented in MVP:

...