...
For simplicity, we assume the Flink job uses the default value for every configuration option (e.g. scheduler-mode) except execution.runtime-mode, and that the operator reads records from a source which switches from isBacklog=true to isBacklog=false before it reaches EOF.
NOTE: the following description of mixed mode assumes execution.interval-during-backlog == 0
1) Scheduling strategy
Batch mode:
...
- Source operator emits watermark based on the user-specified WatermarkStrategy and pipeline.auto-watermark-interval while it has not reached EOF.
- Once the source operator reaches EOF, it emits watermark of Long.MAX_VALUE after it has emitted all records. That means the downstream operator will not see any further input records after having received watermark=Long.MAX_VALUE.
Mixed mode with execution.interval-during-backlog > 0:
- Same as stream mode.
Mixed mode with execution.interval-during-backlog = 0:
- Source operator does not emit watermark while isBacklog=true.
- Before source operator emits isBacklog=false, it emits watermark of Long.MAX_VALUE for one key at a time after it has emitted all records for that key.
- At the point when isBacklog switches to false, source operator emits RecordAttributes(isBacklog=false).
- After emitting isBacklog=false, source operator emits watermark based on the user-specified WatermarkStrategy and pipeline.auto-watermark-interval while it has not reached EOF.
- Once the source operator reaches EOF, it emits watermark of Long.MAX_VALUE after it has emitted all records.
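The watermark sequence for mixed mode with execution.interval-during-backlog = 0 can be illustrated with a toy model. This is only a sketch and not Flink code: the class MixedModeWatermarkSketch, the Rec type and the string-encoded events are hypothetical. It also assumes the backlog records arrive already grouped by key, and it simplifies the stream phase by emitting a watermark equal to the maximum timestamp seen after every record, rather than following a real WatermarkStrategy and pipeline.auto-watermark-interval.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the event order described above. All names are hypothetical.
class MixedModeWatermarkSketch {

    /** Hypothetical record type: a key plus an event timestamp. */
    static final class Rec {
        final String key;
        final long ts;

        Rec(String key, long ts) {
            this.key = key;
            this.ts = ts;
        }
    }

    /** Returns the ordered list of events the source would emit in this simplified model. */
    static List<String> emit(List<Rec> backlogGroupedByKey, List<Rec> realTime) {
        List<String> out = new ArrayList<>();

        // Phase 1 (isBacklog=true): no regular watermarks. After the last record
        // of each key, emit watermark Long.MAX_VALUE for that key.
        for (int i = 0; i < backlogGroupedByKey.size(); i++) {
            Rec r = backlogGroupedByKey.get(i);
            out.add("record(" + r.key + "," + r.ts + ")");
            boolean lastOfKey = i == backlogGroupedByKey.size() - 1
                    || !backlogGroupedByKey.get(i + 1).key.equals(r.key);
            if (lastOfKey) {
                out.add("watermark(MAX) for key " + r.key);
            }
        }

        // Switch point: announce that the backlog has been consumed.
        out.add("RecordAttributes(isBacklog=false)");

        // Phase 2 (isBacklog=false): assumption for this sketch only, a watermark
        // equal to the maximum timestamp seen so far follows every record.
        long maxTs = Long.MIN_VALUE;
        for (Rec r : realTime) {
            out.add("record(" + r.key + "," + r.ts + ")");
            maxTs = Math.max(maxTs, r.ts);
            out.add("watermark(" + maxTs + ")");
        }

        // EOF: final watermark after all records have been emitted.
        out.add("watermark(MAX)");
        return out;
    }

    public static void main(String[] args) {
        List<Rec> backlog = List.of(new Rec("a", 1L), new Rec("a", 2L), new Rec("b", 3L));
        List<Rec> realTime = List.of(new Rec("c", 5L), new Rec("c", 4L));
        emit(backlog, realTime).forEach(System.out::println);
    }
}
```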
...
- Every operator needs to support checkpoint. Checkpoint is triggered periodically according to execution.checkpointing.interval.
- If any task fails, its pipelined region is restarted to re-process its input since the last successful checkpoint.
Mixed mode with execution.interval-during-backlog > 0:
- Same as stream mode.
Mixed mode with execution.interval-during-backlog = 0:
- Every operator needs to support checkpoint.
- Before source operator emits isBacklog=false, checkpoint triggering is disabled.
- If any task fails when isBacklog=true, this task is restarted to re-process its input from the beginning.
- At the point when isBacklog switches to false, source operator emits RecordAttributes(isBacklog=false) and triggers an immediate checkpoint.
- After isBacklog switches to false, checkpoint is triggered periodically according to execution.checkpointing.interval.
- If any task fails when isBacklog=false, its pipelined region is restarted to re-process its input since the last successful checkpoint.
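The checkpointing and failover rules for mixed mode with execution.interval-during-backlog = 0 amount to a small decision table, sketched below. The class MixedModeFailoverSketch and its enums are hypothetical names chosen for illustration. They are not part of Flink's API and only restate the bullets above.

```java
// Decision table for checkpointing and failover in mixed mode. All names are hypothetical.
class MixedModeFailoverSketch {

    enum RestartScope { FAILED_TASK_ONLY, PIPELINED_REGION }

    enum ReplayFrom { BEGINNING_OF_INPUT, LAST_SUCCESSFUL_CHECKPOINT }

    static final class RecoveryPlan {
        final RestartScope scope;
        final ReplayFrom replayFrom;

        RecoveryPlan(RestartScope scope, ReplayFrom replayFrom) {
            this.scope = scope;
            this.replayFrom = replayFrom;
        }
    }

    /** Checkpoint triggering is disabled until the source emits isBacklog=false. */
    static boolean checkpointTriggeringEnabled(boolean isBacklog) {
        return !isBacklog;
    }

    /** Chooses what to restart and where to resume, depending on the backlog status. */
    static RecoveryPlan onTaskFailure(boolean isBacklog) {
        if (isBacklog) {
            // No checkpoint exists yet, so the failed task re-reads its input from the start.
            return new RecoveryPlan(RestartScope.FAILED_TASK_ONLY, ReplayFrom.BEGINNING_OF_INPUT);
        }
        // Same behavior as stream mode.
        return new RecoveryPlan(RestartScope.PIPELINED_REGION, ReplayFrom.LAST_SUCCESSFUL_CHECKPOINT);
    }

    public static void main(String[] args) {
        RecoveryPlan duringBacklog = onTaskFailure(true);
        RecoveryPlan afterBacklog = onTaskFailure(false);
        System.out.println(duringBacklog.scope + " / " + duringBacklog.replayFrom);
        System.out.println(afterBacklog.scope + " / " + afterBacklog.replayFrom);
    }
}
```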
...
- Managed memory is allocated to operators if the keyed state backend is a subclass of AbstractManagedMemoryStateBackend.
- A general purpose keyed state backend which does not assume inputs sorted by key (e.g. EmbeddedRocksDBStateBackend) is used.
Mixed mode with execution.interval-during-backlog > 0:
- Same as stream mode.
Mixed mode with execution.interval-during-backlog = 0:
- Managed memory is allocated to operators with keyed inputs which claim isInternalSorterSupported == true.
- A general purpose keyed state backend which does not assume inputs sorted by key (e.g. EmbeddedRocksDBStateBackend) is instantiated by Flink runtime.
- Before source operator emits isBacklog=false, a downstream operator sorts input records by keys internally without accessing state backend. We will provide utility classes (derived from the existing ExternalSorter) to sort inputs.
- At the point when isBacklog switches to false:
  - Source operator emits RecordAttributes(isBacklog=false).
  - Downstream operator processes/aggregates all the records (received before RecordAttributes(isBacklog=false)) in order of the sorted keys, using a BatchExecutionKeyedStateBackend which it has explicitly instantiated internally.
  - Downstream operator moves each (key, state) pair from BatchExecutionKeyedStateBackend to the keyed state backend instantiated by Flink runtime once it sees a record coming in with a different key.
- The operator continues to process records using the keyed state backend instantiated by Flink runtime, which now contains the full state obtained from records received while isBacklog=true.
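The sort-then-hand-over behavior described above can be sketched with plain Java collections. This is a minimal illustration under stated assumptions, not the proposed implementation: BacklogSortSketch is a hypothetical operator that sums values per key, an in-memory list stands in for the ExternalSorter-based utility, a pair of local variables stands in for the single-key BatchExecutionKeyedStateBackend, and a HashMap stands in for the keyed state backend instantiated by Flink runtime.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical operator that sums values per key. All names are illustrative only.
class BacklogSortSketch {
    // Stand-in for the keyed state backend instantiated by the Flink runtime.
    private final Map<String, Long> runtimeState = new HashMap<>();
    // Records buffered while isBacklog=true (stand-in for an external sorter).
    private final List<Map.Entry<String, Long>> buffer = new ArrayList<>();
    private boolean isBacklog = true;

    void processElement(String key, long value) {
        if (isBacklog) {
            // During backlog: only buffer the record, never touch the state backend.
            buffer.add(Map.entry(key, value));
        } else {
            // After backlog: regular stream-mode processing against the runtime backend.
            runtimeState.merge(key, value, Long::sum);
        }
    }

    /** Called when RecordAttributes(isBacklog=false) is received. */
    void onBacklogFinished() {
        buffer.sort(Map.Entry.comparingByKey());
        // currentKey/currentSum play the role of the single-key batch state backend.
        String currentKey = null;
        long currentSum = 0L;
        for (Map.Entry<String, Long> e : buffer) {
            if (currentKey != null && !currentKey.equals(e.getKey())) {
                // Key changed: move the finished (key, state) pair to the runtime backend.
                runtimeState.put(currentKey, currentSum);
                currentSum = 0L;
            }
            currentKey = e.getKey();
            currentSum += e.getValue();
        }
        if (currentKey != null) {
            runtimeState.put(currentKey, currentSum); // flush the last key
        }
        buffer.clear();
        isBacklog = false;
    }

    Map<String, Long> state() {
        return runtimeState;
    }

    public static void main(String[] args) {
        BacklogSortSketch op = new BacklogSortSketch();
        op.processElement("b", 1L);
        op.processElement("a", 2L);
        op.processElement("b", 3L);
        op.onBacklogFinished();
        op.processElement("a", 10L);
        op.processElement("c", 5L);
        System.out.println(op.state());
    }
}
```

In this sketch the runtime backend is written once per key during the hand-over, instead of once per record, which is the access pattern the sorted-input approach is meant to achieve.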
...