...
In this FLIP, we propose to optimize performance for the above use cases by allowing an operator to effectively switch its execution mode from batch to stream based on the "backlog" status of the records emitted by the source operators.
NOTE: Some features originally included in this FLIP have been moved to FLIP-331.
NOTE: This FLIP depends on the APIs (e.g. RecordAttributes with isBacklog information) proposed in FLIP-325. While FLIP-325 allows an operator to buffer records for an arbitrarily long time while its input records have isBacklog=true, this FLIP additionally allows an operator to use optimizations that are currently only applicable in the batch execution mode (e.g. using ExternalSorter, which does not support checkpointing).
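As a rough illustration of what switching from batch-style to stream-style processing can mean for a single keyed operator, the framework-free Java sketch below buffers records while isBacklog=true, sorts and aggregates them one key group at a time when it sees the switch to isBacklog=false, and afterwards emits an updated result for every incoming record. The class BacklogAwareSum, its methods, and the in-memory list standing in for a sorter such as ExternalSorter are all hypothetical; this is a sketch assuming a keyed running-sum operator, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: BacklogAwareSum is NOT a Flink class. It models a keyed
// running-sum operator that processes backlog data batch-style and fresh data
// stream-style.
class BacklogAwareSum {

    record Input(String key, long value) {}

    record Output(String key, long sum) {}

    private boolean isBacklog = true;                            // assume input starts as backlog
    private final List<Input> backlogBuffer = new ArrayList<>(); // stand-in for a sorter such as ExternalSorter
    private final Map<String, Long> sums = new HashMap<>();      // per-key running sums
    private final List<Output> emitted = new ArrayList<>();

    /** Models receiving RecordAttributes(isBacklog=backlog) from upstream. */
    void processRecordAttributes(boolean backlog) {
        if (isBacklog && !backlog) {
            flushSortedBacklog(); // batch -> stream: drain the buffered backlog first
        }
        isBacklog = backlog;
    }

    void processElement(Input in) {
        if (isBacklog) {
            backlogBuffer.add(in); // batch-style: buffer only, emit nothing yet
        } else {
            long sum = sums.merge(in.key(), in.value(), Long::sum);
            emitted.add(new Output(in.key(), sum)); // stream-style: one update per record
        }
    }

    /** Sorts the buffered records by key and emits one result per key group. */
    private void flushSortedBacklog() {
        backlogBuffer.sort(Comparator.comparing(Input::key));
        int i = 0;
        while (i < backlogBuffer.size()) {
            String key = backlogBuffer.get(i).key();
            long sum = sums.getOrDefault(key, 0L);
            // Consume all adjacent records that share this key.
            while (i < backlogBuffer.size() && backlogBuffer.get(i).key().equals(key)) {
                sum += backlogBuffer.get(i).value();
                i++;
            }
            sums.put(key, sum);
            emitted.add(new Output(key, sum));
        }
        backlogBuffer.clear();
    }

    /** Returns an immutable snapshot of everything emitted so far. */
    List<Output> emitted() {
        return List.copyOf(emitted);
    }
}
```

For example, feeding (b,2), (a,1), (b,3) as backlog emits nothing until the switch, which then emits (a,1) and (b,5); a later record (a,4) immediately emits (a,5). Processing the sorted backlog key group by key group produces one result per key instead of one update per backlog record. Because the buffered records are never snapshotted in this sketch, it can only be checkpointed meaningfully once the backlog has been flushed, which mirrors the ExternalSorter restriction noted above.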
...
- Every operator needs to support checkpointing.
- Before the source operator emits isBacklog=false, checkpoint triggering is disabled.
- If any task fails when isBacklog=true, this task is restarted to re-process its input from the beginning.
- At the point when isBacklog switches to false, the source operator emits RecordAttributes(isBacklog=false) and triggers an immediate checkpoint.
- Checkpoints are then triggered periodically according to execution.checkpointing.interval.
- If any task fails when isBacklog=false, its pipelined region is restarted to re-process its input since the last successful checkpoint.
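The checkpointing and failover rules listed above can be modelled as a small state machine. The Java sketch below is hypothetical (BacklogCheckpointPolicy, Recovery and their methods are illustrative names, not Flink APIs) and simplifies in two ways: it tracks a single job-wide isBacklog flag, and it assumes every triggered checkpoint completes successfully. The intervalMs parameter plays the role of execution.checkpointing.interval.

```java
// Hypothetical model of the checkpoint/failover rules; none of these names are Flink APIs.
class BacklogCheckpointPolicy {

    enum Recovery { RESTART_TASK_FROM_BEGINNING, RESTART_REGION_FROM_LAST_CHECKPOINT }

    private final long intervalMs;    // plays the role of execution.checkpointing.interval
    private boolean isBacklog = true; // single job-wide flag (simplification)
    private long lastTriggerMs;       // only read once isBacklog is false

    BacklogCheckpointPolicy(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /**
     * Called when the source emits RecordAttributes(isBacklog=false).
     * Returns true if an immediate checkpoint should be triggered.
     */
    boolean onBacklogSwitchedToFalse(long nowMs) {
        if (!isBacklog) {
            return false; // already processing in stream mode
        }
        isBacklog = false;
        lastTriggerMs = nowMs; // the immediate checkpoint resets the periodic timer
        return true;
    }

    /** Periodic timer tick: returns true if a checkpoint should be triggered now. */
    boolean shouldTriggerPeriodic(long nowMs) {
        if (isBacklog) {
            return false; // checkpoint triggering is disabled while isBacklog=true
        }
        if (nowMs - lastTriggerMs >= intervalMs) {
            lastTriggerMs = nowMs;
            return true;
        }
        return false;
    }

    /** Recovery action for a failed task, depending on the current backlog status. */
    Recovery onTaskFailure() {
        return isBacklog
                ? Recovery.RESTART_TASK_FROM_BEGINNING
                : Recovery.RESTART_REGION_FROM_LAST_CHECKPOINT;
    }
}
```

For example, with an interval of 1000 ms, any periodic tick before the switch returns false, the switch at t=6000 returns true (the immediate checkpoint), a tick at t=6500 returns false, and the next periodic checkpoint fires at t=7000.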
...