...
Managed memory is allocated to operators with keyed inputs.
A general-purpose keyed state backend that does not assume inputs are sorted by key (e.g. EmbeddedRocksDBStateBackend) is instantiated by the Flink runtime.
Before the source operator emits isBacklog=false (i.e. during backlog processing), the keyed input to a one-input operator is sorted by key.
- Multiple input operators can buffer and sort the inputs internally. We will provide utility classes (derived from the existing ExternalSorter) to sort inputs.
- With the sorted input and the state backend optimization introduced in FLIP-325, the state value is always updated in memory while the records with the same key are processed. Apart from the initial read for each key, there is at most one read access to the RocksDB state backend per key, and we only need to write to the RocksDB state backend when the LRU cache is full.
- At the point when isBacklog switches to false:
- Source operator emits RecordAttributes(isBacklog=false).
- The operator continues to process records using the keyed state backend instantiated by Flink runtime, which now contains the full state obtained from records received while isBacklog=true.
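The per-key state access pattern described above (one initial read per key, all other updates in memory, writes to the backend only on eviction) can be sketched as follows. This is a minimal, self-contained illustration in plain Java, not Flink code: a HashMap stands in for the RocksDB state backend, and the cache of size 1 mimics the FLIP-325 optimization.

```java
import java.util.HashMap;
import java.util.Map;

public class SortedKeyStateSketch {
    static int backendReads = 0;
    static int backendWrites = 0;
    // Stands in for the RocksDB state backend.
    static final Map<String, Long> backend = new HashMap<>();

    public static void main(String[] args) {
        // Input records already sorted by key: 6 records, 3 distinct keys.
        String[] sortedKeys = {"a", "a", "a", "b", "b", "c"};
        String cachedKey = null;  // value cache of size 1 (FLIP-325 style)
        long cachedCount = 0;
        for (String key : sortedKeys) {
            if (!key.equals(cachedKey)) {
                if (cachedKey != null) {
                    backend.put(cachedKey, cachedCount);  // final write for the previous key
                    backendWrites++;
                }
                cachedCount = backend.getOrDefault(key, 0L);  // initial read for the new key
                backendReads++;
                cachedKey = key;
            }
            cachedCount++;  // all other updates stay in memory
        }
        if (cachedKey != null) {
            backend.put(cachedKey, cachedCount);  // flush the last key
            backendWrites++;
        }
        // 6 records but only 3 distinct keys -> 3 backend reads, 3 backend writes.
        System.out.println(backendReads + " reads, " + backendWrites + " writes");
    }
}
```

Because the input is sorted, a key never reappears after the cache moves on to the next key, which is what bounds the backend traffic at one read and one write per key.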
...
- The timer service keeps timers only for the current key, and it will fire timers only at the end of each key.
- The value of InternalTimeService#getCurrentWatermark will be Watermark.MIN_VALUE when processing a key and will be set to Watermark.MAX_VALUE when firing triggers. This means that InternalTimeService#getCurrentWatermark returns Watermark.MIN_VALUE in Input#processElement and Watermark.MAX_VALUE in Triggerable#onEventTime.
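The watermark behavior above can be sketched as follows. This is illustrative plain Java, not Flink's InternalTimeService; the class and method names mirror the concepts but are assumptions for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchWatermarkSketch {
    long currentWatermark = Long.MIN_VALUE;  // stands in for Watermark.MIN_VALUE
    final List<Long> timersOfCurrentKey = new ArrayList<>();

    long getCurrentWatermark() {
        return currentWatermark;
    }

    // Mirrors Input#processElement: the watermark is still MIN_VALUE here.
    void processElement(long timerTimestamp) {
        timersOfCurrentKey.add(timerTimestamp);
    }

    // Mirrors the start of a new key: the watermark is reset to MIN_VALUE.
    void startKey() {
        currentWatermark = Long.MIN_VALUE;
    }

    // Mirrors end-of-key: the watermark jumps to MAX_VALUE, so
    // Triggerable#onEventTime observes MAX_VALUE while timers fire.
    List<Long> endOfKey() {
        currentWatermark = Long.MAX_VALUE;
        List<Long> fired = new ArrayList<>(timersOfCurrentKey);
        timersOfCurrentKey.clear();
        return fired;
    }

    public static void main(String[] args) {
        BatchWatermarkSketch svc = new BatchWatermarkSketch();
        svc.startKey();
        System.out.println(svc.getCurrentWatermark() == Long.MIN_VALUE);  // true while processing
        svc.processElement(100L);
        List<Long> fired = svc.endOfKey();
        System.out.println(svc.getCurrentWatermark() == Long.MAX_VALUE);  // true while firing
        System.out.println(fired);  // all timers of the key fire at end-of-key
    }
}
```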
Stream mode:
- The timer service keeps timers for all the keys, and it will fire timers based on the watermark.
...
- Before the source operator emits isBacklog=false, the timer service keeps timers only for the current key. At the end of each key, it fires the event-time timers of the current key up to the maximum watermark received during backlog processing, and all processing-time timers up to the current system time are triggered.
- The value of InternalTimeService#getCurrentWatermark will be Watermark.MIN_VALUE when processing a key and will be set to the maximum watermark received during backlog processing when firing triggers.
- After isBacklog switches to false, the timer service keeps timers for all the keys and fires timers as the watermark advances.
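The end-of-key firing rule during backlog can be sketched as follows (illustrative plain Java, not Flink code; maxBacklogWatermark stands for the maximum watermark received during backlog processing):

```java
import java.util.ArrayList;
import java.util.List;

public class BacklogTimerFiringSketch {
    // Fires the event-time timers of the current key up to the maximum
    // watermark received during backlog; later timers are kept so they can
    // fire in stream mode once the watermark actually reaches them.
    static List<Long> fireAtEndOfKey(List<Long> timers, long maxBacklogWatermark) {
        List<Long> fired = new ArrayList<>();
        timers.removeIf(t -> {
            if (t <= maxBacklogWatermark) {
                fired.add(t);
                return true;  // remove fired timer from the pending set
            }
            return false;
        });
        return fired;
    }

    public static void main(String[] args) {
        List<Long> timers = new ArrayList<>(List.of(10L, 50L, 200L));
        List<Long> fired = fireAtEndOfKey(timers, 100L);
        System.out.println(fired);   // [10, 50]
        System.out.println(timers);  // [200] -> fires later, in stream mode
    }
}
```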
Public Interfaces
1) Add RecordAttributesBuilder and RecordAttributes (which extends RuntimeEvent) to provide operators with essential information about the records they receive, such as whether the records are already stale due to backlog.
...
- If a OneInputTransformation has isInternalSorterSupported == false, keyed input, and execution.checkpointing.interval-during-backlog == 0:
- Flink runtime will add an external sorter for the input to sort the keyed input during backlog processing
- Its managed memory should be set according to execution.sorted-inputs.memory
5) In addition to the above changes, we will leverage the state backend optimization introduced in FLIP-325: a StateBackend cache of size 1 is used if the cache size is not explicitly set by the users; otherwise the user-configured cache size is used. Given the automatically sorted input and this optimization, the performance of keyed one-input operators such as KeyedProcessOperator, StreamGroupedReduceOperator and WindowOperator should improve considerably without code changes when isBacklog=true, and should be close to their performance in batch mode.
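For reference, a job that wants to opt in to this sorting-based backlog optimization could set the following configuration. The two keys are the ones named in this FLIP; the memory value shown is only an illustrative example, not a recommended default.

```yaml
# Disable checkpointing during backlog so inputs can be buffered and sorted.
execution.checkpointing.interval-during-backlog: 0
# Managed memory available to the external sorter during backlog processing
# (the value below is an arbitrary example).
execution.sorted-inputs.memory: 64mb
```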
...
Currently, there are two keyed multi-input operators in DataStream API, KeyedCoProcessOperator and IntervalJoinOperator. Both of them will be optimized in the following way:
- Override getOperatorAttributes to return IsInternalSorterSupported = true.
- Override processRecordAttributes() API to adjust its behavior based on the input's isBacklog status.
- The operator can buffer and sort input records when any input has isBacklog=true AND execution.checkpointing.interval-during-backlog=0.
- Once all inputs' status has switched to isBacklog=false, it processes the buffered records, emits results, and starts to work in the stream execution mode.
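The buffering behavior described above can be sketched as follows (simplified plain Java, not the actual operator implementation; sorting here uses the records' natural order as a stand-in for sorting by key):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TwoInputBacklogSketch {
    final boolean[] inputIsBacklog = {true, true};
    final List<String> buffer = new ArrayList<>();
    final List<String> emitted = new ArrayList<>();

    void processRecord(String record) {
        if (inputIsBacklog[0] || inputIsBacklog[1]) {
            buffer.add(record);   // any input in backlog: buffer internally
        } else {
            emitted.add(record);  // stream mode: process and emit directly
        }
    }

    // Called when RecordAttributes(isBacklog=...) arrives on an input.
    void processRecordAttributes(int inputIndex, boolean isBacklog) {
        inputIsBacklog[inputIndex] = isBacklog;
        if (!inputIsBacklog[0] && !inputIsBacklog[1]) {
            Collections.sort(buffer);  // sort the buffered records
            emitted.addAll(buffer);    // process buffered records, emit results
            buffer.clear();            // then continue in stream execution mode
        }
    }

    public static void main(String[] args) {
        TwoInputBacklogSketch op = new TwoInputBacklogSketch();
        op.processRecord("b");
        op.processRecord("a");
        op.processRecordAttributes(0, false);  // input 0 caught up, input 1 still backlog
        op.processRecordAttributes(1, false);  // all inputs now isBacklog=false
        op.processRecord("c");
        System.out.println(op.emitted);  // [a, b, c]
    }
}
```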
Evaluation
After the proposed changes, the following DataStream APIs should have performance similar to batch mode during backlog processing.
DataStream:
- DataStream#coGroup
KeyedStream:
- KeyedStream#process
- KeyedStream#intervalJoin
- KeyedStream#reduce
- KeyedStream#sum
- KeyedStream#min
- KeyedStream#max
- KeyedStream#minBy
- KeyedStream#maxBy
WindowedStream:
- WindowedStream#reduce
- WindowedStream#aggregate
- WindowedStream#apply
- WindowedStream#process
- WindowedStream#sum
- WindowedStream#min
- WindowedStream#max
- WindowedStream#minBy
- WindowedStream#maxBy
ConnectedStreams:
- ConnectedStreams#process
Benchmark results
In this section, we provide benchmark results comparing the throughput of commonly-used operations during backlog processing in batch mode and in stream mode, with and without the backlog processing optimization. This demonstrates the performance gain achieved by the optimization.
...