Versions Compared


  • This line was added.
  • This line was removed.
  • Formatting was changed.


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

[This FLIP proposal is a joint work between Xuannan Su  and Dong Lin ]

Table of Contents




Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

[This FLIP proposal is a joint work between Xuannan Su  and Dong Lin ]

Table of Contents


Currently, Flink allows the same operator to Currently, Flink allows the same operator to be used in both stream mode and batch mode. The operator can process bounded stream of records with high throughput (e.g. via sorting/buffering) in a batch-mode job, and process unbounded stream of records with low processing latency (e.g. via checkpoint with stateful state backend) in a batch-mode job. However, the operator is not able to use different modes (i.e. batch and stream modes) at different stages of the same job, which makes it hard to meet the performance requirement for jobs that need to process a bounded stream of backlog data followed by an unbounded stream of fresh data.


In this section, we describe how batch mode and stream mode differ in a variety of aspects. We also describe the new behavior of Flink runtime which supports an operator to switch from batch mode to stream mode during execution of the same job.

For simplicityeasy of understanding, we assume the Flink use a simplified use-case with the following properties when describing the behavior changes:

  • The job uses default configuration values for all configurations (e.g. scheduler-mode) except execution.runtime-mode


  • (see below).
  • The job has one source that switches from isBacklog=true to isBacklog=false before it reaches EOF.

Here are the definition of batch mode, stream mode and mixed mode used below:


  • After this FLIP, the behavior of Flink runtime with execution.runtime-mode = streaming AND execution.checkpointing.interval-during-backlog > 0, will be same as the stream mode prior to this FLIP.
  • The rational for switching the Flink runtime behavior based on whether execution.checkpointing.interval-during-backlog == 0 is that most (if not all) performance optimizations, which allow batch mode to achieve higher throughput than stream mode, involve operations (e.g. sorting inputs) that can only be used when checkpoint is not required.
  • It is possible for mixed mode to be slower than stream mode, particularly when there is only small amount of input records and the overhead of buffering/sorting inputs out-weight its benefit. This is similar to how the merge join might be slower than hash join. This FLIP focuses on optimizing the Flink throughput when there is a high number of input records. In the future, we might introduce more strategies to turn on mix mode in a smart way to avoid performance regressionwhen there is a high number of input records. In the future, we might introduce more strategies to turn on mix mode in a smart way to avoid performance regression.
  • For an operator with 2+ inputs, where some inputs have isBacklog=true and some other inputs have isBacklog=false, Flink runtime will handle this operator as if all its inputs have isBacklog=false. The rational is that we don't have a reliable way (yet) to decide whether it is OK to delay the processing/emission of this operator's output records. For example, if this operator simply forwards the records from the input with backlog=false to its output, then we probably should not delay its output records.

1) Scheduling strategy

Batch mode:


  • Source operator does not emit watermark while isBacklog=true.
  • At the point when isBacklog switches to false, source operator emits RecordAttributes(isBacklog=false) and the greatest watermark during backlog processing.

  • Source operator emits watermark based on the user-specified WatermarkStrategy and while it has not reached EOF.

  • Once the source operator reaches EOF, it emits watermark of Long.MAX_VALUE after it has emitted all records.

4) Checkpoint and failover strategy

Batch mode:

  • -interval while it has not reached EOF.

  • Once the source operator reaches EOF, it emits watermark of Long.MAX_VALUE after it has emitted all records.

4) Checkpoint and failover strategy

Batch mode:

  • No operator needs to support checkpoint. Checkpoint triggering is disabled.
  • If any task fails, this task is restarted to re-process its input from the beginning.

Stream mode:

  • Every operator needs to support checkpoint. Checkpoint is triggered periodically according to execution.checkpointing.interval.
  • If any task fails, its pipelined region is restarted to re-process its input since the last successful checkpoint.

Mixed mode:

  • Every No operator needs to support checkpoint. Checkpoint
  • Before source operator emits isBacklog=false, checkpoint triggering is disabled.
  • If any task fails when isBacklog=true, this task is restarted to re-process its input from the beginning.

