...

  • Mixed mode uses the same shuffle strategy as stream mode to set the shuffle type between tasks.
  • Before the source operator emits isBacklog=false (that is, while isBacklog=true), the keyed input of a one-input operator is automatically sorted, unless the operator sorts the input internally.
    • Keyed inputs of a multiple-input operator are not automatically sorted. Such an operator can sort its inputs internally.
  • After isBacklog switches to false, the keyed inputs are no longer sorted.


3) Watermark strategy

Batch mode:

...

  • Managed memory is allocated to operators with keyed inputs.

  • A general-purpose keyed state backend that does not assume inputs sorted by key (e.g. EmbeddedRocksDBStateBackend) is instantiated by the Flink runtime.

  • Before the source operator emits isBacklog=false (during backlog processing), the keyed input to a one-input operator is sorted by key. The operator can leverage the sorted input to optimize performance.

    • A multiple-input operator can buffer and sort its inputs internally.
    • With the sorted input and the optimization introduced in FLIP-325, the state value is always updated in memory while records with the same key are being processed.
  • At the point when isBacklog switches to false:
    • The source operator emits RecordAttributes(isBacklog=false).
  • The downstream operator should flush and process all the buffered inputs and update the keyed state backend to the latest value. The operator then continues to process records using the keyed state backend instantiated by the Flink runtime, which now contains the full state obtained from the records received while isBacklog=true.
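
The buffer, sort, and flush behavior described above can be illustrated with a minimal, self-contained sketch. It does not use Flink classes: BacklogBufferingSumOperator, Rec, processRecordAttributes(boolean) and the HashMap that stands in for the keyed state backend are all hypothetical names chosen for illustration. The sketch assumes an operator that buffers its input itself while isBacklog=true, which is closest to the multiple-input case.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical operator sketch; none of these names are Flink APIs. */
class BacklogBufferingSumOperator {

    /** Minimal keyed record used only in this sketch. */
    static final class Rec {
        final String key;
        final long value;

        Rec(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Stand-in for the keyed state backend instantiated by the runtime.
    final Map<String, Long> stateBackend = new HashMap<>();
    // Counts writes to the stand-in backend, to make the saving visible.
    int stateWrites = 0;

    // Records buffered while isBacklog=true.
    private final List<Rec> buffer = new ArrayList<>();
    private boolean isBacklog = true;

    void processElement(String key, long value) {
        if (isBacklog) {
            // During backlog: only buffer, do not touch the state backend.
            buffer.add(new Rec(key, value));
        } else {
            // After the switch: update the state backend per record.
            writeState(key, stateBackend.getOrDefault(key, 0L) + value);
        }
    }

    /** Assumed callback invoked when the backlog status changes. */
    void processRecordAttributes(boolean newIsBacklog) {
        if (isBacklog && !newIsBacklog) {
            flushBuffer();
        }
        isBacklog = newIsBacklog;
    }

    private void flushBuffer() {
        // Sort by key so that all records of one key are contiguous.
        buffer.sort(Comparator.comparing((Rec r) -> r.key));
        int i = 0;
        while (i < buffer.size()) {
            String key = buffer.get(i).key;
            long sum = stateBackend.getOrDefault(key, 0L);
            // Aggregate the whole run of this key in memory.
            while (i < buffer.size() && buffer.get(i).key.equals(key)) {
                sum += buffer.get(i).value;
                i++;
            }
            // One write per key, holding the latest value.
            writeState(key, sum);
        }
        buffer.clear();
    }

    private void writeState(String key, long value) {
        stateBackend.put(key, value);
        stateWrites++;
    }
}
```

In this sketch the state backend is written once per distinct key at the switch, rather than once per record, and later records update the backend directly.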


6) Timer Service

Batch mode:

  • The timer service keeps timers only for the current key, and it will fire timers only at the end of each key.
  • The value of InternalTimeService#getCurrentWatermark will be Watermark.MIN_VALUE when processing a key and will be set to Watermark.MAX_VALUE when firing triggers.

Stream mode:

  • The timer service keeps timers for all the keys, and it will fire timers based on the watermark.

Mixed mode with execution.checkpointing.interval-during-backlog > 0:

  • Same as stream mode.

Mixed mode with execution.checkpointing.interval-during-backlog = 0:

  • Before the source operator emits isBacklog=false, the timer service keeps timers only for the current key, and at the end of each key it fires timers up to the maximum watermark seen during the backlog.
  • At the point when isBacklog switches to false, the timer service keeps timers for all the keys and fires timers based on the watermark.
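
The two phases of the timer service in this configuration can be sketched as follows. This is an illustrative model only, not the Flink implementation: MixedModeTimerSketch and all its methods are hypothetical, and the maximum backlog watermark is simply passed in by the caller. One further assumption, which the text above does not specify, is that timers later than the maximum backlog watermark are carried over to the all-keys timer set so they can fire after the switch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical model of the timer service; not a Flink class. */
class MixedModeTimerSketch {
    private boolean isBacklog = true;
    private String currentKey;

    // Backlog phase: timers of the current key only.
    private final TreeSet<Long> currentKeyTimers = new TreeSet<>();
    // Real-time phase: timers of all keys, ordered by timestamp.
    private final TreeMap<Long, List<String>> allKeyTimers = new TreeMap<>();

    // Fired timers, recorded as "key@timestamp" in firing order.
    final List<String> fired = new ArrayList<>();

    void setCurrentKey(String key) {
        currentKey = key;
    }

    void registerTimer(long timestamp) {
        if (isBacklog) {
            currentKeyTimers.add(timestamp);
        } else {
            addToAllKeys(timestamp, currentKey);
        }
    }

    /** Backlog phase: called after the last record of the current key. */
    void endOfKey(long maxBacklogWatermark) {
        for (long ts : currentKeyTimers) {
            if (ts <= maxBacklogWatermark) {
                fired.add(currentKey + "@" + ts);
            } else {
                // Assumption: later timers are kept for the real-time phase.
                addToAllKeys(ts, currentKey);
            }
        }
        currentKeyTimers.clear();
    }

    /** Called when isBacklog switches to false. */
    void switchToRealTime() {
        isBacklog = false;
    }

    /** Real-time phase: fire every timer at or before the watermark. */
    void advanceWatermark(long watermark) {
        while (!allKeyTimers.isEmpty() && allKeyTimers.firstKey() <= watermark) {
            Map.Entry<Long, List<String>> entry = allKeyTimers.pollFirstEntry();
            for (String key : entry.getValue()) {
                fired.add(key + "@" + entry.getKey());
            }
        }
    }

    private void addToAllKeys(long timestamp, String key) {
        allKeyTimers.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(key);
    }
}
```

During the backlog phase the model never holds timers for more than one key at a time in currentKeyTimers, which mirrors the batch-mode behavior; after the switch it behaves like the stream-mode timer service.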

Public Interfaces

1) Add RecordAttributesBuilder and RecordAttributes, which extends RuntimeEvent, to provide operators with essential information about the records they receive, such as whether the records are already stale due to backlog.

...