...

Page properties

Discussion thread: https://…/thread/5g1jgk51b3q1jn57vqp39176dkfy0sjr
Vote thread: (link elided)
JIRA: FLINK-14551 (ASF JIRA)
Release: 1.11


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...


Figure 1: Checkpoint barriers overtake in-flight records. Colored buffers need to be persisted together with the operator state.

Public Interfaces

...


Preliminary work

We first evaluated two options for when to persist with POCs:

  • Persist in an ad-hoc fashion on checkpoint. While this keeps the data volume to a minimum, it could potentially overload external systems during checkpointing.
  • Persist continuously to avoid such overload, at the cost of storing much more data and potentially decreasing throughput.

In the following experiments, we used a simple topology (source → map → map → sleepy map → measure map), where each channel was a random shuffle. The sleepy map slept on average 0, 0.01, and 0.1 ms per record to induce backpressure. We compared a POC for ad-hoc spilling, a POC for continuous spilling, and the base commit in master (1.11-SNAPSHOT). For each approach and sleep time, we measured 3 times and took the median while persisting on HDFS in an EMR cluster with 1 master and 5 slaves, each having 32 cores and 256 GB RAM, with a total parallelism of 160.


(Benchmark charts elided: checkpoint duration and throughput for each approach and sleep time.)



We can see that for the baseline, the checkpoint duration strongly increases with sleep time and thus backpressure. For both POCs, however, the checkpoint duration remained stable and even decreased under higher backpressure because of the lower overall data volume. Surprisingly, the checkpointing times for continuous spilling are higher than for ad-hoc spilling at lower sleep times. We suspect that continuously writing large amounts of data induces additional backpressure. With increasing backpressure and thus decreasing data volume, continuous spilling reaches sub-second checkpointing times.


Nevertheless, continuous spilling seems to have a rather big impact on overall throughput. While the ad-hoc POC showed a 10% performance decrease, the continuous POC is clearly bottlenecked for higher data volumes. Primarily for that reason, we decided to go with the ad-hoc approach in this FLIP. While the sub-second checkpointing times of continuous spilling would surely be a nice asset, the primary goal of decoupling checkpointing times from backpressure is also reached with ad-hoc spilling.

Proposed Changes

In this FLIP, we

  • Change the handling of checkpoint barriers to allow them to overtake other records,
  • Propose an API for persistently storing in-flight data,
  • Sketch a first implementation that co-locates in-flight data with operator state,
  • Enhance the checkpoint format,
  • Recover from the new checkpoint while also allowing new checkpoints to be taken during recovery to guarantee progress,
  • Refine scaling of operators to match the changed checkpoint format, and
  • Propose a solution for how operators can be rescaled in the future.

We will describe these steps in more detail in the following sections.

Handling of checkpoint barriers

...

Note that during downscaling, the size limit of “max inflight data” might be temporarily exceeded during recovery (see the recovery section).

Persistently store in-flight data

To store in-flight data, this FLIP defines an API and a straight-forward implementation. We envision more sophisticated implementations in the future.

To achieve high performance, data is persisted directly by storing the containing network buffers. Records within the buffer remain serialized, so that the buffers can be directly passed to the storage service.

The API to store the buffers consists of three parts: a writing component, a reading component, and a factory component. Reading and writing happen on the level of a logical input or output, so that channels and record writers are merged into one file/state. During recovery, the channels are separated again.

The writer simply has one method to append a buffer to the persisted data. It is implicitly bound to an input gate or to the output of the task, so that it is known to which logical input or output and to which task the data belongs.

Code Block
languagejava
titlePersist
interface PersistentInFlightDataStorer extends Closeable {
	/**
	 * Appends a given buffer to the persisting storage. 
	 * <p>This method may be blocking until the data is completely persisted.
	 * <p>If this method is non-blocking, implementers must make a defensive copy of the buffer.
	 */ 
	void append(Buffer buffer) throws IOException;
 
	/**
	 * Finalizes the storage for this particular input or output.
	 * <p>For non-blocking append, this method must ensure that all data has been 
	 * successfully persisted and indicate any error.
	 */
	@Override
	void close() throws IOException;
}
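For illustration, a minimal usage sketch of the storer follows; it is not part of the proposal. The factory (defined further below), the gateId, and the set of buffers to persist are assumed to be provided by the surrounding task code.

Code Block
languagejava
titleStorer usage (sketch)
import java.io.IOException;
import org.apache.flink.runtime.io.network.buffer.Buffer;

// Hypothetical call site during checkpointing; all names are illustrative.
final class StorerUsageSketch {
	static void persistInFlightData(
			PersistentInFlightDataStorage storage,
			int gateId,
			Iterable<Buffer> buffersToPersist) throws IOException {
		try (PersistentInFlightDataStorer storer = storage.createStorer(gateId)) {
			for (Buffer buffer : buffersToPersist) {
				storer.append(buffer); // may block until the data is persisted
			}
		} // close() ensures all appends are durable or surfaces an error
	}
}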

The corresponding reader follows the usual pattern of AsyncDataInput.

Code Block
languagejava
titlePersist
interface PersistentInFlightDataRetriever extends AvailabilityProvider {
	/**
	 * Returns the next buffer if available. This method should be non-blocking.
	 */
	Optional<Buffer> pollNext();
}
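A hedged sketch of how a consumer could drain the retriever during recovery; hasMoreData and processBuffer are assumptions standing in for the surrounding recovery logic.

Code Block
languagejava
titleRetriever usage (sketch)
import java.util.Optional;
import org.apache.flink.runtime.io.network.buffer.Buffer;

// Illustrative recovery loop: drain persisted buffers before processing any
// newly received data, waiting on the availability future when necessary.
final class RetrieverUsageSketch {
	static void drain(PersistentInFlightDataRetriever retriever) throws Exception {
		while (hasMoreData()) { // hypothetical end-of-recovery check
			Optional<Buffer> next = retriever.pollNext();
			if (next.isPresent()) {
				processBuffer(next.get()); // hypothetical downstream handler
			} else {
				retriever.getAvailableFuture().get(); // from AvailabilityProvider
			}
		}
	}

	static boolean hasMoreData() { return false; } // stub for the sketch
	static void processBuffer(Buffer buffer) {}    // stub for the sketch
}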

Lastly, the corresponding factory as well as all created instances are owned by a specific task and are initialized during checkpointing.

Code Block
languagejava
titlePersist
interface PersistentInFlightDataStorage {
	PersistentInFlightDataStorer createStorer(int gateId);
	PersistentInFlightDataRetriever createRetriever(int gateId);
}

State-based storage

A first, straight-forward solution will store all in-flight data as part of the checkpoint state, separately for each logical input/output. Buffers are directly appended onto the respective state handle with a small header that identifies the channel/record writer, so that records spanning multiple buffers can be safely restored. 

Furthermore, the state containing the persisted data of an input/output also contains a small header. The header consists of a version number identifying the used checkpoint format. For the first version (described above), the number is ‘1’, which gives us the option of backward-compatible changes. See Figure 2 for an overview of all involved headers. Note that the current buffer format remains untouched and is just shown for completeness.

Figure 2: The format of the state-based storage. The state header contains the version number and is written once per logical input / output. Each buffer is prefixed with the channel id for restoring the original assignment to channels.
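To make the format concrete, the following is a minimal, hypothetical serialization sketch of the described layout: a version header once per logical input/output, then per buffer a channel id and a length prefix. The field widths and helper names are assumptions, not the final format.

Code Block
languagejava
titleState format (sketch)
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical writer for the state-based format described above.
final class StateFormatSketch {
	static final byte VERSION = 1; // first version of the checkpoint format

	static void writeStateHeader(DataOutputStream out) throws IOException {
		out.writeByte(VERSION); // written once per logical input/output
	}

	static void writeBuffer(DataOutputStream out, int channelId, byte[] payload)
			throws IOException {
		out.writeInt(channelId);      // restores the original channel assignment
		out.writeInt(payload.length); // length prefix; records stay serialized
		out.write(payload);
	}
}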

Enhancing the checkpoints of operators

Depending on the implementation of the persistent storage, operators need to store the in-flight data or pointers to it in the operator state. In an operator chain, the head operator stores the input data and the tail operator stores the output data.

The actual logic for persisting data is encapsulated in the PersistentInFlightDataStorer and PersistentInFlightDataRetriever that should receive a state handle during initialization. Future versions could store the data in external systems and simply add pointers to the external systems in the state. In the following, we assume some kind of streaming storage with offsets.

For an input channel, the relevant offset is right after the checkpoint barrier. For multiple inputs, multiple offsets need to be saved. If we decide to store key groups individually for faster rescaling, the same offset will be stored for each key group.

Analogously, the relevant offset to be stored for output buffers is the one before the forwarded checkpoint barrier. Similarly to input channels, we may store output offsets per record writer or even per key group in the future.
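Assuming such a streaming storage with offsets, the per-checkpoint metadata could look roughly like the following hypothetical container; the field names are illustrative only.

Code Block
languagejava
titleOffsets (sketch)
// Hypothetical snapshot metadata, assuming a streaming storage with offsets.
final class InFlightDataOffsets {
	// Per input gate: offset right after the received checkpoint barrier.
	final long[] inputOffsets;
	// Per record writer: offset right before the forwarded checkpoint barrier.
	final long[] outputOffsets;

	InFlightDataOffsets(long[] inputOffsets, long[] outputOffsets) {
		this.inputOffsets = inputOffsets;
		this.outputOffsets = outputOffsets;
	}
}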

Recover from the checkpoint with guaranteed progress

...

Persistence

For persistence, existing state management primitives will be reused to colocate operator state and inflight data, which offers the following advantages.

  • No code duplication and no inconsistencies between channel and operator state distribution (especially keyed)
  • Simplicity of recovery and rescaling: state is not split into channel and operator parts
  • No inconsistencies between operator and channel state (e.g. different retention times)
  • Ability to use the existing snapshotting mechanism, with the possibility to reuse incremental checkpoints

However, some drawbacks may require using alternative approaches in the future.

  • Less flexibility
  • Risk of breaking snapshotting
  • Increased checkpoint size

The checkpoint format is only implicitly extended by adding more (keyed) state with conventional naming.

Components

In general, inflight data is stored in state handles per operator subtask that are ultimately managed by the CheckpointCoordinator. We need to add or modify the following components.

  1. Checkpoint metadata
    1. ChannelStateHandle extends StreamStateHandle and contains the subtask index as well as the information to place the data into the correct subpartition/input channel
    2. TaskStateSnapshot, PendingCheckpoint, and CompletedCheckpoint - contain a collection of ChannelStateHandles
  2. Writer - writes network buffers using a provided CheckpointStreamFactory and returns state handles
  3. Reader - reads data from a state handle and returns buffers (loads and deserializes)
  4. Buffer (de)serializer
  5. StateAssignmentOperation (existing class) - reads snapshot metadata and distributes state handles across subtasks; channel state distribution needs to be added
  6. TaskStateManagerImpl (existing class) - holds the state assigned to its task
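As a simplified, hypothetical sketch (not the actual class), the metadata listed in 1a could be carried like this:

Code Block
languagejava
titleChannelStateHandle (sketch)
import org.apache.flink.runtime.state.StreamStateHandle;

// Hypothetical shape of the metadata a ChannelStateHandle needs to carry;
// the real class would extend StreamStateHandle as described above.
final class ChannelStateHandleSketch {
	final int subtaskIndex;           // which subtask wrote this state
	final int gateOrPartitionIndex;   // logical input/output it belongs to
	final int channelIndex;           // subpartition/input channel within it
	final StreamStateHandle delegate; // underlying stream with the buffer data

	ChannelStateHandleSketch(int subtaskIndex, int gateOrPartitionIndex,
			int channelIndex, StreamStateHandle delegate) {
		this.subtaskIndex = subtaskIndex;
		this.gateOrPartitionIndex = gateOrPartitionIndex;
		this.channelIndex = channelIndex;
		this.delegate = delegate;
	}
}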

Recovery

From a high-level perspective, inflight data is restored where it was previously spilled:

  • Outgoing buffers are restored on the upstream side into the output.
  • Incoming buffers are restored on the downstream side into the input.

This approach facilitates an easy integration into the existing checkpointing mechanism. Restored buffers take precedence over all newly produced/received buffers. The following steps explain the assignment of state and the mapping to in-memory data structures (without rescaling) in more detail:

  • Channel state consists of an upstream and a downstream part; they are recovered by the upstream/downstream subtasks separately
  • Each part (InputChannel/SubPartition) is represented as a StateHandle and included in the snapshot
  • Each part in the metadata has a subtask id and a target or source subtask id
  • On recovery, the checkpoint coordinator distributes state according to the subtask index (see the sketch below)
    1. Without rescaling, we don’t need to do anything to colocate operator and channel state (state placement isn’t changed)
    2. Raw state is distributed in the same manner
  • Subtasks place the state into the proper InputChannel/SubPartitions according to the target/source id
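For illustration, distributing the handles by subtask index could be as simple as the following sketch, reusing the hypothetical handle shape from above.

Code Block
languagejava
titleState distribution (sketch)
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative only: group recovered channel state handles by the subtask
// index recorded in the metadata. Without rescaling, placement is unchanged,
// so each subtask receives exactly the handles it wrote.
final class ChannelStateAssignmentSketch {
	static Map<Integer, List<ChannelStateHandleSketch>> assignBySubtask(
			List<ChannelStateHandleSketch> handles) {
		return handles.stream()
				.collect(Collectors.groupingBy(h -> h.subtaskIndex));
	}
}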

Rescaling

Since we hook into the existing checkpointing mechanism, most of the rescaling challenges are solved in the same way.

Non-keyed

To place state into a task, the CheckpointCoordinator uses a modulo operation. For example, when scaling out:

  1. Upstream: new_par_level % old_src_task_index
  2. Downstream: new_par_level % old_dst_task_index

The subtask uses the same logic to determine the right InputChannel/SubPartition.

To match multi-buffer records, both upstream and downstream must sort records in the same order (as sketched below):

  • i.e., if we have some channel state on the upstream, and channel state on the downstream corresponds to it (as ensured by step 4), then it’s enough to sort them by channel id ([src_id : dst_id]); this ensures the order of records
  • there is a (technical?) problem with the proposed solution to match multi-buffer records: we need to alternate between loading channel state and processing network operations, which can be tricky
  • another solution would be to have temporary "virtual" channels to process data from "imported" channels
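A hedged sketch of the proposed ordering: sorting recovered channel state by channel id ([src_id : dst_id]) identically on both sides keeps the parts of multi-buffer records aligned. The ChannelId type is illustrative.

Code Block
languagejava
titleChannel ordering (sketch)
import java.util.Comparator;

// Illustrative: an identical sort order on upstream and downstream guarantees
// that the parts of a multi-buffer record are replayed in the same sequence.
final class ChannelId {
	final int srcId; // upstream subtask index
	final int dstId; // downstream subtask index

	ChannelId(int srcId, int dstId) {
		this.srcId = srcId;
		this.dstId = dstId;
	}

	static final Comparator<ChannelId> RECOVERY_ORDER =
			Comparator.<ChannelId>comparingInt(c -> c.srcId)
					.thenComparingInt(c -> c.dstId);
}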

Keyed

For each key group, we need to place channel and operator state on the same subtask. For output buffers, we know (at least conceptually) the keys, so we can store the state as a KeyedStateHandle and then reuse the operator state redistribution code. For input buffers, we only know a set of key groups - the groups assigned to this task.

But, given that the placement of key groups among the tasks is deterministic, we can 1) compute the old and new placements and 2) give each new task the input buffers that could contain its key groups. The task then must filter out irrelevant records by their keys (after deserialization). For regular (single-buffer) records this is trivial (see the sketch after the list below).

For a multi-buffer record that does not belong to this subtask, the subtask will never receive the remaining buffers and therefore cannot deserialize and filter it out. Possible solutions:

  1. Send restored records from upstream to all downstream tasks that could need them (not according to the key); we can use the same logic as in the distribution for that
  2. Use sequence numbers: if we expect the next part of a multi-buffer record on this channel but receive a buffer with a different SN, then discard the stored buffer
  3. Write the key at the beginning of each buffer (we need to ensure the key itself isn’t split)
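For the regular (single-buffer) case, the filtering could reuse Flink’s existing key-group assignment, as in the following sketch; extracting the key from the deserialized record (e.g. via the operator’s key selector) is assumed.

Code Block
languagejava
titleKey-group filter (sketch)
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

// Illustrative filter for restored records: drop records whose key group is
// not owned by this subtask.
final class KeyGroupFilterSketch {
	static boolean isRelevant(Object key, int maxParallelism, KeyGroupRange myRange) {
		int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
		return myRange.contains(keyGroup);
	}
}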

Guaranteed progress

To guarantee progress even during recovery, we need to implement three features:

...

Thus, in an environment with frequent crashes (under backpressure), progress can be made as soon as the operators themselves are recovered fast enough.

The first, state-based storage implementation will not satisfy all requirements. Nevertheless, assuming that the in-flight data is rather small, we should still quickly come to the point where new output is produced.

Rescaling of operators

When rescaling operators with in-flight data, we need to be careful to satisfy the ordering guarantees. 

If data is keyed, rescaling means that certain key groups are reassigned to new task instances. These task instances need to read the checkpoint of the former owner of the key group and a) filter the relevant state (as is done already) and b) filter the relevant records from the persisted data.

Since rescaling is the rarest of the processes that touch the data (normal processing > checkpointing > recovery > rescaling), we opted for a non-optimized version of rescaling. When, after rescaling, two operator instances need to load data from the same state handle, they need to deserialize all records, apply the key selector, and derive the keygroup index to filter the relevant records. We expect a rather small amount of data to be processed multiple times and think that storing the keygroup index inside the data would impact performance for small records.

For non-keyed data, the rescaling semantics are unfortunately a bit fuzzy. For this FLIP, we assume that no data of a given input split may overtake prior data in processing on forward channels. Any fan-out or reshuffling already destroys that ordering guarantee, so we can disregard these cases in this FLIP. If we focus on forward channels, however, we quickly run into situations where the ordering is violated (see Figure 3).

Figure 3: Split 1 is moved from source instance I to instance II. If the original operators are back-pressured, data from the new source instance can overtake the original data.

To solve these issues, we see three solutions:

  • Unaligned checkpoints on non-keyed, forwarded operators are disabled by default. If we do not have in-flight data, we arrive at the status quo.
  • We add an explicit toggle to force unaligned checkpoints on non-keyed operators for power users. That toggle enables unaligned checkpoints when ordering is not important or when no rescaling from a checkpoint will occur.
  • We devise and implement FLIP-? (light-weight partition reassignment), which will provide a fine-grained mechanism to transfer the ownership of input splits/key groups. Based on this mechanism, we can fully support unaligned checkpoints on non-keyed operators.

Compatibility, Deprecation, and Migration Plan

...

Unaligned checkpoints will initially be an optional feature. After collecting experience and implementing all necessary extensions, unaligned checkpoints will probably be enabled by default for exactly-once mode.
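As a sketch of the opt-in, assuming a boolean toggle on the checkpoint config (the exact method name is an assumption at this point):

Code Block
languagejava
titleOpt-in (sketch)
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

final class UnalignedToggleSketch {
	public static void main(String[] args) {
		StreamExecutionEnvironment env =
				StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(10_000); // exactly-once checkpoints every 10 s
		// Hypothetical opt-in; unaligned checkpoints stay off unless enabled.
		env.getCheckpointConfig().enableUnalignedCheckpoints(true);
	}
}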

For compatibility, in

...

documentation, clearly state that users of the checkpoint API can only use unaligned checkpoints if they do not require a consistent state across all operators.

Known Limitations

  • State size increase
    • Up to a couple of GB per task (especially painful on IO-bound clusters)
    • Depends on the actual policy used (UNALIGNED_WITH_MAX_INFLIGHT_DATA is probably the most plausible default)
  • Longer and heavier recovery depending on the increased state size
    • Can potentially trigger a death spiral after a first failure
    • More up-to-date checkpoints will most likely still be an improvement over the current checkpoint behavior during backpressure

...

  • Correctness tests with induced failures
  • Compare checkpoint times under backpressure with current state
  • Compare throughput under backpressure with current state
  • Compare progress under backpressure with frequently induced failures
  • Compare throughput with no checkpointing

Rejected Alternatives

There were several alternatives for specific design decisions:

Support of rescaling in non-keyed operators:

...

For the first implementation of the persistence layer of in-flight data:

...

Roadmap

There are two primary goals. First, we want to provide a minimal viable product (MVP) that breaks the vicious cycle in which slow checkpoints destabilize an overloaded cluster and recoveries from outdated checkpoints accumulate even more load. Second, we aim for the full FLIP implementation, which will among other things allow rescaling from unaligned checkpoints to directly counter overload.

In particular, the full release will provide the following improvements over the MVP:

  • Rescaling on unaligned checkpoints (the MVP requires a savepoint)
  • Incremental checkpointing to avoid writing the same buffer multiple times (when the job is backpressured)
  • Advanced triggers for unaligned checkpoints, such as timeouts on alignment or meeting a maximum threshold of checkpoint sizes
  • Support for concurrent checkpoints
  • Incremental loading and processing of state
  • No additional memory to load channel state: ideally, existing network buffers should be reused
  • Reduced number of files: a single file could be reused for several checkpoints

We aim to make unaligned checkpoints the default behavior after the full implementation.