
...


...


Discussion thread

...

...

...

...

JIRA: FLINK-14551

...

Release: 1.11


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

In the following experiments, we used a simple topology (source → map → map → sleepy map → measure map), where each channel was a random shuffle. The sleepy map slept on average 0, 0.01, and 0.1 ms per record to induce backpressure. We compared a POC for adhoc spilling, a POC for continuous spilling, and the base commit on master (1.11-SNAPSHOT). For each approach and sleep time, we ran the measurement 3 times and took the median. Checkpoints were persisted on HDFS in an EMR cluster with 1 master and 5 slaves, each with 32 cores and 256 GB RAM, at a total parallelism of 160.
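To make the setup easier to picture, here is a minimal sketch of how a sleepy map with a sub-millisecond average delay could be built, together with the median over repeated runs. It is not the code used in the experiments: the class name SleepyMapSketch, the injected sleeper callback, and the idea of accumulating the owed delay and paying it off in whole milliseconds are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.function.LongConsumer;

/** Hypothetical helper, not Flink code: delays records by a target average time. */
final class SleepyMapSketch {
    private static final long NANOS_PER_MILLI = 1_000_000L;

    private final long avgSleepNanos;    // target average delay per record
    private final LongConsumer sleeper;  // receives the number of milliseconds to sleep
    private long owedNanos;              // delay accumulated but not yet slept

    SleepyMapSketch(long avgSleepNanos, LongConsumer sleeper) {
        this.avgSleepNanos = avgSleepNanos;
        this.sleeper = sleeper;
    }

    /** Identity map that pays off the accumulated delay in whole milliseconds. */
    <T> T map(T record) {
        owedNanos += avgSleepNanos;
        if (owedNanos >= NANOS_PER_MILLI) {
            long millis = owedNanos / NANOS_PER_MILLI;
            owedNanos -= millis * NANOS_PER_MILLI;
            sleeper.accept(millis);
        }
        return record;
    }

    /** Median of the repeated runs of one configuration (three runs above). */
    static double median(double... runs) {
        double[] sorted = runs.clone();
        Arrays.sort(sorted);
        int mid = sorted.length / 2;
        return sorted.length % 2 == 1
                ? sorted[mid]
                : (sorted[mid - 1] + sorted[mid]) / 2.0;
    }
}
```

With an average of 0.01 ms (10,000 ns), this sketch sleeps 1 ms after every 100 records. In a real job the sleeper would wrap Thread.sleep; injecting it keeps the delay logic checkable without waiting.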





We can see that for the baseline, the checkpoint duration strongly increases with sleep time and thus backpressure. For both POCs, however, the checkpoint duration remains stable and even decreases for higher backpressure because of the lower overall data volume. Surprisingly, the checkpointing times for continuous spilling are higher than for adhoc spilling at lower sleep times. We suspect that continuously writing large amounts of data causes additional backpressure. With increasing backpressure and thus decreasing data volume, continuous spilling reaches sub-second checkpointing times.


Nevertheless, continuous spilling seems to have a rather big impact on overall throughput. While the adhoc POC showed a 10% performance decrease, the continuous POC is clearly bottlenecked at higher data volumes. Primarily for that reason, we decided to go with the adhoc approach in this FLIP. While the sub-second checkpointing times of continuous spilling would surely be a nice asset, the primary goal of decoupling checkpointing times from backpressure is also reached with adhoc spilling.

...

Compatibility, Deprecation, and Migration Plan

The goal of this FLIP is to provide a minimal viable product (MVP) that helps users with high and frequent backpressure. Unaligned checkpoints will initially be an optional feature. After collecting experience and implementing all necessary extensions, unaligned checkpoints will probably be enabled by default for exactly-once.

For compatibility, the documentation should clearly state that users of the checkpoint API can only use unaligned checkpoints if they do not require a consistent state across all operators.

In the following release, we aim to make unaligned checkpoints the default behavior and add the following enhancements:

...

...

Known Limitations

  • State size increase
    • Up to a couple of GB per task (especially painful on IO bound clusters)
    • Depends on the actual policy used (UNALIGNED_WITH_MAX_INFLIGHT_DATA is probably the more plausible default)
  • Longer and heavier recovery depending on the increased state size
    • Can potentially trigger a death spiral after a first failure
    • More up-to-date checkpoints will most likely still be an improvement over the current checkpoint behavior during backpressure
  • For the MVP
    • no re-scaling (need to use savepoint)
    • max in-flight checkpoints = 1

Test Plan

  • Correctness tests with induced failures
  • Compare checkpoint times under backpressure with current state
  • Compare throughput under backpressure with current state
  • Compare progress under backpressure with frequently induced failures
  • Compare throughput with no checkpointing

Roadmap

There are two primary goals. First, we want to provide a minimal viable product (MVP) that breaks up the vicious cycle of overload and instabilities of a cluster with slow checkpoints and more accumulated load resulting from recoveries from outdated checkpoints. Second, we aim for the full FLIP implementation that will among other things allow rescaling from unaligned checkpoints to directly counter overload.

In particular, the full release will add the following improvements over the MVP:

  • Re-scaling on unaligned checkpoints (need to use savepoint in MVP)
  • Incremental checkpointing to not write the same buffer multiple times (when the job is backpressured)
  • Advanced triggers for unaligned checkpoints such as timeouts on alignment or meeting the maximum threshold of checkpoint sizes
  • Support for concurrent checkpoints
  • Incremental loading and processing of state
  • No additional memory to load channel state: ideally, existing network buffers should be reused
  • Reduced number of files: single file could be reused for several checkpoints
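The advanced triggers listed above could, for example, combine an alignment timeout with a cap on the in-flight data that an unaligned checkpoint would have to persist. The following is only a sketch under stated assumptions: the class UnalignedTriggerSketch, its Mode enum, both thresholds, and the rule of staying aligned while the in-flight data exceeds the cap are hypothetical and not part of Flink or of this FLIP's specification.

```java
/** Hypothetical trigger sketch; names and thresholds are illustrative only. */
final class UnalignedTriggerSketch {
    enum Mode { ALIGNED, UNALIGNED }

    private final long alignmentTimeoutMillis;  // how long to wait for barrier alignment
    private final long maxInflightBytes;        // cap on in-flight data to persist

    UnalignedTriggerSketch(long alignmentTimeoutMillis, long maxInflightBytes) {
        this.alignmentTimeoutMillis = alignmentTimeoutMillis;
        this.maxInflightBytes = maxInflightBytes;
    }

    /**
     * Decides how the current checkpoint proceeds.
     * Assumption: alignment continues while persisting the in-flight data
     * would exceed the cap; otherwise the checkpoint switches to unaligned
     * once the alignment timeout has elapsed.
     */
    Mode decide(long alignmentElapsedMillis, long inflightBytes) {
        if (inflightBytes > maxInflightBytes) {
            return Mode.ALIGNED;
        }
        if (alignmentElapsedMillis >= alignmentTimeoutMillis) {
            return Mode.UNALIGNED;
        }
        return Mode.ALIGNED;
    }
}
```

For instance, with a timeout of 1000 ms and a cap of 1 MiB, a checkpoint that has been aligning for 1000 ms with 100 bytes in flight would switch to unaligned, while one with 2 MiB in flight would keep aligning.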

We aim to make unaligned checkpoints the default behavior after the full implementation.