...

Discussion thread: https://lists.apache.org/thread/29nvjt9sgnzvs90browb8r6ng31dcs3n
Vote thread: https://lists.apache.org/thread/7cj6pzx7w0ynqyogxgk668wjr322mcqw
JIRA: FLINK-33202
Release: -

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

In this section, we describe how batch mode and stream mode differ in a variety of aspects. We also describe the new behavior of the Flink runtime, which allows an operator to switch from batch mode to stream mode during the execution of the same job.


For ease of understanding, we use a simplified use-case with the following properties when describing the behavior changes:

  • The job uses default configuration values for all configurations (e.g. scheduler-mode) except execution.runtime-mode

...

  • (see below).
  • The job has one source that switches from isBacklog=true to isBacklog=false before it reaches EOF.


Here are the definitions of batch mode, stream mode and mixed mode used below:

...

The capability to update/optimize scheduling (e.g. speculative execution, AQE) in mixed mode will be left to future work.


2) Shuffle strategy

Batch mode:

  • All shuffle types are set to BLOCKING.
  • All keyed inputs are sorted before they are fed into operators. Managed memory is allocated to operators with keyed inputs to support sorting.

Stream mode:

  • All shuffle types are set to PIPELINED.

Mixed mode:

  • The same shuffle strategy as stream mode is used to set the shuffle type between tasks.
  • While isBacklog=true (i.e. before the source operator emits isBacklog=false), the keyed input of a one-input operator is automatically sorted, unless the operator sorts its input internally.
    • Keyed inputs of a multiple-input operator are not automatically sorted. Such an operator can sort its inputs internally.
  • Once isBacklog switches to false, keyed inputs are no longer sorted.
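The shuffle and sorting rules above can be summarized as a small decision function. The following is a minimal sketch only: the class, enum and method names are hypothetical and are not part of the Flink API, and the logic simply restates the bullets above.

```java
/** Hypothetical helper that restates the shuffle/sorting rules; not a Flink API. */
final class KeyedInputSortingRule {

    enum Mode { BATCH, STREAM, MIXED }

    /** Shuffle type between tasks: mixed mode follows stream mode. */
    static String shuffleType(Mode mode) {
        return mode == Mode.BATCH ? "BLOCKING" : "PIPELINED";
    }

    /**
     * Whether the runtime sorts the keyed input of an operator on its behalf.
     *
     * @param isBacklog only meaningful in mixed mode
     * @param numInputs number of inputs of the operator
     * @param sortsInternally true if the operator sorts its input itself
     */
    static boolean runtimeSortsKeyedInput(
            Mode mode, boolean isBacklog, int numInputs, boolean sortsInternally) {
        switch (mode) {
            case BATCH:
                // All keyed inputs are sorted in batch mode.
                return true;
            case STREAM:
                return false;
            case MIXED:
                // Only one-input operators without an internal sorter, and only during backlog.
                return isBacklog && numInputs == 1 && !sortsInternally;
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }
}
```

For example, a one-input keyed operator in mixed mode gets automatic sorting while isBacklog=true and loses it once isBacklog switches to false, whereas a two-input operator never does.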

3) Watermark strategy

Batch mode:

  • Source operator does not emit watermarks.
  • Once an operator reaches EOF, it triggers all the timers one key at a time.

Stream mode:

  • Source operator emits watermarks based on the user-specified WatermarkStrategy and pipeline.auto-watermark-interval while it has not reached EOF.
  • Once the source operator reaches EOF, it emits a watermark of Long.MAX_VALUE after it has emitted all records. That means a downstream operator will not see any further input records after having received watermark=Long.MAX_VALUE.

Mixed mode:

  • Source operator does not emit watermarks while isBacklog=true.
  • At the point when isBacklog switches to false, source operator emits RecordAttributes(isBacklog=false) and the greatest watermark seen during backlog processing.
  • After that, source operator emits watermarks based on the user-specified WatermarkStrategy and pipeline.auto-watermark-interval until it reaches EOF.
  • Once the source operator reaches EOF, it emits a watermark of Long.MAX_VALUE after it has emitted all records.
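The mixed-mode watermark sequence can be illustrated with a small state machine. This is a sketch under assumptions: the class and method names are hypothetical, emitted elements are modelled as plain strings, RecordAttributes is assumed to be emitted before the watermark at the switch point, and no watermark is emitted at the switch if none was generated during backlog.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a source's watermark output in mixed mode; not a Flink API. */
final class MixedModeWatermarkSketch {

    private final List<String> emitted = new ArrayList<>();
    private boolean isBacklog = true;
    private long maxBacklogWatermark = Long.MIN_VALUE;

    /** Called whenever the watermark strategy produces a candidate watermark. */
    void onWatermarkCandidate(long watermark) {
        if (isBacklog) {
            // Suppressed during backlog; only the greatest value is remembered.
            maxBacklogWatermark = Math.max(maxBacklogWatermark, watermark);
        } else {
            emitted.add("Watermark(" + watermark + ")");
        }
    }

    /** Called once, when the source switches from isBacklog=true to isBacklog=false. */
    void onBacklogFinished() {
        if (!isBacklog) {
            return;
        }
        isBacklog = false;
        emitted.add("RecordAttributes(isBacklog=false)");
        if (maxBacklogWatermark != Long.MIN_VALUE) {
            emitted.add("Watermark(" + maxBacklogWatermark + ")");
        }
    }

    /** Called after the source has emitted all records. */
    void onEndOfInput() {
        emitted.add("Watermark(" + Long.MAX_VALUE + ")");
    }

    List<String> emitted() {
        return emitted;
    }
}
```

Feeding candidates 5, 9 and 7 during backlog produces no output; the switch then emits the attributes element followed by the watermark 9, and later candidates are forwarded as usual.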

4) Checkpoint and failover strategy

Batch mode:

  • No operator needs to support checkpointing. Checkpoint triggering is disabled.
  • If any task fails, this task is restarted to re-process its input from the beginning.

Stream mode:

  • Every operator needs to support checkpointing. Checkpoints are triggered periodically according to execution.checkpointing.interval.
  • If any task fails, its pipelined region is restarted to re-process its input since the last successful checkpoint.

Mixed mode:

  • Every operator needs to support checkpointing.
  • Before source operator emits isBacklog=false, checkpoint triggering is disabled.
  • If any task fails when isBacklog=true, this task is restarted to re-process its input from the beginning.
  • At the point when isBacklog switches to false, source operator emits RecordAttributes(isBacklog=false) and triggers an immediate checkpoint.
  • After that, checkpoints are triggered periodically according to execution.checkpointing.interval.
  • If any task fails when isBacklog=false, its pipelined region is restarted to re-process its input since the last successful checkpoint.

...


Extra notes: For jobs with multiple sources and execution.checkpointing.interval-during-backlog = 0, checkpoint triggering is enabled if and only if all sources have isBacklog=false. (More details for the checkpointing behavior can be found in the doc of execution.checkpointing.interval-during-backlog).

As a result, if one source still has isBacklog=true while another source switches from isBacklog=true to isBacklog=false, checkpointing for the job remains disabled.
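The multi-source rule can be written as a one-line predicate. This is a minimal sketch assuming execution.checkpointing.interval-during-backlog = 0. The class and method names are hypothetical and do not correspond to Flink's checkpoint coordinator code.

```java
import java.util.Collection;

/** Hypothetical gate for checkpoint triggering with interval-during-backlog = 0. */
final class BacklogCheckpointGate {

    /**
     * @param sourceBacklogFlags the current isBacklog value of every source in the job
     * @return true if and only if no source is currently processing backlog
     */
    static boolean checkpointTriggeringEnabled(Collection<Boolean> sourceBacklogFlags) {
        return sourceBacklogFlags.stream().noneMatch(Boolean::booleanValue);
    }
}
```

With two sources reporting (true, false), the predicate returns false, which matches the case described above: one source leaving backlog is not enough to re-enable checkpointing.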

...


5) Keyed state backend

Batch mode:

...

  • Managed memory is allocated to operators with keyed inputs.

  • A general purpose keyed state backend that does not assume inputs sorted by key (e.g. EmbeddedRocksDBStateBackend) is instantiated by Flink runtime.

  • Before source operator emits isBacklog=false (during backlog processing), the keyed input to a one-input operator is sorted by key.

    • Operators with multiple inputs can buffer and sort the inputs internally. We will provide utility classes (derived from the existing ExternalSorter) to sort inputs.
    • Flink runtime will buffer/aggregate the state's key/value pairs in memory before persisting them to the underlying state backend (e.g. RocksDB). With the sorted input and the state backend optimization introduced in FLIP-325, there will be at most one read/write access to the underlying state backend for each key, and the state backend only needs to be written to when the LRU cache is full.
  • At the point when isBacklog switches to false:
    • Source operator emits RecordAttributes(isBacklog=false).
  • The operator continues to process records using the keyed state backend instantiated by Flink runtime, which now contains the full state obtained from records received while isBacklog=true.
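To see why sorted input limits state backend access to once per key, consider the following sketch. It is an illustration under assumptions, not the FLIP-325 implementation: the class name is hypothetical, a HashMap stands in for the underlying state backend, and the in-memory buffer holds exactly one key at a time.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-key counter that buffers the current key's state in memory. */
final class SortedInputStateBuffer {

    private final Map<String, Long> backend = new HashMap<>(); // stand-in for e.g. RocksDB
    private int backendReads;
    private int backendWrites;
    private String currentKey; // key whose state is currently buffered, or null
    private long currentValue;

    /** Adds delta to the state of the given key. Input is assumed to be sorted by key. */
    void add(String key, long delta) {
        if (!key.equals(currentKey)) {
            flush(); // persist the previous key before switching
            currentKey = key;
            currentValue = backend.getOrDefault(key, 0L);
            backendReads++;
        }
        currentValue += delta; // aggregated in memory, no backend access
    }

    /** Persists the buffered state, e.g. when the key changes or backlog processing ends. */
    void flush() {
        if (currentKey != null) {
            backend.put(currentKey, currentValue);
            backendWrites++;
            currentKey = null;
        }
    }

    int backendReads() { return backendReads; }

    int backendWrites() { return backendWrites; }

    Long stored(String key) { return backend.get(key); }
}
```

Because all records of a key arrive consecutively, the number of backend reads and writes equals the number of distinct keys rather than the number of records. With unsorted input, the same buffer would go to the backend every time the key changes.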

...

1) Add RecordAttributesBuilder and RecordAttributes that extends StreamElement to provide operators with essential information about the records they receive, such as whether the records are already stale due to backlog.

...

Code Block
languagejava
/**
 * A RecordAttributes element provides stream task with information that can be used to optimize
 * the stream task's performance.
 */
@Experimental
public class RecordAttributes extends StreamElement {
    /**
     * If it returns true, then the records received after this element are stale
     * and an operator can optionally buffer records until isBacklog=false. This
     * allows an operator to optimize throughput at the cost of processing latency.
     */
    @Nullable
    public Boolean isBacklog() {...}
}
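As an illustration of how an operator might react to this element, the sketch below buffers records while isBacklog=true and releases them when isBacklog=false arrives. It is self-contained and does not use Flink classes: the class and method names are hypothetical, and treating a null isBacklog as "no change" is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical operator logic that trades latency for throughput during backlog. */
final class BacklogAwareBuffer<T> {

    private final List<T> buffer = new ArrayList<>();
    private final List<T> output = new ArrayList<>();
    private boolean backlog = false;

    /** Mirrors receiving a RecordAttributes element; null is treated as "no change". */
    void processRecordAttributes(Boolean isBacklog) {
        if (isBacklog == null) {
            return;
        }
        backlog = isBacklog;
        if (!backlog) {
            // Backlog is over: release everything that was held back, in arrival order.
            output.addAll(buffer);
            buffer.clear();
        }
    }

    /** Buffers the record during backlog, otherwise forwards it immediately. */
    void processElement(T record) {
        if (backlog) {
            buffer.add(record);
        } else {
            output.add(record);
        }
    }

    List<T> output() { return output; }

    int buffered() { return buffer.size(); }
}
```

A real operator would typically sort or pre-aggregate the buffered records before emitting them. The sketch only shows the switch between buffering and pass-through.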

...

  • If a OneInputTransformation has isInternalSorterSupported == false, keyed input, and execution.checkpointing.interval-during-backlog == 0:
    • Flink runtime will add an external sorter for the input to sort the keyed input during backlog processing.
    • Its managed memory should be set according to execution.sorted-inputs.memory.
    • Flink runtime will use a state backend with the cache introduced in FLIP-325, with a cache size of 1 if the cache size isn't explicitly specified by users, so that state backend access is optimized while memory overhead is minimized. When users have set a cache size, that setting is adhered to.
    • Flink runtime will buffer/aggregate the state's key/value pairs in memory before persisting them to the underlying state backend (e.g. RocksDB), such that for input records that are already sorted on the given key, there will be at most one read/write access to the underlying state backend for each key.

With the above change, keyed one-input operators perform significantly better while isBacklog=true, without any code change. Moreover, the performance of such an operator during isBacklog=true is close to its performance in batch mode.

...