...

  • Unaligned checkpoints on non-keyed, forwarded operators are disabled by default. If we do not have in-flight data, we arrive at the status quo.
  • We add an explicit toggle that lets power users force unaligned checkpoints on non-keyed operators. The toggle is intended for jobs where ordering is not important or where no rescaling from a checkpoint will occur.
  • We devise and implement FLIP-? (light-weight partition reassignment), which will provide a very fine-grained mechanism to transfer the ownership of input splits/key groups. Based on this mechanism, we can fully support unaligned checkpoints on non-keyed operators.

Open questions

Handling records spanning multiple buffers

A single record can end up in different tasks while checkpointing.

Therefore:

  1. with a fan-in topology, the downstream needs to match an incomplete record with its upstream; this should already be handled by having separate input channels, given that we have stable channel IDs (need to check)
  2. output buffers need to be sent to the right channel; this is a problem if we rely on keys and don't have stable channel IDs
  3. when scaling in the upstream, buffers of incomplete records need to be matched
  4. when scaling in the downstream, there can be multiple incomplete records in the input buffers (they can end up in a single input channel after recovery)

There are two options:

  1. recover one logical channel atomically (output of upstream, input of downstream) and always on one (downstream?) side. The output of the upstream should be blocked during recovery (my idea was negative credit while the upstream holds more buffers than it should, which would be taken from the upstream buffer pool)
  2. join records on the fly: sort by "channel-id" on sender and receiver, and send a marker after all saved buffers of a channel have been sent; the marker allows the downstream to switch to the next upstream (using credit-based flow control)

Retaining subtask indices between restarts

This is needed to maintain correspondence between upstream output buffers and downstream input buffers.

It looks like indices are assigned arbitrarily, so we need to use the subtask ID in SubPartition (and associate it with the output buffers file).

Avoiding double processing in downstream (if continuous spilling)

With continuous spilling, the upstream saves all of its output, including what the downstream has already processed or buffered.

Possible solutions:

  1. the downstream loads the upstream's output buffers file and filters out what it has already processed; files are named deterministically and indexed
  2. the same, but the offset in the file is calculated from the buffer size
  3. the downstream communicates its offset to the upstream during snapshotting
  4. the downstream communicates its offset to the upstream during recovery
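
Options 1 and 2 above can be sketched roughly as follows. This is only an illustration under stated assumptions: the class and method names are hypothetical and not part of Flink, buffers are modelled as list elements, and option 2 assumes every saved buffer has the same fixed size.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of skipping upstream output that the downstream already consumed. */
final class UpstreamReplayFilterSketch {

    /**
     * Option 2: derive the byte offset into the saved output file from the number of
     * buffers already consumed, assuming fixed-size buffers (an assumption of this sketch).
     */
    static long byteOffset(long consumedBuffers, int bufferSizeBytes) {
        if (consumedBuffers < 0 || bufferSizeBytes <= 0) {
            throw new IllegalArgumentException("invalid buffer count or buffer size");
        }
        return Math.multiplyExact(consumedBuffers, (long) bufferSizeBytes);
    }

    /**
     * Option 1: given the saved buffers in their original order, keep only those the
     * downstream has not processed yet.
     */
    static <T> List<T> unprocessed(List<T> savedBuffers, int consumedBuffers) {
        if (consumedBuffers < 0 || consumedBuffers > savedBuffers.size()) {
            throw new IllegalArgumentException("consumedBuffers out of range");
        }
        // Copy so the result does not depend on later changes to the input list.
        return new ArrayList<>(savedBuffers.subList(consumedBuffers, savedBuffers.size()));
    }
}
```

Both variants rely on the downstream knowing how many buffers it consumed per channel at snapshot time; options 3 and 4 differ only in when that number is sent to the upstream.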

Existing persistence mechanisms vs custom

Depends on:

  1. ad-hoc or continuous spilling (continuous spilling can require special handling because of higher volumes)
  2. single-task channel processing (not compatible with the current implementation)
  3. re-scaling support (now or in the future)

Resolved questions

Retaining subtask indices between restarts

This is needed to maintain correspondence between upstream output buffers and downstream input buffers.

Flink always uses zero-based indices to:

  1. pick the subpartition to consume and bind it to an input channel (see TaskDeploymentDescriptorFactory#createInputGateDeploymentDescriptors)
  2. pick the subpartition to output to (ChannelSelector)
  3. distribute key groups (see KeyGroupRangeAssignment#computeOperatorIndexForKeyGroup, StateAssignmentOperation#assignStates, OperatorInstanceID)

and so on.

As long as the job isn't rescaled, these indices stay the same.
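
To illustrate why the indices are stable, here is a minimal stand-alone sketch of index-based key-group assignment. It restates the arithmetic that KeyGroupRangeAssignment#computeOperatorIndexForKeyGroup is understood to perform; the class name is hypothetical and the formula should be checked against the Flink source before relying on it.

```java
/** Hypothetical re-statement of index-based key-group assignment; not Flink's own class. */
final class KeyGroupIndexSketch {

    /** Maps a key group to the zero-based index of the subtask that owns it. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        if (parallelism <= 0 || parallelism > maxParallelism) {
            throw new IllegalArgumentException("parallelism must be in [1, maxParallelism]");
        }
        if (keyGroupId < 0 || keyGroupId >= maxParallelism) {
            throw new IllegalArgumentException("keyGroupId must be in [0, maxParallelism)");
        }
        // Pure function of its arguments: same parallelism, same owner, on every restart.
        return keyGroupId * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        for (int parallelism : new int[] {4, 8}) {
            System.out.println("parallelism=" + parallelism + ": key group 100 -> subtask "
                    + operatorIndexForKeyGroup(maxParallelism, parallelism, 100));
        }
    }
}
```

Because the result depends only on the key group, the parallelism, and the maximum parallelism, a restart with unchanged parallelism maps every key group to the same subtask index; only a change in parallelism moves the owner.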

Handling records spanning multiple buffers

A single record can end up in different tasks while checkpointing.

Therefore:

without rescaling

  1. with a fan-in topology, the downstream needs to match an incomplete record with its upstream; this should already be handled by having separate input channels
  2. output buffers need to be sent to the right channel; we rely on stable channel IDs, so this is NOT a problem
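
The fan-in case can be pictured with a small sketch that keeps one partial-record accumulator per input channel, so an incomplete record is only ever completed by bytes from the same upstream. Everything here is an assumption made for illustration: the class name, the integer channel IDs, and the framing (a 4-byte big-endian length followed by the payload) are not taken from the Flink implementation.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical per-channel reassembly of records that span multiple buffers. */
final class SpanningRecordSketch {

    // Bytes received so far that do not yet form a complete record, keyed by channel ID.
    private final Map<Integer, ByteArrayOutputStream> pending = new HashMap<>();

    /** Feeds one buffer from the given channel and returns the records it completes. */
    List<byte[]> onBuffer(int channelId, byte[] buffer) {
        ByteArrayOutputStream acc =
                pending.computeIfAbsent(channelId, id -> new ByteArrayOutputStream());
        acc.write(buffer, 0, buffer.length);

        ByteBuffer data = ByteBuffer.wrap(acc.toByteArray());
        List<byte[]> complete = new ArrayList<>();
        while (data.remaining() >= 4) {
            int length = data.getInt(data.position()); // peek at the length prefix
            if (length < 0) {
                throw new IllegalStateException("corrupt length prefix on channel " + channelId);
            }
            if (data.remaining() - 4 < length) {
                break; // the rest of this record arrives in a later buffer
            }
            data.getInt(); // consume the length prefix
            byte[] record = new byte[length];
            data.get(record);
            complete.add(record);
        }

        // Keep only the unconsumed tail for the next buffer of this channel.
        acc.reset();
        acc.write(data.array(), data.position(), data.remaining());
        return complete;
    }

    /** Frames a payload using the length-prefixed format assumed by this sketch. */
    static byte[] frame(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }
}
```

Since the accumulators are keyed by channel ID, buffers from different upstreams can interleave freely without mixing their partial records, which is the property that stable channel IDs preserve across a restart.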

with rescaling

  1. when scaling in the upstream, buffers of incomplete records need to be matched
  2. when scaling in the downstream, there can be multiple incomplete records in the input buffers (they can end up in a single input channel after recovery)

There are two options:

  1. recover one logical channel atomically (output of upstream, input of downstream) and always on one (downstream?) side. The output of the upstream should be blocked during recovery (my idea was negative credit while the upstream holds more buffers than it should, which would be taken from the upstream buffer pool)
  2. join records on the fly: sort by "channel-id" on sender and receiver, and send a marker after all saved buffers of a channel have been sent; the marker allows the downstream to switch to the next upstream (using credit-based flow control)

Decisions made

Ad-hoc vs continuous spilling

The ad-hoc POC showed much better efficiency, so continuous spilling is discarded for now (2020-02-19).

Re-scaling

The first version can be without re-scaling support, at least for non-keyed exchanges (sources and non-sources).

However, we need to have a vision of how to implement it in the future.

Whether to process in and out buffers of a single channel in a single task (downstream?) or not (upstream + downstream)

...

Tentatively decided not to read upstream data in the downstream, for efficiency (it would mean reading many small files) and for easier implementation (2020-02-19).

Compatibility, Deprecation, and Migration Plan

...