...

For simplicity, we assume the Flink job uses the default value for every configuration option (e.g. scheduler-mode) except execution.runtime-mode, and that the operator reads records from a source which switches from isBacklog=true to isBacklog=false before it reaches EOF.

NOTE: the following description of mixed mode assumes execution.interval-during-backlog == 0


1) Scheduling strategy

Batch mode:

...

  • Source operator emits watermarks based on the user-specified WatermarkStrategy and pipeline.auto-watermark-interval while it has not reached EOF.
  • Once the source operator reaches EOF, it emits a watermark of Long.MAX_VALUE after it has emitted all records. The downstream operator therefore sees no further input records after it has received watermark=Long.MAX_VALUE.

Mixed mode with execution.interval-during-backlog > 0:

  • Same as stream mode.

Mixed mode with execution.interval-during-backlog = 0:

  • Source operator does not emit watermarks while isBacklog=true.
  • Before source operator emits isBacklog=false, it emits a watermark of Long.MAX_VALUE for one key at a time, after it has emitted all records for that key.
  • At the point when isBacklog switches to false, source operator emits RecordAttributes(isBacklog=false).
  • After that, source operator emits watermarks based on the user-specified WatermarkStrategy and pipeline.auto-watermark-interval while it has not reached EOF.
  • Once the source operator reaches EOF, it emits a watermark of Long.MAX_VALUE after it has emitted all records.
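
The emission order described above for mixed mode with execution.interval-during-backlog = 0 can be illustrated with a small stand-alone simulation. This is only a sketch under stated assumptions and does not use the Flink API: the class MixedModeWatermarkSketch, the Rec type and the string event format are hypothetical, backlog records are assumed to be grouped by key before emission, and the user-specified WatermarkStrategy is replaced by "watermark = largest timestamp seen so far, emitted after every record" instead of a periodic emission driven by pipeline.auto-watermark-interval.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of the emission order; not part of Flink.
final class MixedModeWatermarkSketch {

    /** Hypothetical record type: a key plus an event timestamp. */
    static final class Rec {
        final String key;
        final long timestamp;

        Rec(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    /** Returns the events the source would emit, in order, as readable strings. */
    static List<String> emit(List<Rec> backlog, List<Rec> realTime) {
        List<String> out = new ArrayList<>();

        // Phase 1 (isBacklog=true): no regular watermarks are emitted.
        // Assumption: records are grouped by key so that a per-key
        // watermark of Long.MAX_VALUE can follow the last record of each key.
        Map<String, List<Rec>> byKey = new TreeMap<>();
        for (Rec r : backlog) {
            byKey.computeIfAbsent(r.key, k -> new ArrayList<>()).add(r);
        }
        for (Map.Entry<String, List<Rec>> e : byKey.entrySet()) {
            for (Rec r : e.getValue()) {
                out.add("record " + r.key + "@" + r.timestamp);
            }
            out.add("watermark MAX for key " + e.getKey());
        }

        // Phase 2: the switch from backlog to real-time data.
        out.add("RecordAttributes(isBacklog=false)");

        // Phase 3 (isBacklog=false): regular watermarks. Simplified stand-in
        // for the user-specified WatermarkStrategy (see lead-in).
        long maxTimestamp = Long.MIN_VALUE;
        for (Rec r : realTime) {
            out.add("record " + r.key + "@" + r.timestamp);
            maxTimestamp = Math.max(maxTimestamp, r.timestamp);
            out.add("watermark " + maxTimestamp);
        }

        // Phase 4 (EOF): final watermark after all records.
        out.add("watermark MAX");
        return out;
    }
}
```

In this sketch, no event emitted before RecordAttributes(isBacklog=false) is a regular watermark; the only watermarks in that phase are the per-key Long.MAX_VALUE markers.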

...

  • Every operator needs to support checkpointing. Checkpoints are triggered periodically according to execution.checkpointing.interval.
  • If any task fails, its pipelined region is restarted to re-process its input since the last successful checkpoint.

Mixed mode with execution.interval-during-backlog > 0:

  • Same as stream mode.

Mixed mode with execution.interval-during-backlog = 0:

  • Every operator needs to support checkpointing.
  • Before source operator emits isBacklog=false, checkpoint triggering is disabled.
  • If any task fails while isBacklog=true, this task is restarted to re-process its input from the beginning.
  • At the point when isBacklog switches to false, source operator emits RecordAttributes(isBacklog=false) and triggers an immediate checkpoint.
  • After that, checkpoints are triggered periodically according to execution.checkpointing.interval.
  • If any task fails while isBacklog=false, its pipelined region is restarted to re-process its input since the last successful checkpoint.
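
The checkpointing and failover rules above can be read as a small state machine. The following is a minimal sketch of that reading, not Flink code: the class MixedModeCheckpointSketch, its method names, and the use of a single long offset to represent "position in the input" are all hypothetical.

```java
// Hypothetical model of the checkpoint/failover rules; not part of Flink.
final class MixedModeCheckpointSketch {

    private boolean isBacklog = true;
    // Offset 0 stands for "the beginning of the input".
    private long lastCheckpointedOffset = 0L;

    /**
     * Called on every tick of the periodic checkpoint timer
     * (execution.checkpointing.interval). Returns true if a checkpoint
     * was taken, false if triggering is disabled because of the backlog.
     */
    boolean onCheckpointTimer(long currentOffset) {
        if (isBacklog) {
            return false; // checkpoint triggering is disabled during backlog
        }
        lastCheckpointedOffset = currentOffset;
        return true;
    }

    /**
     * Called when the source emits RecordAttributes(isBacklog=false).
     * An immediate checkpoint is taken at the current offset.
     */
    void onBacklogFinished(long currentOffset) {
        isBacklog = false;
        lastCheckpointedOffset = currentOffset;
    }

    /** Offset from which input is re-processed if a task fails now. */
    long restartOffsetAfterFailure() {
        return isBacklog ? 0L : lastCheckpointedOffset;
    }
}
```

The immediate checkpoint at the switch matters in this model: without it, a failure shortly after isBacklog becomes false would have no successful checkpoint to restore from, and the whole backlog would have to be re-processed.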

...

  • Managed memory is allocated to operators if the keyed state backend is a subclass of AbstractManagedMemoryStateBackend.
  • A general purpose keyed state backend which does not assume inputs sorted by key (e.g. EmbeddedRocksDBStateBackend) is used.

Mixed mode with execution.interval-during-backlog > 0:

  • Same as stream mode.

Mixed mode with execution.interval-during-backlog = 0:

  • Managed memory is allocated to operators with keyed inputs that claim isInternalSorterSupported == true.
  • A general purpose keyed state backend which does not assume inputs sorted by key (e.g. EmbeddedRocksDBStateBackend) is instantiated by Flink runtime.
  • Before source operator emits isBacklog=false, a downstream operator sorts input records by key internally without accessing the state backend. We will provide utility classes (derived from the existing ExternalSorter) to sort inputs.
  • At the point when isBacklog switches to false:
    - Source operator emits RecordAttributes(isBacklog=false)
    - Downstream operator processes/aggregates all the records received before RecordAttributes(isBacklog=false) in order of the sorted keys, using a BatchExecutionKeyedStateBackend which it has explicitly instantiated internally.
    - Downstream operator moves each (key, state) pair from BatchExecutionKeyedStateBackend to the keyed state backend instantiated by Flink runtime once it sees a record coming in with a different key.
  • The operator continues to process records using the keyed state backend instantiated by Flink runtime, which now contains the full state obtained from records received while isBacklog=true.
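
The sort-then-aggregate behavior described above can be sketched with plain Java collections. This is an illustration under stated assumptions, not the proposed implementation: the class BacklogSortAggregateSketch and its method names are hypothetical, an in-memory list stands in for the ExternalSorter-based utility, a single (currentKey, currentState) pair stands in for BatchExecutionKeyedStateBackend, a HashMap stands in for the keyed state backend instantiated by Flink runtime, and the aggregation is a per-key sum.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of an operator with an internal sorter; not part of Flink.
final class BacklogSortAggregateSketch {

    /** Hypothetical record type: a key plus a value to be summed per key. */
    static final class Rec {
        final String key;
        final long value;

        Rec(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Stand-in for the internal sorter's buffer.
    private final List<Rec> buffer = new ArrayList<>();
    // Stand-in for the general purpose keyed state backend.
    private final Map<String, Long> runtimeBackend = new HashMap<>();
    private boolean isBacklog = true;

    void processElement(Rec r) {
        if (isBacklog) {
            buffer.add(r); // buffered only; no state backend access
        } else {
            runtimeBackend.merge(r.key, r.value, Long::sum);
        }
    }

    /** Called when RecordAttributes(isBacklog=false) is received. */
    void onBacklogFinished() {
        buffer.sort((a, b) -> a.key.compareTo(b.key));

        // Only one key's state is held at a time, which is what lets a
        // batch-style backend avoid a general purpose key lookup.
        String currentKey = null;
        long currentState = 0L;
        for (Rec r : buffer) {
            if (currentKey != null && !currentKey.equals(r.key)) {
                // Key changed: move the finished (key, state) pair.
                runtimeBackend.put(currentKey, currentState);
                currentState = 0L;
            }
            currentKey = r.key;
            currentState += r.value;
        }
        if (currentKey != null) {
            runtimeBackend.put(currentKey, currentState); // last key
        }
        buffer.clear();
        isBacklog = false;
    }

    /** Returns the state for a key, or null if none has been stored. */
    Long stateFor(String key) {
        return runtimeBackend.get(key);
    }
}
```

After onBacklogFinished() returns, the map holds one entry per key seen during the backlog, and later records update that same state through the regular path.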

...