Status

Current state: Under Discussion

Discussion thread: https://mail-archives.apache.org/mod_mbox/flink-dev/201909.mbox/browser

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, triggering a checkpoint will cause a checkpoint barrier to flow from the sources through the DAG towards the sinks. This checkpoint barrier guarantees a consistent snapshot of the DAG at a given point in time.

For each operator with multiple input channels, an alignment phase starts as soon as the checkpoint barrier is received on one channel: further input from that channel is blocked until the checkpoint barriers from the other channels have been received.

This approach works fairly well for modest utilization but exhibits two main issues on a back-pressured pipeline:

  • Since the checkpoint barrier flows much slower through the back-pressured channels, the other channels and their upstream operators are effectively blocked during checkpointing.
  • The checkpoint barrier takes a long time to reach the sinks causing long checkpointing times. A longer checkpointing time in turn means that the checkpoint will be fairly outdated once done. Since a heavily utilized pipeline is inherently more fragile, we may run into a vicious cycle of late checkpoints, crash, recovery to a rather outdated checkpoint, more back pressure, and even later checkpoints, which would result in little to no progress in the application.


This FLIP proposes a way to perform checkpoints with a non-blocking alignment of checkpoint barriers. It provides the following benefits.

  • Upstream processes can continue to produce data, even if some operator still waits on a checkpoint barrier on a specific input channel.
  • Checkpointing times are heavily reduced across the execution graph, even for operators with a single input channel.
  • End-users will see more progress even in unstable environments as more up-to-date checkpoints will avoid too many recomputations.
  • Rescaling becomes faster.


The key idea is to allow checkpoint barriers to be forwarded to downstream tasks before the synchronous part of the checkpointing has been conducted (see Fig. 1). To that end, we need to store in-flight data as part of the checkpoint, as described in greater detail in this FLIP.

Figure 1: Checkpoint barriers overtake in-flight records. Colored buffers need to be persisted together with the operator state.

Proposed Changes

In this FLIP, we

  • Change the handling of checkpoint barriers, 
  • Propose an API for persistently storing in-flight data,
  • Sketch a first implementation that co-locates in-flight data with operator state,
  • Enhance the checkpoint format, 
  • Recover from the new checkpoint while also allowing new checkpoints to be taken during recovering to guarantee progress, and
  • Refine scaling of operators to match the changed checkpoint format.

The following sections describe these steps in more detail.

Handling of checkpoint barriers

Whenever we receive checkpoint barriers, we forward them immediately to the output. To achieve a consistent checkpoint, we need to persist all data in the input and output buffers between the received and forwarded checkpoint barrier. In contrast to the current checkpoints, we not only “freeze” the current operator state during the synchronous checkpointing phase, but also “freeze” the channels (see Fig. 1).

For operators with one input channel, the checkpoint barrier directly overtakes all input data, triggers the synchronous parts of the checkpointing algorithm in the operator and is forwarded to the end of the output buffer. 
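The single-input case can be illustrated with a small model. This is a minimal sketch, not Flink code: `Barrier`, `handle_barrier`, and the list-based buffers are hypothetical names introduced here, and the sketch assumes that "the end of the output buffer" means the downstream-facing end, so the barrier overtakes the records already waiting in the output.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Barrier:
    """Hypothetical stand-in for a checkpoint barrier."""
    checkpoint_id: int


def handle_barrier(input_buffer, output_buffer):
    """Let the first barrier in input_buffer overtake all queued data.

    Both buffers are lists whose index 0 is the element consumed next.
    Returns (barrier, persisted_input, persisted_output); the two lists are
    the in-flight records that must be stored with the operator state.
    """
    idx = next(
        (i for i, e in enumerate(input_buffer) if isinstance(e, Barrier)), None
    )
    if idx is None:
        raise ValueError("no checkpoint barrier in the input buffer")
    barrier = input_buffer.pop(idx)
    # Records queued ahead of the barrier are overtaken: they are in-flight data.
    persisted_input = input_buffer[:idx]
    # Assumption: the barrier also overtakes everything waiting in the output.
    persisted_output = list(output_buffer)
    output_buffer.insert(0, barrier)
    return barrier, persisted_input, persisted_output
```

The overtaken records stay in the buffers and are still processed normally; they are only copied into the checkpoint in addition.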

If an operator has multiple input channels, there are two options:

  • We start checkpointing directly when we receive the first checkpoint barrier. From then on, we may receive an arbitrarily large number of records on the other channels before seeing their respective checkpoint barriers. However, the number of records is unbounded only if an upstream operator multiplies the number of records (like flatMap or join); otherwise, it is limited by the size of Flink’s network buffers.
  • We wait until we see the last checkpoint barrier and block the other input channels. In comparison to aligned checkpoints, we will block data flow for a shorter amount of time.


Here, the tradeoff is between the storage size for in-flight data on the one hand and the checkpoint latency and the time during which an input channel is blocked on the other. The current behavior can also be incorporated: the operator waits for the checkpoint barrier to arrive and only then inserts it into the output.
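The tradeoff can be made concrete with a toy model of a multi-input operator. The sketch below is illustrative only: `BARRIER`, `unaligned_inflight`, and `aligned_blocked` are hypothetical names, the input is modeled as a list of `(channel, element)` pairs in arrival order, and records that were already buffered when the first barrier arrived are ignored for brevity.

```python
BARRIER = object()  # hypothetical marker for a checkpoint barrier


def unaligned_inflight(events, num_channels):
    """Records to persist when checkpointing starts at the first barrier."""
    started = False
    seen = set()  # channels whose barrier has arrived
    persisted = {c: [] for c in range(num_channels)}
    for channel, element in events:
        if element is BARRIER:
            started = True
            seen.add(channel)
            if len(seen) == num_channels:
                break  # last barrier: nothing more to persist
        elif started and channel not in seen:
            # Pre-barrier record on a lagging channel: part of the checkpoint.
            persisted[channel].append(element)
    return persisted


def aligned_blocked(events, num_channels):
    """Records held back by the current, aligned behavior."""
    seen = set()
    blocked = {c: [] for c in range(num_channels)}
    for channel, element in events:
        if element is BARRIER:
            seen.add(channel)
            if len(seen) == num_channels:
                break  # alignment complete, channels are unblocked
        elif channel in seen:
            # Post-barrier record on an already aligned channel must wait.
            blocked[channel].append(element)
    return blocked
```

For the same arrival order, the first function shows what the unaligned variant has to store, while the second shows what the aligned variant has to hold back.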

We want to implement all three variants to allow a fine-grained evaluation. Depending on the outcome, we will choose a default and expose a (hidden) configuration option “checkpointing policy”:

  • ALIGNED (current behavior),
  • UNALIGNED_WITH_MAX_INFLIGHT_DATA (and secondary integer option “checkpointing max inflight data”),
  • UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA.

Note that when scaling down, the size limit of “max inflight data” might be temporarily exceeded during recovery (see the recovery section).
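As a rough illustration of how the policy and its secondary option relate, the following sketch models them as a small configuration object. The class and field names are hypothetical, the unit of the limit is left open because the FLIP does not fix it, and defaulting to `ALIGNED` is an assumption based on the plan to keep the feature disabled in the first release.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class CheckpointingPolicy(Enum):
    ALIGNED = "ALIGNED"  # current behavior
    UNALIGNED_WITH_MAX_INFLIGHT_DATA = "UNALIGNED_WITH_MAX_INFLIGHT_DATA"
    UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA = "UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA"


@dataclass(frozen=True)
class CheckpointingConfig:
    """Hypothetical holder for the two options named above."""
    policy: CheckpointingPolicy = CheckpointingPolicy.ALIGNED  # assumed default
    max_inflight_data: Optional[int] = None  # unit not specified by the FLIP

    def __post_init__(self):
        needs_limit = (
            self.policy is CheckpointingPolicy.UNALIGNED_WITH_MAX_INFLIGHT_DATA
        )
        if needs_limit and (
            self.max_inflight_data is None or self.max_inflight_data <= 0
        ):
            raise ValueError("this policy requires a positive max_inflight_data")
        if not needs_limit and self.max_inflight_data is not None:
            raise ValueError(
                "max_inflight_data only applies to UNALIGNED_WITH_MAX_INFLIGHT_DATA"
            )
```

Rejecting a limit for the other two policies is a design choice of this sketch; a real implementation could just as well ignore it.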

Recovery from the checkpoint with guaranteed progress

To recover from a checkpoint, the persisted channel data needs to be consumed before any additional input is processed; that is, input buffers are first used to unspill data and only then can be used to process upstream data. Similarly, the output buffers are occupied until the spilled data has been unspilled. Normal operation can be resumed even before all data has been unspilled, as long as no new input can overtake unspilled data.

To guarantee progress even during recovery, we need to implement three features:

  • Checkpoint barriers need to be processed even if the unspilling is not completed and all input buffers are taken. 
  • Ideally, data that has not been completely unspilled will not be written twice. Instead, only the offsets of the checkpointed operators are updated.
  • Data unspilling does not require the full data to be copied to the task; that is, persistent data is consumed/unspilled lazily as a stream.

Thus, in an environment with frequent crashes (under backpressure), progress can be made as long as the operators themselves are recovered fast enough.
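The recovery requirements can be sketched with a channel that serves spilled data lazily before any new input and that can be snapshotted at any time by recording an offset instead of rewriting data. This is a simplified model under stated assumptions: `RecoveringChannel` and its methods are hypothetical names, and offsets are counted in records.

```python
_END = object()  # sentinel: the spilled data is exhausted


class RecoveringChannel:
    """Hypothetical input channel that unspills before reading new input."""

    def __init__(self, spilled, live):
        # Both sources are consumed lazily; nothing is copied up front.
        self._spilled = iter(spilled)
        self._live = iter(live)
        self.offset = 0  # number of spilled records consumed so far

    def poll(self):
        """Return the next record, or None if both sources are exhausted."""
        record = next(self._spilled, _END)
        if record is not _END:
            self.offset += 1
            return record
        # New input is only served once no spilled data can be overtaken.
        return next(self._live, None)

    def snapshot(self):
        """Checkpoint during recovery: reference the old data by offset."""
        return {"spilled_offset": self.offset}
```

A checkpoint taken in the middle of unspilling then only stores how far the old persisted data has been consumed, which matches the idea of updating offsets rather than writing the data twice.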

The first implementation, which stores in-flight data as part of the operator state, will not satisfy all of these requirements. Nevertheless, assuming that the in-flight data is rather small, we should still quickly come to the point where new output is produced.

Rescaling of operators

When rescaling operators with in-flight data, we need to be careful to satisfy the ordering guarantees. 

If data is keyed, rescaling means that certain key groups are reassigned to new task instances. These task instances need to read the checkpoint of the former owner of the key group and filter a) the relevant state (as is done already) and b) the relevant data from the persisted in-flight data.

Since rescaling is the rarest of the processes that touch the data (normal processing > checkpointing > recovery > rescaling), we opted for a non-optimized version of rescaling. When, after rescaling, two operator instances need to load data from the same state handle, each of them needs to deserialize all records, apply the key selector, and compute the key group index to filter the relevant records. We expect only a rather small amount of data to be processed multiple times and think that storing the key group index inside the data would hurt performance for small records.
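The non-optimized filtering step could look roughly like the sketch below. All function names are hypothetical. CRC32 is used only as a deterministic stand-in for Flink's actual key hashing, and the contiguous range assignment of key groups to instances is an assumption of this sketch.

```python
import zlib


def key_group_of(key, max_parallelism):
    # Assumption: CRC32 replaces the real key hash, purely for determinism.
    return zlib.crc32(repr(key).encode("utf-8")) % max_parallelism


def owner_of(key_group, max_parallelism, parallelism):
    # Assumption: each instance owns one contiguous range of key groups.
    return key_group * parallelism // max_parallelism


def filter_persisted(raw_records, deserialize, key_selector,
                     instance, max_parallelism, parallelism):
    """Keep only the persisted records whose key group belongs to `instance`."""
    kept = []
    for raw in raw_records:
        record = deserialize(raw)  # every record is deserialized
        group = key_group_of(key_selector(record), max_parallelism)
        if owner_of(group, max_parallelism, parallelism) == instance:
            kept.append(record)
    return kept
```

Each new instance runs the same filter over the full state handle, so every record is deserialized once per reader but kept by exactly one of them.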

For non-keyed data, rescaling semantics is unfortunately a bit fuzzy. For this FLIP, we assume that no data of a given input split can overtake prior data in processing on forward channels. Any fan out or reshuffling will already destroy that ordering guarantee, so we can disregard these cases in this FLIP. If we focus on forward channels, however, we quickly run into situations where the ordering is violated (see Fig. 3).

Figure 3: Violating ordering guarantees while rescaling of non-keyed data. Split S1 is moved from source instance I to instance II.
If the original operators are back-pressured (⏮), data from the new source instance can overtake the original data.


To solve these issues, we have three solutions:

  • Unaligned checkpoints on non-keyed, forwarded operators are disabled by default. If we do not have in-flight data, we arrive at the status quo.
  • We add an explicit toggle to force unaligned checkpoints on non-keyed operators for power users. That toggle enables unaligned checkpoints when ordering is not important or no rescaling from a checkpoint will occur.
  • We devise and implement FLIP-? (light-weight partition reassignment), which will provide a very fine-grained mechanism to transfer the ownership of input splits/key groups. Based on this mechanism, we can fully support unaligned checkpoints on non-keyed operators.


Problem dimensions: keyed/non-keyed, scale-in/out, upstream/downstream

Possible issues: multi-buffer records, multi-record buffers, ordering (new/old data), pressure

General algorithm:

  1. find relevant state
    1. what are the requirements? can we use any distribution as long as we can satisfy (3) (find correct channels)?
  2. filter out records/buffers
    1. after scaling out, state file can contain irrelevant records
  3. load data into the correct channels (InputChannel/SubPartition)
    1. this should resolve multi-buffer record (MBR) issues
    2. solution: task IDs?
  4. ensure ordering
    1. solution: epochs?

Open questions

Avoiding double processing in downstream (if continuous spilling)

With continuous spilling, upstream saves all of its output, including what downstream has already processed or buffered.

Possible solutions:

  1. downstream loads upstream output buffers file and filters out what it has already processed; files are named deterministically and indexed
  2. same, but offset in the file is calculated based on buffer size
  3. downstream communicates its offset to upstream during snapshotting
  4. downstream communicates its offset to upstream during recovery
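To illustrate the offset-based options, the sketch below derives a byte offset from the number of buffers downstream has already consumed and returns only the remaining part of the spilled upstream output. It is a toy model under strong assumptions: `remaining_buffers` is a hypothetical name, all buffers are assumed to have the same fixed size and to be completely filled (except possibly the last one), and the spilled output is a single byte string.

```python
def remaining_buffers(spilled_output, buffers_consumed, buffer_size):
    """Return the spilled buffers that downstream has not processed yet."""
    if buffer_size <= 0 or buffers_consumed < 0:
        raise ValueError(
            "buffer_size must be positive and buffers_consumed non-negative"
        )
    # Option 2 above: the offset follows from the (assumed fixed) buffer size.
    start = buffers_consumed * buffer_size
    if start > len(spilled_output):
        raise ValueError("downstream offset lies beyond the spilled output")
    return [
        spilled_output[i:i + buffer_size]
        for i in range(start, len(spilled_output), buffer_size)
    ]
```

If buffers can be partially filled, this calculation no longer holds and the offset has to be communicated explicitly, as in options 3 and 4.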

Compatibility, Deprecation, and Migration Plan

  • Make behavior optional. Disabled in first release.
  • Make behavior default in subsequent releases.
  • In documentation, clearly state that users of the checkpoint API can only use unaligned checkpoints if they do not require a consistent state across all operators.

Known Limitations

  • State size increase
    • Up to a couple of GB per task (especially painful on IO bound clusters)
    • Depends on the actual policy used (UNALIGNED_WITH_MAX_INFLIGHT_DATA is probably the more plausible default)
  • Longer and heavier recovery depending on the increased state size
    • Can potentially trigger death spiral after a first failure
    • More up-to-date checkpoints will most likely still be an improvement over the current checkpoint behavior during backpressure
  • For the MVP
    • no re-scaling
    • max in-flight checkpoints = 1

Test Plan

  • Correctness tests with induced failures
  • Compare checkpoint times under backpressure with current state
  • Compare throughput under backpressure with current state
  • Compare progress under backpressure with frequently induced failures
  • Compare throughput with no checkpointing

Rejected Alternatives

There were several alternatives for specific design decisions:

Support of rescaling in non-keyed operators:

  • No guarantees for non-keyed data (Kafka way) was rejected because we know that some users (currently) depend on these guarantees.
  • Explicitly stating partitioning by users (reinterpret stream) would be a possible alternative for special cases where the current proposal fails.
  • No scaling for non-keyed data would make the work stealing of FLIP-27 hard to implement beyond the source.

For the first implementation of the persistence layer of in-flight data:

  • Writing onto DFS/S3 would introduce a new system dependency.
  • Implementing DFS on TaskManagers would require too much effort for now.
  • Using Apache BookKeeper seems promising but would also add a new system dependency.






