...

  • Managed memory is allocated to operators with keyed inputs.

  • A general purpose keyed state backend that does not assume inputs are sorted by key (e.g. EmbeddedRocksDBStateBackend) is instantiated by Flink runtime.

  • Before the source operator emits isBacklog=false (i.e. during backlog processing), the keyed input to a one-input operator is sorted by key.

    • Multi-input operators can buffer and sort their inputs internally. We will provide utility classes (derived from the existing ExternalSorter) to sort inputs.
    • With the sorted input and the state backend optimization introduced in FLIP-325, the state value of a key is always updated in memory while the records with that key are processed, except for the initial read and the final write.
  • At the point when isBacklog switches to false:
    • Source operator emits RecordAttributes(isBacklog=false).
  • The operator continues to process records using the keyed state backend instantiated by Flink runtime, which now contains the full state obtained from records received while isBacklog=true.
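The effect of sorted input on state access can be illustrated with a minimal, self-contained sketch. It assumes a per-key sum as the operator logic and uses a counting HashMap as a stand-in for a keyed state backend such as EmbeddedRocksDBStateBackend; `SortedBacklogSketch`, `CountingBackend` and `processSorted` are hypothetical names, not Flink APIs. Because all records of a key arrive contiguously, the backend is read once and written once per key, while every other update stays in memory.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of processing key-sorted backlog records; not a Flink API. */
public class SortedBacklogSketch {

    /** Stand-in for a keyed state backend that counts reads and writes. */
    static final class CountingBackend {
        final Map<String, Long> store = new HashMap<>();
        int reads = 0;
        int writes = 0;

        Long get(String key) { reads++; return store.get(key); }

        void put(String key, long value) { writes++; store.put(key, value); }
    }

    record Event(String key, long value) {}

    /** Sums values per key, assuming the input list is already sorted by key. */
    static void processSorted(List<Event> sorted, CountingBackend backend) {
        String currentKey = null;
        long currentSum = 0;
        for (Event e : sorted) {
            if (!e.key().equals(currentKey)) {
                // Key changed: write the previous key's state once, then read the new key's state once.
                if (currentKey != null) {
                    backend.put(currentKey, currentSum);
                }
                currentKey = e.key();
                Long stored = backend.get(currentKey);
                currentSum = stored == null ? 0 : stored;
            }
            currentSum += e.value(); // in-memory update, no backend access
        }
        if (currentKey != null) {
            backend.put(currentKey, currentSum); // final write for the last key
        }
    }

    public static void main(String[] args) {
        List<Event> backlog = new ArrayList<>(List.of(
                new Event("b", 2), new Event("a", 1), new Event("b", 3), new Event("a", 4)));
        backlog.sort(Comparator.comparing(Event::key)); // sort by key during backlog processing
        CountingBackend backend = new CountingBackend();
        processSorted(backlog, backend);
        System.out.println(backend.store + " reads=" + backend.reads + " writes=" + backend.writes);
    }
}
```

With unsorted input, the same logic would need a backend read and write for every record; with sorted input, the number of backend accesses depends only on the number of distinct keys.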

...

  • Before the source operator emits isBacklog=false, the timer service keeps timers only for the current key, and at the end of each key it fires the timers of that key up to the maximum watermark seen during the backlog.
    • The value of InternalTimeService#getCurrentWatermark will be Watermark.MIN_VALUE when processing a key and will be set to the maximum watermark of the backlog when firing triggers.
  • At the point when isBacklog switches to false, the timer service keeps timers for all keys and fires timers based on the watermark.
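The backlog-mode timer behavior described above can be modeled in a small sketch. It is a hypothetical illustration, not Flink's InternalTimerService: `Long.MIN_VALUE` stands in for Watermark.MIN_VALUE, and the handling of timers later than the maximum backlog watermark (discarded here) is an assumption of the sketch, since the text above does not specify it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical model of the backlog-mode timer service; not a Flink API. */
public class BacklogTimerSketch {
    private long maxBacklogWatermark = Long.MIN_VALUE;
    private long currentWatermark = Long.MIN_VALUE; // stands in for Watermark.MIN_VALUE
    private final TreeSet<Long> currentKeyTimers = new TreeSet<>(); // timers of the current key only

    /** Watermarks received while isBacklog=true are remembered but fire nothing. */
    public void onBacklogWatermark(long watermark) {
        maxBacklogWatermark = Math.max(maxBacklogWatermark, watermark);
    }

    /** While a key is being processed, the visible watermark stays at the minimum value. */
    public long currentWatermark() {
        return currentWatermark;
    }

    public void registerEventTimeTimer(long timestamp) {
        currentKeyTimers.add(timestamp);
    }

    /** Called at the end of each key: fires that key's timers up to the max backlog watermark. */
    public List<Long> finishKey() {
        // A real operator would invoke its onTimer callbacks here, observing this watermark.
        currentWatermark = maxBacklogWatermark;
        List<Long> fired = new ArrayList<>();
        while (!currentKeyTimers.isEmpty() && currentKeyTimers.first() <= maxBacklogWatermark) {
            fired.add(currentKeyTimers.pollFirst()); // fire in timestamp order
        }
        // Assumption of this sketch: timers beyond the max backlog watermark are dropped with the key.
        currentKeyTimers.clear();
        currentWatermark = Long.MIN_VALUE; // reset before the next key starts
        return fired;
    }
}
```

Keeping only the current key's timers bounds the timer state to a single key at a time, which is what makes this approach cheap during backlog processing.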

...

3) Update Transformation subclasses (e.g. OneInputTransformation and TwoInputTransformation) to override the newly added methods using the OperatorAttributes obtained from the underlying Operator.


4) Update the JM's Transformation translator to use the following operator attributes when compiling the Transformation graph into the JobGraph.

  • If a Transformation has IsInternalSorterSupported == true AND
    (execution.runtime-mode == BATCH OR execution.checkpointing.interval-during-backlog == 0):
    • Flink runtime will not add an external sorter for its inputs (including in batch mode with keyed inputs)
    • Its managed memory should be set according to execution.sorted-inputs.memory

  • If a OneInputTransformation has isInternalSorterSupported == false, has keyed input, and execution.checkpointing.interval-during-backlog == 0:
    • Flink runtime will add a sorter for the input to sort the keyed input during backlog processing
    • Its managed memory should be set according to execution.sorted-inputs.memory
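The two translator rules above can be summarized as a small decision function. This is a hypothetical sketch, not the actual translator code: the enum names, the `plan` method and its parameters are illustrative, and treating execution.checkpointing.interval-during-backlog == 0 as a zero `java.time.Duration` is an assumption. Cases covered by neither rule keep existing behavior, which the sketch does not model.

```java
import java.time.Duration;

/** Hypothetical encoding of the translator rules; none of these names are Flink APIs. */
public class SorterDecisionSketch {

    enum RuntimeMode { STREAMING, BATCH }

    enum SortPlan {
        /** The operator sorts internally; managed memory follows execution.sorted-inputs.memory. */
        INTERNAL_SORTER,
        /** The runtime adds a sorter for the keyed input during backlog; same memory setting. */
        EXTERNAL_SORTER,
        /** Neither rule applies; existing behavior is kept (not modeled here). */
        UNCHANGED
    }

    static SortPlan plan(boolean isInternalSorterSupported,
                         boolean isOneInput,
                         boolean hasKeyedInput,
                         RuntimeMode runtimeMode,
                         Duration checkpointIntervalDuringBacklog) {
        // Assumption: interval-during-backlog == 0 means no checkpoints while isBacklog=true.
        boolean noCheckpointsDuringBacklog = checkpointIntervalDuringBacklog.isZero();

        // Rule 1: the operator declares that it can sort its own inputs.
        if (isInternalSorterSupported
                && (runtimeMode == RuntimeMode.BATCH || noCheckpointsDuringBacklog)) {
            return SortPlan.INTERNAL_SORTER;
        }
        // Rule 2: a keyed one-input operator without an internal sorter gets one from the runtime.
        if (isOneInput && !isInternalSorterSupported && hasKeyedInput && noCheckpointsDuringBacklog) {
            return SortPlan.EXTERNAL_SORTER;
        }
        return SortPlan.UNCHANGED;
    }
}
```

For example, a keyed one-input operator without internal sorting in a streaming job with a non-zero backlog checkpoint interval falls into `UNCHANGED`, because sorting would otherwise have to be reconciled with checkpoints taken mid-backlog.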


5) For keyed one-input operators such as KeyedProcessOperator, StreamGroupedReduceOperator and WindowOperator, performance should improve considerably when isBacklog=true without any code change, thanks to the automatically sorted input and the state backend optimization in FLIP-325. Their performance while isBacklog=true is expected to be close to their performance in batch mode.


6) For keyed multi-input operators that involve aggregation operations (e.g. join, cogroup, aggregate), update them to take advantage of the sorted input when isBacklog=true.

Currently, there are two keyed multi-input operators in the DataStream API: KeyedCoProcessOperator and IntervalJoinOperator. Though the concrete implementation may vary across operators, both can be optimized in the following way:

  • Override getOperatorAttributes to return IsInternalSorterSupported = true.
  • Override the processRecordAttributes() API to adjust the operator's behavior based on each input's isBacklog status. For example:
      • The operator can buffer and sort input records when any input has isBacklog=true AND execution.checkpointing.interval-during-backlog=0.
      • Once all inputs have switched to isBacklog=false, it processes the buffered records, emits results, and starts to work in the stream execution mode.
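The buffer-then-switch behavior described in these steps can be sketched as a plain Java class. This is a hypothetical model, not Flink's operator API: `BufferingTwoInputSketch`, `processElement` and `onBacklogStatus` (standing in for processRecordAttributes()) are illustrative names, both inputs are assumed to start with isBacklog=true, and an in-memory list sort stands in for the ExternalSorter-based utilities.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical keyed two-input operator that buffers during backlog; not a Flink API. */
public class BufferingTwoInputSketch {

    record Tagged(int input, String key, String value) {}

    private final boolean[] inputIsBacklog = {true, true}; // assumption: both inputs start in backlog
    private final boolean checkpointDisabledDuringBacklog; // interval-during-backlog == 0
    private final List<Tagged> buffer = new ArrayList<>();
    private final Consumer<Tagged> process; // stands in for the operator's per-record logic

    BufferingTwoInputSketch(boolean checkpointDisabledDuringBacklog, Consumer<Tagged> process) {
        this.checkpointDisabledDuringBacklog = checkpointDisabledDuringBacklog;
        this.process = process;
    }

    /** Buffer while any input is backlogged and no checkpoints are taken during backlog. */
    private boolean buffering() {
        return checkpointDisabledDuringBacklog && (inputIsBacklog[0] || inputIsBacklog[1]);
    }

    public void processElement(int input, String key, String value) {
        Tagged record = new Tagged(input, key, value);
        if (buffering()) {
            buffer.add(record);
        } else {
            process.accept(record); // stream execution mode
        }
    }

    /** Analogue of processRecordAttributes(): called when an input's isBacklog status changes. */
    public void onBacklogStatus(int input, boolean isBacklog) {
        boolean wasBuffering = buffering();
        inputIsBacklog[input] = isBacklog;
        if (wasBuffering && !buffering()) {
            // All inputs left the backlog: process buffered records grouped by key.
            // List.sort is stable, so records of the same key keep their arrival order.
            buffer.sort(Comparator.comparing(Tagged::key));
            buffer.forEach(process);
            buffer.clear();
        }
    }
}
```

Sorting the buffer by key lets a join or cogroup handle all records of one key together, which is where the batch-style throughput gain comes from; after the drain, records flow straight to the operator logic.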


Benchmark results

In this section, we provide benchmark results comparing the throughput of batch mode and stream mode for commonly used operations. These results help quantify the throughput increase we can expect once batch mode can be used to process backlog records.

    ...