...

In this section, we describe how batch mode and stream mode differ in a variety of aspects. We also describe the new behavior of Flink runtime that is needed to support an operator switching from batch mode to stream mode during the execution of the same job.

For simplicity, we assume the Flink job uses default values for all configurations (e.g. scheduler-mode) except execution.runtime-mode. We also assume that the operator reads records from a source which switches from isBacklog=true to isBacklog=false before it reaches EOF.

Here are the definitions of batch mode, stream mode and mixed mode used below:

  • Batch mode refers to the behavior of Flink runtime prior to this FLIP with execution.runtime-mode = batch.
  • Stream mode refers to the behavior of Flink runtime prior to this FLIP with execution.runtime-mode = streaming.
  • Mixed mode refers to the behavior of Flink runtime after this FLIP with execution.runtime-mode = streaming AND execution.checkpointing.interval-during-backlog = 0.

Additionally, note that after this FLIP, the behavior of Flink runtime with execution.runtime-mode = streaming AND execution.checkpointing.interval-during-backlog > 0 will be the same as the stream mode (prior to this FLIP).
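The definitions above can be summarized as a small decision function over the two configuration values. This is a hypothetical sketch: `Mode` and `ModeClassifier.classify` are illustrative names rather than Flink API, the interval is modeled as a `long` in milliseconds, and runtime modes other than `batch` and `streaming` (such as `automatic`) are rejected because this section does not cover them. Following the per-aspect descriptions below, a positive interval-during-backlog is mapped to stream-mode behavior.

```java
import java.util.Locale;

/** Hypothetical labels for the three behaviors defined above (not a Flink type). */
enum Mode { BATCH, STREAM, MIXED }

final class ModeClassifier {
    private ModeClassifier() {}

    /**
     * Maps execution.runtime-mode and execution.checkpointing.interval-during-backlog
     * (modeled here as milliseconds) to the behavior described in this section.
     * The interval is ignored in batch mode.
     */
    static Mode classify(String runtimeMode, long intervalDuringBacklogMs) {
        String mode = runtimeMode.toLowerCase(Locale.ROOT);
        if (mode.equals("batch")) {
            return Mode.BATCH;
        }
        if (!mode.equals("streaming")) {
            // Assumption: other modes (e.g. automatic) are out of scope for this sketch.
            throw new IllegalArgumentException("unsupported runtime mode: " + runtimeMode);
        }
        if (intervalDuringBacklogMs < 0) {
            throw new IllegalArgumentException("interval must be >= 0");
        }
        // interval-during-backlog = 0 enables mixed mode; > 0 keeps stream-mode behavior.
        return intervalDuringBacklogMs == 0 ? Mode.MIXED : Mode.STREAM;
    }
}
```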


1) Scheduling strategy

Batch mode:

...

  • All tasks are deployed at the beginning of the job without waiting for upstream tasks to finish.

...

Mixed mode:

  • Same as stream mode

...


The capability to update/optimize scheduling (e.g. speculative execution, AQE) in mixed mode will be left to future work.

...

  • All shuffle types will be set to PIPELINED.

Mixed mode with execution.checkpointing.interval-during-backlog > 0:

  • Same as stream mode.

Mixed mode with execution.checkpointing.interval-during-backlog = 0:

  • In mixed mode, the same shuffle strategy as in stream mode will be used to set the shuffle type between tasks.
  • Before the source operator emits isBacklog=false, the keyed input of a one-input operator will be automatically sorted while isBacklog=true, unless the operator sorts its input internally.
    • Keyed inputs of multiple-input operators are not automatically sorted; such operators can sort their inputs internally.
  • After isBacklog switches to false, keyed inputs are no longer sorted.
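The automatic sorting for a one-input operator can be sketched as a buffer that holds keyed records while isBacklog=true and releases them grouped by key when RecordAttributes(isBacklog=false) arrives. This is a hypothetical, in-memory sketch: `BacklogSortingBuffer` is not a Flink class, keys are assumed to be `Comparable`, and a real implementation would use a spilling sorter (such as one derived from ExternalSorter) rather than a heap list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: buffers keyed records while isBacklog=true and releases
 * them sorted by key when the backlog ends. Everything is kept on the heap.
 */
final class BacklogSortingBuffer<K extends Comparable<K>, V> {
    private final List<Map.Entry<K, V>> buffer = new ArrayList<>();
    private boolean backlog = true;

    /** Returns the records that may be forwarded to the operator right now. */
    List<Map.Entry<K, V>> process(K key, V value) {
        Map.Entry<K, V> record = Map.entry(key, value);
        if (backlog) {
            buffer.add(record); // hold back until the backlog ends
            return List.of();
        }
        return List.of(record); // isBacklog=false: no sorting, forward as-is
    }

    /** Called when RecordAttributes(isBacklog=false) arrives; flushes sorted records. */
    List<Map.Entry<K, V>> onBacklogEnd() {
        backlog = false;
        List<Map.Entry<K, V>> sorted = new ArrayList<>(buffer);
        // List.sort is stable, so records with the same key keep their arrival order.
        sorted.sort(Map.Entry.comparingByKey());
        buffer.clear();
        return sorted;
    }
}
```

Because `List.sort` is stable, records of the same key keep their arrival order; the property that matters most for the state and timer handling described later is that all records of a key are contiguous.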

...

  • The source operator emits watermarks based on the user-specified WatermarkStrategy and pipeline.auto-watermark-interval while it has not reached EOF.
  • Once the source operator reaches EOF, it emits a watermark of Long.MAX_VALUE after it has emitted all records. This means the downstream operator will not receive any further input records after it has received watermark=Long.MAX_VALUE.

Mixed mode with execution.checkpointing.interval-during-backlog > 0:

...

Mixed mode with execution.checkpointing.interval-during-backlog = 0:

  • The source operator does not emit watermarks while isBacklog=true.
  • At the point when isBacklog switches to false, the source operator emits RecordAttributes(isBacklog=false) and the greatest watermark reached during backlog processing.

  • The source operator then emits watermarks based on the user-specified WatermarkStrategy and pipeline.auto-watermark-interval while it has not reached EOF.

  • Once the source operator reaches EOF, it emits a watermark of Long.MAX_VALUE after it has emitted all records.
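The mixed-mode watermark sequence can be traced with a small simulation. Assumptions: `MixedModeWatermarkSource` is a hypothetical class that appends emitted elements to a string log; the watermark is computed as `maxTimestamp - maxOutOfOrderness - 1` (the formula used by Flink's bounded out-of-orderness strategy), standing in for an arbitrary user-specified WatermarkStrategy; and the pipeline.auto-watermark-interval timer is simulated by calls to `onPeriodicEmit()`. The sketch emits the attribute before the watermark; the text does not pin down that order.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a source's watermark output in mixed mode. */
final class MixedModeWatermarkSource {
    final List<String> emitted = new ArrayList<>();
    private final long maxOutOfOrderness;
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEmittedWatermark = Long.MIN_VALUE;
    private boolean backlog = true;

    MixedModeWatermarkSource(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    void onRecord(long timestamp) {
        emitted.add("record@" + timestamp);
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    /** Simulates the pipeline.auto-watermark-interval timer firing. */
    void onPeriodicEmit() {
        if (backlog) {
            return; // no watermarks while isBacklog=true
        }
        emitWatermarkIfAdvanced();
    }

    /** isBacklog switches to false: attribute first, then the greatest backlog watermark. */
    void switchToNonBacklog() {
        backlog = false;
        emitted.add("RecordAttributes(isBacklog=false)");
        emitWatermarkIfAdvanced();
    }

    /** EOF: all records have been emitted, so a final Long.MAX_VALUE watermark follows. */
    void onEndOfInput() {
        emitted.add("watermark@" + Long.MAX_VALUE);
    }

    private void emitWatermarkIfAdvanced() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return; // no record seen yet
        }
        long watermark = maxTimestamp - maxOutOfOrderness - 1;
        if (watermark > lastEmittedWatermark) {
            lastEmittedWatermark = watermark;
            emitted.add("watermark@" + watermark);
        }
    }
}
```

With `maxOutOfOrderness = 0`, records with timestamps 10, 30 and 20 during backlog produce no watermark until the switch, at which point the log shows `RecordAttributes(isBacklog=false)` followed by `watermark@29`.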

...

  • Every operator needs to support checkpointing. Checkpoints are triggered periodically according to execution.checkpointing.interval.
  • If any task fails, its pipelined region is restarted to re-process its input since the last successful checkpoint.

Mixed mode with execution.checkpointing.interval-during-backlog > 0:

  • Same as stream mode.

Mixed mode with execution.checkpointing.interval-during-backlog = 0:

  • Every operator needs to support checkpointing.
  • Before the source operator emits isBacklog=false, checkpoint triggering is disabled.
  • If any task fails while isBacklog=true, that task is restarted and re-processes its input from the beginning.
  • At the point when isBacklog switches to false, the source operator emits RecordAttributes(isBacklog=false) and triggers an immediate checkpoint.
  • From then on, checkpoints are triggered periodically according to execution.checkpointing.interval.
  • If any task fails while isBacklog=false, its pipelined region is restarted and re-processes its input since the last successful checkpoint.
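The checkpoint rules for mixed mode amount to a small state machine. In this hypothetical sketch, `MixedModeCheckpointTrigger` is not Flink API, time and source offsets are plain `long` values, every triggered checkpoint is assumed to complete immediately, and failure handling is reduced to the offset from which input is re-processed (restarting a single task versus a pipelined region is not modeled).

```java
/** Hypothetical sketch of checkpoint triggering with interval-during-backlog = 0. */
final class MixedModeCheckpointTrigger {
    private final long intervalMs; // execution.checkpointing.interval
    private boolean backlog = true;
    private long lastCheckpointTimeMs;
    private long lastCheckpointOffset; // 0 = beginning of the input

    MixedModeCheckpointTrigger(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /** Periodic check; never triggers while isBacklog=true. */
    boolean maybeTrigger(long nowMs, long currentOffset) {
        if (backlog || nowMs - lastCheckpointTimeMs < intervalMs) {
            return false;
        }
        complete(nowMs, currentOffset);
        return true;
    }

    /** isBacklog switched to false: trigger a checkpoint immediately. */
    void onBacklogEnd(long nowMs, long currentOffset) {
        backlog = false;
        complete(nowMs, currentOffset);
    }

    /** Offset from which a failed task re-processes its input. */
    long restartOffset() {
        return backlog ? 0 : lastCheckpointOffset;
    }

    private void complete(long nowMs, long offset) {
        // Simplification: every triggered checkpoint succeeds at once.
        lastCheckpointTimeMs = nowMs;
        lastCheckpointOffset = offset;
    }
}
```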

...

  • Managed memory is allocated to operators if the keyed state backend is a subclass of AbstractManagedMemoryStateBackend.
  • A general purpose keyed state backend which does not assume inputs sorted by key (e.g. EmbeddedRocksDBStateBackend) is used.

Mixed mode with execution.checkpointing.interval-during-backlog > 0:

...

Mixed mode with execution.checkpointing.interval-during-backlog = 0:

  • Managed memory is allocated to operators with keyed inputs.

  • A general purpose keyed state backend that does not assume inputs sorted by key (e.g. EmbeddedRocksDBStateBackend) is instantiated by Flink runtime.

  • Before the source operator emits isBacklog=false (during backlog processing), the keyed input of a one-input operator is sorted by key.

    • Multiple-input operators can buffer and sort their inputs internally. We will provide utility classes (derived from the existing ExternalSorter) to sort inputs.
    • With the sorted input and the state backend optimization introduced in FLIP-325, there will be at most one read access to the RocksDB state backend for each key, and we only need to write to the RocksDB state backend when the LRU cache is full.
  • At the point when isBacklog switches to false:
    • The source operator emits RecordAttributes(isBacklog=false).
  • The operator continues to process records using the keyed state backend instantiated by Flink runtime, which now contains the full state obtained from records received while isBacklog=true.
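The benefit of sorted input for the state backend can be illustrated by putting an LRU cache in front of a key-value store and counting store accesses. This sketch does not reproduce FLIP-325's actual design: `CachedKeyedState` is hypothetical, a `HashMap` stands in for RocksDB, the state is a per-key running sum, and the cache writes an entry back to the store only when it is evicted.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical LRU cache in front of a stand-in for RocksDB, with access counters. */
final class CachedKeyedState {
    final Map<String, Long> store = new HashMap<>(); // stand-in for RocksDB
    int storeReads;
    int storeWrites;
    private final LinkedHashMap<String, Long> cache;

    CachedKeyedState(int maxEntries) {
        // accessOrder=true makes iteration order least-recently-used first.
        this.cache = new LinkedHashMap<String, Long>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                if (size() > maxEntries) {
                    store.put(eldest.getKey(), eldest.getValue()); // write only on eviction
                    storeWrites++;
                    return true;
                }
                return false;
            }
        };
    }

    /** Adds delta to the running sum for key. */
    void add(String key, long delta) {
        Long current = cache.get(key);
        if (current == null) {
            current = store.getOrDefault(key, 0L); // cache miss: one store read
            storeReads++;
        }
        cache.put(key, current + delta);
    }

    long get(String key) {
        Long cached = cache.get(key);
        return cached != null ? cached : store.getOrDefault(key, 0L);
    }
}
```

With a capacity of one entry, the sorted sequence a, a, b, b, b, c costs three store reads (one per key) and two write-backs, while the unsorted sequence a, b, a already needs three reads for two keys because `a` is evicted and read again.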

...

  • The timer service keeps timers for all the keys, and it will fire timers based on the watermark.

Mixed mode with execution.checkpointing.interval-during-backlog > 0:

...


Mixed mode with execution.checkpointing.interval-during-backlog = 0:

  • Before the source operator emits isBacklog=false, the timer service keeps timers only for the current key, and at the end of each key it fires that key's timers up to the last watermark seen during backlog processing. In addition, all processing-time timers up to the current system time are triggered.
    • The value of InternalTimeService#getCurrentWatermark is Watermark.MIN_VALUE while a key is being processed, and is set to the last watermark seen during backlog processing when timers are fired.
  • After isBacklog switches to false, the timer service keeps timers for all the keys and fires timers as the watermark advances.
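The per-key timer handling during backlog can be sketched as follows. `BacklogTimerService` and its methods are hypothetical; `Long.MIN_VALUE` stands in for Watermark.MIN_VALUE; input is assumed to be grouped by key so that `endKey` is called once per key; and since this section does not say what happens to timers later than the last backlog watermark, the sketch simply returns them to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical sketch of the per-key timer service used while isBacklog=true. */
final class BacklogTimerService {
    final List<String> fired = new ArrayList<>();
    private final long lastBacklogWatermark;
    private final TreeSet<Long> eventTimers = new TreeSet<>();
    private final TreeSet<Long> processingTimers = new TreeSet<>();
    private String currentKey;
    private long currentWatermark = Long.MIN_VALUE;

    BacklogTimerService(long lastBacklogWatermark) {
        this.lastBacklogWatermark = lastBacklogWatermark;
    }

    /** Plays the role of InternalTimeService#getCurrentWatermark in the text. */
    long getCurrentWatermark() {
        return currentWatermark;
    }

    void startKey(String key) {
        currentKey = key;
        currentWatermark = Long.MIN_VALUE; // no watermark visible while processing a key
    }

    void registerEventTimeTimer(long timestamp) {
        eventTimers.add(timestamp);
    }

    void registerProcessingTimeTimer(long timestamp) {
        processingTimers.add(timestamp);
    }

    /** Called after the last record of the current key; returns timers that did not fire. */
    List<String> endKey(long nowMs) {
        currentWatermark = lastBacklogWatermark;
        NavigableSet<Long> dueEvent = eventTimers.headSet(lastBacklogWatermark, true);
        for (long ts : dueEvent) {
            fired.add(currentKey + ":event@" + ts + ",wm=" + getCurrentWatermark());
        }
        dueEvent.clear(); // headSet is a view, so this removes them from eventTimers
        NavigableSet<Long> dueProc = processingTimers.headSet(nowMs, true);
        for (long ts : dueProc) {
            fired.add(currentKey + ":proc@" + ts);
        }
        dueProc.clear();
        // Only the current key's timers are kept, so hand back whatever is left.
        List<String> remaining = new ArrayList<>();
        eventTimers.forEach(ts -> remaining.add("event@" + ts));
        processingTimers.forEach(ts -> remaining.add("proc@" + ts));
        eventTimers.clear();
        processingTimers.clear();
        return remaining;
    }
}
```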

...