...

In this FLIP, we propose to optimize performance for the above use cases by allowing an operator to effectively switch its execution mode from batch to stream based on the "backlog" status of the records emitted by the source operators.


NOTE: Some features originally included in this FLIP have been moved to FLIP-331.

NOTE: This FLIP depends on the APIs proposed in FLIP-325 (e.g. RecordAttributes carrying isBacklog information). FLIP-325 allows an operator to buffer records for an arbitrarily long time while its input records have isBacklog=true. This FLIP additionally allows such an operator to use optimizations that are currently only applicable in batch execution mode (e.g. using ExternalSorter, which does not support checkpointing).
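The batch-to-stream switch described above can be illustrated with a small simulation. This is a minimal Python sketch, not Flink code. `RecordAttributes`, `KeyedSumOperator`, `process_record` and `process_record_attributes` are hypothetical names chosen for illustration; FLIP-325 defines the real `RecordAttributes` API in Java. A plain in-memory list that is sorted on flush stands in for `ExternalSorter`. While isBacklog=true, the operator only buffers records. When it receives RecordAttributes(isBacklog=false), it sorts the buffer by key, emits one aggregated result per key, and from then on processes each record as it arrives.

```python
from dataclasses import dataclass
from itertools import groupby
from operator import itemgetter
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class RecordAttributes:
    """Hypothetical stand-in for FLIP-325's RecordAttributes."""
    is_backlog: bool


class KeyedSumOperator:
    """Hypothetical operator computing a running sum per key.

    While is_backlog is true it works batch-style (buffer, then sort);
    afterwards it works stream-style (one state update and output per record).
    """

    def __init__(self) -> None:
        self.is_backlog = True  # assumption: input starts in backlog
        self.buffer: List[Tuple[str, int]] = []  # stand-in for ExternalSorter
        self.state: Dict[str, int] = {}  # per-key running sums
        self.output: List[Tuple[str, int]] = []  # emitted (key, sum) records

    def process_record(self, key: str, value: int) -> None:
        if self.is_backlog:
            # Batch-style: defer the work, with no per-record state access or output.
            self.buffer.append((key, value))
        else:
            # Stream-style: update state and emit an updated result immediately.
            self.state[key] = self.state.get(key, 0) + value
            self.output.append((key, self.state[key]))

    def process_record_attributes(self, attrs: RecordAttributes) -> None:
        if self.is_backlog and not attrs.is_backlog:
            self._flush_sorted_buffer()
        self.is_backlog = attrs.is_backlog

    def _flush_sorted_buffer(self) -> None:
        # Sorting makes each key's records contiguous, so every key is
        # aggregated once and produces a single output record.
        self.buffer.sort(key=itemgetter(0))
        for key, group in groupby(self.buffer, key=itemgetter(0)):
            total = self.state.get(key, 0) + sum(v for _, v in group)
            self.state[key] = total
            self.output.append((key, total))
        self.buffer.clear()
```

Suppose the operator receives ("b", 1), ("a", 2) and ("b", 3) during backlog. It emits nothing until RecordAttributes(is_backlog=False) arrives. At that point it emits ("a", 2) and ("b", 4). A later ("a", 5) is then processed at once and emits ("a", 7). The results match pure streaming, but the backlog phase produces fewer intermediate outputs.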

...

  • Every operator needs to support checkpointing.
  • Before the source operator emits isBacklog=false, checkpoint triggering is disabled.
  • If any task fails while isBacklog=true, that task is restarted to re-process its input from the beginning.
  • When isBacklog switches to false, the source operator emits RecordAttributes(isBacklog=false) and triggers an immediate checkpoint.
  • Checkpoints are then triggered periodically according to execution.checkpointing.interval.
  • If any task fails while isBacklog=false, its pipelined region is restarted to re-process its input from the last successful checkpoint.
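The rules above can be modeled as a small state machine. This is a minimal Python sketch under stated assumptions, not Flink's scheduler or checkpoint coordinator logic. `BacklogCheckpointPolicy` and its methods are hypothetical names. Input progress is represented by a hypothetical integer offset. The sketch also assumes that isBacklog starts as true and switches to false exactly once, following the order of the rules listed.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class BacklogCheckpointPolicy:
    """Hypothetical model of the isBacklog-dependent checkpoint rules.

    Assumes isBacklog starts as true and switches to false exactly once.
    Offsets are hypothetical input positions, not a Flink concept.
    """
    interval: float  # stands in for execution.checkpointing.interval (seconds)
    is_backlog: bool = True
    last_checkpoint_time: Optional[float] = None
    last_checkpoint_offset: Optional[int] = None

    def _checkpoint(self, now: float, offset: int) -> None:
        self.last_checkpoint_time = now
        self.last_checkpoint_offset = offset

    def switch_to_non_backlog(self, now: float, offset: int) -> None:
        """Source emits RecordAttributes(isBacklog=false): checkpoint immediately."""
        if not self.is_backlog:
            raise ValueError("this sketch only models a single true -> false switch")
        self.is_backlog = False
        self._checkpoint(now, offset)

    def maybe_trigger_periodic(self, now: float, offset: int) -> bool:
        """Called on each timer tick; returns True if a checkpoint was taken."""
        if self.is_backlog:
            return False  # checkpoint triggering is disabled during backlog
        if self.last_checkpoint_time is None:
            raise RuntimeError("expected the immediate checkpoint at the switch")
        if now - self.last_checkpoint_time >= self.interval:
            self._checkpoint(now, offset)
            return True
        return False

    def on_failure(self) -> Tuple[str, int]:
        """Returns (restart scope, input offset to resume from)."""
        if self.is_backlog:
            # Only the failed task restarts, re-reading its input from the start.
            return ("task", 0)
        if self.last_checkpoint_offset is None:
            raise RuntimeError("expected a checkpoint after leaving backlog")
        # The whole pipelined region resumes from the last successful checkpoint.
        return ("pipelined_region", self.last_checkpoint_offset)
```

Suppose the interval is 60 seconds and the switch to isBacklog=false happens at t=120. The sketch then takes its next periodic checkpoint at t=180. A failure before t=120 restarts only the failed task from offset 0. A failure after t=120 restarts the pipelined region from the most recent checkpointed offset.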

...