
...

Note that during downscaling, the size limit of “max inflight data” might be temporarily exceeded during recovery (see the Recovery section).

Recover from the checkpoint with guaranteed progress.

Recovery

Assignment of state and mapping it to in-memory data structures (without rescaling)

  • Channel state consists of upstream and downstream parts; they are recovered by upstream/downstream subtasks separately
  • Each part (InputChannel/SubPartition) is represented as a StateHandle and included in the snapshot
  • Each part in the metadata has a subtask id and a target or source subtask id
  • On recovery, the checkpoint coordinator distributes state according to subtask index
    1. Without rescaling, we don’t need to do anything to colocate operator and channel state (state placement isn't changed)
    2. Raw state is distributed in the same manner
  • Subtasks place state into the proper InputChannel/SubPartition according to target/source id
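
The two-level assignment above (the coordinator groups by subtask index, then the subtask groups by target/source id) can be sketched as follows. This is a minimal illustration only: ChannelStateHandle, distribute and place are hypothetical names and do not correspond to actual Flink classes.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class ChannelStateHandle:
    # Hypothetical stand-in for the StateHandle of one InputChannel/SubPartition.
    subtask_index: int  # subtask that owns this part of the channel state
    peer_index: int     # source id (input channel) or target id (subpartition)
    payload: bytes


def distribute(handles):
    """Coordinator side: group handles by the owning subtask index."""
    per_subtask = defaultdict(list)
    for handle in handles:
        per_subtask[handle.subtask_index].append(handle)
    return dict(per_subtask)


def place(handles_for_subtask):
    """Subtask side: map each handle to its channel by target/source id."""
    channels = defaultdict(list)
    for handle in handles_for_subtask:
        channels[handle.peer_index].append(handle.payload)
    return dict(channels)


snapshot = [
    ChannelStateHandle(0, 0, b"a"),
    ChannelStateHandle(0, 1, b"b"),
    ChannelStateHandle(1, 0, b"c"),
]
assignment = distribute(snapshot)
channels_of_subtask_0 = place(assignment[0])
```

Without rescaling, the subtask index in the snapshot is also the index of the restoring subtask, so no extra colocation step is needed in this sketch.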

Rescaling

Non-keyed

To place state into a task, the CheckpointCoordinator uses a modulo operation. For example, when scaling out:

  1. Upstream: old_src_task_index % new_par_level
  2. Downstream: old_dst_task_index % new_par_level

Each subtask uses the same logic to determine the right InputChannel/SubPartition.
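
A minimal sketch of the modulo placement, assuming the intended operation is the old task index modulo the new parallelism (the only operand order that is well-defined for task index 0). The function names are hypothetical and chosen for illustration.

```python
def new_subtask_for(old_index: int, new_parallelism: int) -> int:
    # Assumption: old index modulo new parallelism.
    return old_index % new_parallelism


def redistribute(old_parallelism: int, new_parallelism: int):
    """Map each new subtask index to the old subtask indices whose state it receives."""
    placement = {i: [] for i in range(new_parallelism)}
    for old in range(old_parallelism):
        placement[new_subtask_for(old, new_parallelism)].append(old)
    return placement


scale_out = redistribute(2, 4)  # old subtasks keep their index; new ones start empty
scale_in = redistribute(4, 2)   # each remaining subtask absorbs two old ones
```

Under this assumption, scaling out leaves existing state where it was, while scaling in merges the state of several old subtasks into one.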

To match multi-buffer records (MBRs), both upstream and downstream must sort records in the same order:

  • i.e., if we have some channel state on upstream, and the channel state on downstream corresponds to it (as ensured by step 4), then it’s enough to sort both by channel id ([src_id : dst_id]); this ensures the order of records
  • there is a (technical?) problem with the proposed solution to match MBRs: we need to alternate between loading channel state and processing network operations, which can be tricky
  • another solution would be to have temporary "virtual" channels to process data from "imported" channels
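
The sorting idea can be illustrated with a small sketch. It assumes that each piece of channel state carries its (src_id, dst_id) pair, that the downstream side holds the already received head of an MBR, and that the upstream side holds the not yet sent tail. The tuple layout is hypothetical.

```python
def channel_order(states):
    """Sort channel state by channel id, i.e. by (src_id, dst_id)."""
    return sorted(states, key=lambda s: (s[0], s[1]))


# (src_id, dst_id, part) tuples, deliberately listed in different orders.
downstream = [(1, 0, "head-B"), (0, 0, "head-A")]
upstream = [(0, 0, "tail-A"), (1, 0, "tail-B")]

# After sorting both sides the same way, corresponding parts line up.
matched = [
    (head[2], tail[2])
    for head, tail in zip(channel_order(downstream), channel_order(upstream))
]
```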

Keyed

For each key group, we need to place channel and operator state on the same subtask.

For output buffers, we know (at least conceptually) the keys, so we can store state as KeyedStateHandle and then reuse operator state redistribution code.

For input buffers, we only know a set of groups - the groups assigned to this task.

But, given that the placement of key groups among the tasks is deterministic, we can: 1) compute the old and new placements and 2) give each new task the input buffers that could contain its key groups.
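
A sketch of step 1 and step 2, under the assumption that key groups are assigned to subtasks as contiguous ranges. The range formula below is modeled on how Flink assigns key group ranges, but it should be treated as illustrative rather than as the actual implementation. The function names are hypothetical.

```python
def key_group_range(max_parallelism: int, parallelism: int, index: int) -> range:
    # Assumed contiguous-range assignment of key groups to a subtask.
    start = (index * max_parallelism + parallelism - 1) // parallelism
    end = ((index + 1) * max_parallelism - 1) // parallelism
    return range(start, end + 1)


def old_subtasks_needed(max_parallelism, old_par, new_par, new_index):
    """Old subtasks whose input buffers could contain key groups of the new subtask."""
    mine = set(key_group_range(max_parallelism, new_par, new_index))
    return [
        old
        for old in range(old_par)
        if mine & set(key_group_range(max_parallelism, old_par, old))
    ]


# Scaling out from 2 to 3 subtasks with 128 key groups.
needed = [old_subtasks_needed(128, 2, 3, i) for i in range(3)]
```

In this example the middle subtask needs the input buffers of both old subtasks, which is why the filtering described next is required.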

The task must then filter out irrelevant records by key (after deserialization). For non-MBRs, this is trivial.

For MBRs, if an MBR doesn’t belong to this subtask, the subtask will never receive the remaining buffers and therefore can’t deserialize the record and filter it out.

Possible solutions:

  1. Send restored records from upstream to all downstreams that could need them (not according to key); we can use the same logic as in distribution for that
  2. Use sequence numbers: if we expect the next part of an MBR on this channel but receive a buffer with a different SN, then discard this MBR
  3. Write the key at the beginning of each buffer (need to ensure the key itself isn’t split)
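
Option 2 could look roughly like the sketch below. It assumes that every buffer carries a sequence number and an end-of-record flag, and that a buffer with an unexpected sequence number starts a new record. MbrAssembler and its method are hypothetical names, not existing Flink classes.

```python
class MbrAssembler:
    """Reassembles multi-buffer records and drops incomplete ones on an SN gap."""

    def __init__(self):
        self.parts = []
        self.expected_sn = None  # None means no record is currently pending

    def on_buffer(self, sn: int, data: bytes, is_last: bool):
        if self.expected_sn is not None and sn != self.expected_sn:
            # The rest of the pending MBR went to another subtask: discard it.
            self.parts = []
        self.parts.append(data)
        self.expected_sn = sn + 1
        if is_last:
            record = b"".join(self.parts)
            self.parts = []
            self.expected_sn = None
            return record
        return None


assembler = MbrAssembler()
first = assembler.on_buffer(5, b"ab", False)   # start of an MBR that is never completed
second = assembler.on_buffer(9, b"xy", False)  # SN gap: the pending MBR is discarded
third = assembler.on_buffer(10, b"z", True)    # completes the second MBR
```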

Order of processing restored buffers

To recover from a checkpoint, the persistent channel needs to be consumed first before processing any additional inputs; that is, input buffers are first used to unspill data and only then can be used to process upstream data. Similarly, the output buffers are taken until the spilled data has been unspilled. Normal operation can be resumed even without all data being unspilled as long as no new input can overtake unspilled data.
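
The ordering rule for the input side can be sketched as a two-queue reader that always drains restored buffers before any buffer received from the network. This is a simplified model that ignores buffer pool limits; RecoveringInput is a hypothetical name.

```python
from collections import deque


class RecoveringInput:
    """Serves restored (unspilled) buffers before any newly received buffer."""

    def __init__(self, restored):
        self.restored = deque(restored)
        self.live = deque()

    def on_network_buffer(self, buf):
        # New upstream data is queued but cannot overtake restored data.
        self.live.append(buf)

    def poll(self):
        if self.restored:
            return self.restored.popleft()
        if self.live:
            return self.live.popleft()
        return None


inp = RecoveringInput(["r1", "r2"])
inp.on_network_buffer("n1")
order = [inp.poll() for _ in range(4)]
```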

...

The first, state-based storage will not satisfy all requirements. Nevertheless, assuming that the in-flight data is rather small, we should still quickly come to the point where new output is produced.

Rescaling of operators

When rescaling operators with in-flight data, we need to be careful to satisfy the ordering guarantees. 

If data is keyed, rescaling means that certain key groups are reassigned to new task instances. These task instances need to read the checkpoint of the former owner of the key group and filter a) the relevant state (as is done already) and b) the relevant data from the persisted data.

Since rescaling is the rarest of the processes that touch the data (normal processing > checkpointing > recovery > rescaling), we opted for a non-optimized version of rescaling. When, after rescaling, two operator instances need to load data from the same state handle, they need to deserialize all records, apply the key selector, and compute the key group index to filter relevant records. We expect a rather small amount of data to be processed multiple times and think that storing the key group index inside the data would impact performance for small records.
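
The non-optimized filtering can be sketched as follows. The hash used to derive the key group is a stand-in (CRC32) chosen only so that the example is deterministic; Flink uses its own hashing. The function names and the record layout are hypothetical.

```python
import zlib


def key_group_of(key: str, max_parallelism: int) -> int:
    # Stand-in hash for illustration; not the hash Flink actually uses.
    return zlib.crc32(key.encode("utf-8")) % max_parallelism


def filter_relevant(records, key_selector, owned_key_groups, max_parallelism):
    """Keep only the records whose key group is owned by this operator instance."""
    return [
        r
        for r in records
        if key_group_of(key_selector(r), max_parallelism) in owned_key_groups
    ]


# Two instances load the same state handle after scaling out.
records = [("user-1", 10), ("user-2", 20), ("user-3", 30), ("user-4", 40)]
select_key = lambda record: record[0]
left = filter_relevant(records, select_key, range(0, 64), 128)
right = filter_relevant(records, select_key, range(64, 128), 128)
```

Both instances deserialize every record, but each record is kept by exactly one of them, which matches the trade-off described above.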

For non-keyed data, rescaling semantics is unfortunately a bit fuzzy. For this FLIP, we assume that no data of a given input split can overtake prior data in processing on forward channels. Any fan out or reshuffling will already destroy that ordering guarantee, so we can disregard these cases in this FLIP. If we focus on forward channels, however, we quickly run into situations where the ordering is violated (see Fig. 3).

...


Figure 3: Violating ordering guarantees while rescaling of non-keyed data. Split S1 is moved from source instance I to instance II.
If the original operators are back-pressured (⏮), data from the new source instance can overtake the original data.

To solve these issues, we have three solutions:

  • Unaligned checkpoints on non-keyed, forwarded operators are disabled by default. If we do not have in-flight data, we arrive at the status quo.
  • We add an explicit toggle to force unaligned checkpoints on non-keyed operators for power users. That toggle enables unaligned checkpoints when ordering is not important or no rescaling from a checkpoint will occur.
  • We devise and implement FLIP-? (light-weight partition reassignment), which will provide a very fine-grained mechanism to transfer the ownership of input splits/key groups. Based on this mechanism, we can fully support unaligned checkpoints on non-keyed operators.

Problem dimensions: keyed/non-keyed, scale-in/out, upstream/downstream

Possible issues: multi-buffer records, multi-record buffers, ordering (new/old data), pressure

General algorithm:

...

  1. what are the requirements? can we use any distribution as long as we can satisfy (3) (find correct channels)?

...

  1. after scaling out, state file can contain irrelevant records

...

  1. this should resolve MBR issues
  2. solution: task IDs?

...

Open questions

Avoiding double processing in downstream (if continuous spilling)

...