Stream mode:

  • At the point when isBacklog switches to false, source operator emits RecordAttributes(isBacklog=false) and triggers an immediate checkpoint.
  • Every operator needs to support checkpoint. Checkpoint is triggered periodically according to execution.checkpointing.interval.
  • If any task fails when isBacklog=false, its pipelined region is restarted to re-process its input since the last successful checkpoint.

Mixed mode:


Extra notes: For jobs with multiple sources and execution.checkpointing.interval-during-backlog = 0, checkpoint triggering is enabled if and only if all sources have isBacklog=false. (More details for the checkpointing behavior can be found in the doc of execution.checkpointing.interval-during-backlog).

As a result, suppose a source has isBacklog=true, and another source switches from isBacklog=true to isBacklog=false, the job's checkpoint would still be disabled


5) Keyed state backend

Batch mode:


  • Managed memory is allocated to operators with keyed inputs.

  • A general purpose keyed state backend that does not assume inputs sorted by key (e.g. EmbeddedRocksDBStateBackend) is instantiated by Flink runtime.

  • Before source operator emits isBacklog=false (during backlog processing), the keyed input to one input operator is sorted by the key.

    • Multiple input operators Operators with multiple inputs can buffer and sort the inputs internally. We will provide utility classes (derived from the existing ExternalSorter) to sort inputs.
    • With the sorted input and the state backend optimization introduced in FLIP-325Flink runtime will buffer/aggregate state's key/value in memory before persisting state's key/value to the underlying state backend (e.g. rocksdb), such that for the input records that are already sorted on the given key, there will be at most one read/write access to the Rocksdb underlying state backend for each key. And we only need to write to the Rocksdb state backend if the lru cache is full.
  • At the point when isBacklog switches to false:
    • Source operator emits RecordAttributes(isBacklog=false).
  • The operator continues to process records using the keyed state backend instantiated by Flink runtime, which now contains the full state obtained from records received while isBacklog=true.


  • Before source operator emits isBacklog=false, the timer service would fire processing-time timers based on the system time.
  • At the point when isBacklog switches to false, the timer service would fire event-time timers up to the latest watermark at this point.
    • The value of InternalTimeService#getCurrentWatermark will be Watermark.MIN_VALUE when processing a key and will be set to the last watermark during backlog when firing triggers.
  • After isBacklog switches to false, the timer service keeps timers for all the keys and fires timers as the watermark advancescontinues to fire processing-time and even-time timers in the same way as the stream mode.

Public Interfaces

1) Add RecordAttributesBuilder and RecordAttributes that extends RuntimeEvent to StreamElement to provide operator with essential information about the records they receive, such as whether the records are already stale due to backlog.


Code Block
 * A RecordAttributes element provides stream task with information that can be used to optimize
 * the stream task's performance.
public class RecordAttributes extends RuntimeEventStreamElement {
     * If it returns true, then the records received after this element are stale
     * and an operator can optionally buffer records until isBacklog=false. This
     * allows an operator to optimize throughput at the cost of processing latency.
     public Boolean isBacklog() {...}


  • If a OneInputTransformation has isInternalSorterSupported == false, keyed input, and execution.checkpointing.interval-during-backlog == 0:
    • Flink runtime will add an external sorter for the input to sort the keyed input during backlog processing
    • Its managed memory should be set according to execution.sorted-inputs.memoryUse
    • a state backend with cache introduced in FLIP-325 with a cache size of 1 if the cache size isn't explicitly specified by users so that we optimize the state backend access while minimizing the memory overhead. Alternatively, when users have set a cache size, we will adhere to that settingFlink runtime will buffer/aggregate state's key/value in memory before persisting state's key/value to the underlying state backend (e.g. rocksdb), such that for the input records that are already sorted on the given key, there will be at most one read/write access to the underlying state backend for each key.

With the above change, we can significantly enhance the performance of keyed one input operators without code change when "isBacklog=true". Moreover, the performance of the operator during isBacklog=true is close to the performance in batch mode.
