You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

This page is meant as a template for writing a FLIP. To create a FLIP choose Tools->Copy on this page and modify with your content and replace the heading with the next FLIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current state: Under Discussion

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, triggering a checkpoint will cause a checkpoint barrier to flow from the sources through the DAG towards the sinks. This checkpoint barrier guarantees a consistent snapshot of the DAG at a given point in time.

For each operator with multiple input channels, an alignment phase is started whenever the checkpoint barrier is received on one channel, which causes further inputs of that channel to be blocked until the checkpoint barrier from the others channels are received.

This approach works fairly well for modest utilization but exhibits two main issues on a back-pressured pipeline:

  • Since the checkpoint barrier flows much slower through the back-pressured channels, the other channels and their upstream operators are effectively blocked during checkpointing.
  • The checkpoint barrier takes a long time to reach the sinks causing long checkpointing times. A longer checkpointing time in turn means that the checkpoint will be fairly outdated once done. Since a heavily utilized pipeline is inherently more fragile, we may run into a vicious cycle of late checkpoints, crash, recovery to a rather outdated checkpoint, more back pressure, and even later checkpoints, which would result in little to no progress in the application.


This FLIP proposes a way to perform checkpoints with a non-blocking alignment of checkpoint barriers. It provides the following benefits.

  • Upstream processes can continue to produce data, even if some operator still waits on a checkpoint barrier on a specific input channel.
  • Checkpointing times are heavily reduced across the execution graph, even for operators with a single input channel.
  • End-users will see more progress even in unstable environments as more up-to-date checkpoints will avoid too many recomputations.
  • Facilitate faster rescaling.


The key idea is to allow checkpoint barriers to be forwarded to downstream tasks before the synchronous part of the checkpointing has been conducted (see Fig. 1). To that end, we need to store in-flight data as part of the checkpoint as described in greater details in this FLIP.



Figure 1: Checkpoint barriers overtake in-flight records. Colored buffers need to be persisted together with the operator state.

Public Interfaces

  • To store in-flight data, we need to adjust the checkpoint format.

Proposed Changes

In this FLIP, we

  • Change the handling of checkpoint barriers, 
  • Propose an API for persistently storing in-flight data,
  • Sketch a first implementation that co-locates in-flight data with operator state,
  • Enhance the checkpoint format, 
  • Recover from the new checkpoint while also allowing new checkpoints to be taken during recovering to guarantee progress, and
  • Refine scaling of operators to match the changed checkpoint format.

We will describe the steps in more detail as follows.


Handling of checkpoint barriers

Whenever we receive checkpoint barriers, we forward them immediately to the output. To achieve a consistent checkpoint, we need to persist all data in the input and output buffers between the received and forwarded checkpoint barrier. In contrast to the current checkpoints, we not only “freeze” the current operator state during the synchronous checkpointing phase, but also “freeze” the channels (see Fig. 1).

For operators with one input channel, the checkpoint barrier directly overtakes all input data, triggers the synchronous parts of the checkpointing algorithm in the operator and is forwarded to the end of the output buffer. 

If an operator has multiple input channels, there are two options:

  • We start checkpointing directly when we receive the first checkpoint barrier. At that time, we may receive an arbitrarily large number of records on the other channels before seeing the respective checkpoint barriers. We can receive arbitrarily large number of records only if an upstream operator multiplies the records number (like flatMap or join). In other cases the number of records is limited by the size of Flink’s network buffers.
  • We wait until we see the last checkpoint barrier and block the other input channels. In comparison to aligned checkpoints, we will block data flow for a shorter amount of time.


Here, the tradeoff is between storage size for in-flight data and checkpoint latency and time where an input channel is blocked. The current state can even be incorporated by waiting for checkpoint barrier to arrive at the operator and inserts the checkpoint barrier before the output.

We want to implement all 3 versions to allow a fine-grain evaluation. Depending on the outcome, we will choose a specific default option and exhibit a (hidden) configuration “checkpointing policy”:

  • ALIGNED (current behavior),
  • UNALIGNED_WITH_MAX_INFLIGHT_DATA (and secondary integer option “checkpointing max inflight data”),
  • UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA.


Note, that during downscaling the size limit of “max inflight data” might be temporarily exceeded during recovery (see recovery section).

Persistently store in-flight data


To store in-flight data, this FLIP defines an API and a straight-forward implementation. We envision more sophisticated implementations in the future.

To achieve high performance, data is persisted directly by storing the containing network buffers. Records within the buffer remain serialized, so that the buffers can be directly passed to the storage service.

The API to store the buffers consists of three parts: one writing component, one reading component, and a factory component. Reading and writing will happen on the level of logical input and output, so that channels and record writers are merged in one file/state. During recover, the channels are separated again.

The writer simple has one method to append a buffer to the persisted data. It is implicitly bound to an input gate or the output of the task, such that it known to which logical input or output and to which task the data belongs.

Persist
interface PersistentInFlightDataStorer extends Closeable {
	/**
	 * Appends a given buffer to the persisting storage. 
 * <p>This method may be blocking until the data is completely persisted.
 * <p>If this method is non-blocking, implementers must make a defensive copy of the buffer.
	 */ 
	void append(Buffer buffer) throws IOException;
 
	/**
	 * Finalizes the storage for this particular input or output.
	 * <p>For non-blocking append, this method must ensure that all data has been 
	 * successfully persisted and indicate any error.
	 */
	@Override
	void close() throws IOException;
}


Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives


If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.





  • No labels