...

  • Managed memory is allocated to operators with keyed inputs.

  • A general-purpose keyed state backend that does not assume inputs are sorted by key (e.g. EmbeddedRocksDBStateBackend) is instantiated by the Flink runtime.

  • Before the source operator emits isBacklog=false (i.e. during backlog processing), the keyed input to a one-input operator is sorted by key.

    • Multi-input operators can buffer and sort their inputs internally. We will provide utility classes (derived from the existing ExternalSorter) to sort inputs.
    • With the sorted input and the state backend optimization introduced in FLIP-325, the state value stays updated in memory while all records with the same key are processed. As a result, each key needs at most one initial read from the RocksDB state backend, and a write back to the RocksDB state backend is needed only when the LRU cache is full.
  • At the point when isBacklog switches to false:
    • Source operator emits RecordAttributes(isBacklog=false).
  • The operator continues to process records using the keyed state backend instantiated by Flink runtime, which now contains the full state obtained from records received while isBacklog=true.
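The effect of sorted input on state access can be illustrated with a small self-contained sketch (plain Java, no Flink dependencies; the map below is a hypothetical stand-in for the RocksDB state backend, and the per-key sum is just an example aggregation):

```java
import java.util.*;

// Illustrative sketch (not Flink code): when the input is sorted by key, a keyed
// aggregation needs at most one backend read and one backend write per distinct key,
// because all intermediate updates for a key can stay in memory.
public class SortedInputStateSketch {

    // Processes keys that are already sorted, counting occurrences per key.
    // Returns {backendReads, backendWrites} to show the access pattern.
    static int[] process(List<String> sortedKeys, Map<String, Integer> backend) {
        int reads = 0, writes = 0;
        String currentKey = null;
        int currentValue = 0;
        for (String key : sortedKeys) {
            if (!key.equals(currentKey)) {
                if (currentKey != null) {               // final write for the previous key
                    backend.put(currentKey, currentValue);
                    writes++;
                }
                currentKey = key;
                currentValue = backend.getOrDefault(key, 0); // initial read for the new key
                reads++;
            }
            currentValue++;                             // intermediate updates stay in memory
        }
        if (currentKey != null) {                       // flush the last key
            backend.put(currentKey, currentValue);
            writes++;
        }
        return new int[]{reads, writes};
    }

    public static void main(String[] args) {
        int[] stats = process(Arrays.asList("a", "a", "a", "b", "b", "c"), new HashMap<>());
        // 6 records but only 3 distinct keys: 3 reads, 3 writes.
        System.out.println(stats[0] + " reads, " + stats[1] + " writes");
    }
}
```

With unsorted input, the same aggregation could touch the backend once per record; sorting bounds backend traffic by the number of distinct keys instead.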

...

  • The timer service keeps timers only for the current key, and it will fire timers only at the end of each key.
  • The value of InternalTimeService#getCurrentWatermark will be Watermark.MIN_VALUE when processing a key and will be set to Watermark.MAX_VALUE when firing triggers. This means that InternalTimeService#getCurrentWatermark returns Watermark.MIN_VALUE in Input#processElement and Watermark.MAX_VALUE in Triggerable#onEventTime.

Stream mode:

  • The timer service keeps timers for all the keys, and it will fire timers based on the watermark.

...

  • Before the source operator emits isBacklog=false, the timer service keeps timers only for the current key, and at the end of each key it fires that key's event-time timers up to the maximum watermark observed during backlog processing. All processing-time timers up to the current system time will also be triggered.
    • The value of InternalTimeService#getCurrentWatermark will be Watermark.MIN_VALUE when processing a key and will be set to the maximum watermark observed during backlog processing when firing triggers.
  • After isBacklog switches to false, the timer service keeps timers for all keys and fires timers as the watermark advances.
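The per-key firing model during backlog can be sketched as follows (illustrative plain Java, not Flink's InternalTimerService; a sorted set stands in for the timer queue):

```java
import java.util.*;

// Illustrative sketch (not Flink code) of the per-key timer semantics during backlog:
// while a key's records are processed the current watermark reads Long.MIN_VALUE, and
// at the end of the key all event-time timers up to the maximum watermark observed
// during backlog processing fire at once.
public class PerKeyTimerSketch {

    // Fires (and removes) every registered event-time timer whose timestamp is
    // <= the maximum backlog watermark; returns the fired timestamps in order.
    static List<Long> fireTimersForKey(TreeSet<Long> registeredTimers, long maxBacklogWatermark) {
        List<Long> fired = new ArrayList<>();
        for (Iterator<Long> it = registeredTimers.iterator(); it.hasNext(); ) {
            long ts = it.next();
            if (ts <= maxBacklogWatermark) {
                fired.add(ts);
                it.remove();
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Seen by processElement while records of the current key are processed.
        long currentWatermark = Long.MIN_VALUE;

        TreeSet<Long> timers = new TreeSet<>(Arrays.asList(10L, 20L, 50L));
        // End of key: fire timers up to the max watermark observed during backlog (30 here).
        List<Long> fired = fireTimersForKey(timers, 30L);
        System.out.println(fired); // timers 10 and 20 fire; 50 stays registered
    }
}
```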

Public Interfaces

1) Add RecordAttributesBuilder and RecordAttributes (which extends RuntimeEvent) to provide operators with essential information about the records they receive, such as whether the records are already stale due to backlog.
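A minimal sketch of how these classes might look. The class names come from the FLIP, but the setBacklog/isBacklog accessors are assumptions for illustration, and the RuntimeEvent superclass is omitted so the sketch compiles standalone:

```java
// Illustrative sketch only: the real classes live in Flink and RecordAttributes
// extends RuntimeEvent; the accessor names below are assumptions for illustration.
public class RecordAttributesSketch {

    static final class RecordAttributes {
        private final boolean backlog;

        RecordAttributes(boolean backlog) {
            this.backlog = backlog;
        }

        // True if the records that follow are already stale due to backlog.
        boolean isBacklog() {
            return backlog;
        }
    }

    static final class RecordAttributesBuilder {
        private boolean backlog;

        RecordAttributesBuilder setBacklog(boolean backlog) {
            this.backlog = backlog;
            return this;
        }

        RecordAttributes build() {
            return new RecordAttributes(backlog);
        }
    }

    public static void main(String[] args) {
        RecordAttributes attrs = new RecordAttributesBuilder().setBacklog(true).build();
        System.out.println(attrs.isBacklog()); // true
    }
}
```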

...

  • If a OneInputTransformation has isInternalSorterSupported == false, a keyed input, and execution.checkpointing.interval-during-backlog == 0:
    • Flink runtime will add an external sorter to sort the keyed input during backlog processing.
    • Its managed memory should be set according to execution.sorted-inputs.memory
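For illustration, the two options referenced above could be set as follows in the Flink configuration (the values here are examples, not recommendations):

```yaml
# Disable checkpointing while isBacklog=true, enabling the sorting optimization.
execution.checkpointing.interval-during-backlog: 0
# Managed memory used by the external sorter for sorted inputs (illustrative value).
execution.sorted-inputs.memory: 128 mb
```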

5) In addition to the above changes, we will utilize the state backend optimization in FLIP-325: a StateBackend with cache size 1 is used if the cache size is not explicitly set by the user; otherwise the user-configured cache size is used. Given the automatically sorted input and this optimization, the performance of keyed one-input operators (such as KeyedProcessOperator, StreamGroupedReduceOperator, and WindowOperator) should improve considerably without code change when isBacklog=true, approaching the performance of batch mode.
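The size-1 cache behavior can be sketched with plain Java (illustrative only, not the FLIP-325 implementation; the map is a hypothetical stand-in for the underlying state backend):

```java
import java.util.*;

// Illustrative sketch (not Flink code) of a size-1 write-back cache in front of a
// keyed state backend: updates stay in the cached entry and are written to the
// backend only when access to a different key evicts that entry.
public class Size1StateCacheSketch {
    private final Map<String, Integer> backend = new HashMap<>();
    private String cachedKey;   // at most one key is cached (cache size 1)
    private int cachedValue;
    int backendWrites = 0;      // exposed to show the access pattern

    int get(String key) {
        if (!key.equals(cachedKey)) {
            flush();            // evict: write the previously cached entry back
            cachedKey = key;
            cachedValue = backend.getOrDefault(key, 0);
        }
        return cachedValue;
    }

    void put(String key, int value) {
        get(key);               // ensure the key is the cached one
        cachedValue = value;    // update in memory only
    }

    void flush() {
        if (cachedKey != null) {
            backend.put(cachedKey, cachedValue);
            backendWrites++;
        }
    }

    public static void main(String[] args) {
        Size1StateCacheSketch cache = new Size1StateCacheSketch();
        cache.put("a", cache.get("a") + 1);
        cache.put("a", cache.get("a") + 1); // same key: no backend write
        cache.put("b", cache.get("b") + 1); // key change: "a" is written back
        cache.flush();
        System.out.println(cache.backendWrites + " backend writes");
    }
}
```

With sorted input each key is evicted exactly once, which is why the cache size of 1 is sufficient during backlog processing.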

...

Currently, there are two keyed multi-input operators in the DataStream API, KeyedCoProcessOperator and IntervalJoinOperator. Both of them will be optimized in the following way:

  • Override getOperatorAttributes to return IsInternalSorterSupported = true.
  • Override the processRecordAttributes() API to adjust behavior based on each input's isBacklog status.
    • The operator buffers and sorts input records while any input has isBacklog=true AND execution.checkpointing.interval-during-backlog=0.
    • Once all inputs have switched to isBacklog=false, it processes the buffered records, emits the results, and starts working in stream execution mode.
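The buffering protocol above can be sketched as follows (illustrative plain Java, not Flink's TwoInputStreamOperator API; processElement and processRecordAttributes are simplified stand-ins, and sorting plain strings stands in for sorting by key):

```java
import java.util.*;

// Illustrative sketch (not Flink code) of a two-input operator that buffers and sorts
// records while either input still has isBacklog=true, and flushes the sorted buffer
// once both inputs report isBacklog=false.
public class TwoInputBufferSketch {
    final boolean[] backlog = {true, true};        // isBacklog status per input
    final List<String> buffer = new ArrayList<>();
    final List<String> emitted = new ArrayList<>();

    void processElement(int input, String record) {
        if (backlog[0] || backlog[1]) {
            buffer.add(record);                    // backlog: buffer internally
        } else {
            emitted.add(record);                   // stream mode: process directly
        }
    }

    void processRecordAttributes(int input, boolean isBacklog) {
        backlog[input] = isBacklog;
        if (!backlog[0] && !backlog[1]) {          // all inputs left backlog:
            Collections.sort(buffer);              // sort (by key in the real operator),
            emitted.addAll(buffer);                // process the buffered records,
            buffer.clear();                        // then continue in stream mode
        }
    }

    public static void main(String[] args) {
        TwoInputBufferSketch op = new TwoInputBufferSketch();
        op.processElement(0, "b");
        op.processElement(1, "a");
        op.processRecordAttributes(0, false);
        op.processRecordAttributes(1, false);      // triggers the sorted flush
        op.processElement(0, "c");                 // now processed directly
        System.out.println(op.emitted);            // buffered records first, sorted
    }
}
```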



Evaluation

After the proposed changes, the following DataStream APIs should have performance similar to batch mode during backlog processing.


DataStream:

  • DataStream#coGroup


KeyedStream:

  • KeyedStream#process
  • KeyedStream#intervalJoin
  • KeyedStream#reduce
  • KeyedStream#sum
  • KeyedStream#min
  • KeyedStream#max
  • KeyedStream#minBy
  • KeyedStream#maxBy


WindowedStream:

  • WindowedStream#reduce
  • WindowedStream#aggregate
  • WindowedStream#apply
  • WindowedStream#process
  • WindowedStream#sum
  • WindowedStream#min
  • WindowedStream#max
  • WindowedStream#minBy
  • WindowedStream#maxBy


ConnectedStreams:

  • ConnectedStreams#process


Benchmark results

In this section, we provide benchmark results comparing the throughput of commonly used operations between batch mode and stream mode, with and without the backlog processing optimization. This demonstrates the performance gain the optimization provides during backlog processing.

...