...

In this FLIP, we propose to optimize performance for the above use-cases by allowing an operator to effectively switch its execution mode from batch to stream based on the "backlog" status of the records emitted by the source operators.


NOTE: some features originally included in this FLIP have been moved to FLIP-331.

NOTE: this FLIP depends on the APIs (e.g. RecordAttributes with isBacklog information) proposed in FLIP-325. While FLIP-325 allows an operator to buffer records for an arbitrarily long time while its input records have isBacklog=true, this FLIP additionally allows an operator to use optimizations that are currently only applicable in the batch execution mode (e.g. using ExternalSorter, which does not support checkpointing).
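To make the idea concrete, the following is a minimal sketch of an operator that buffers and sorts records while its input is in backlog, then switches to per-record processing once the backlog ends. It is plain Java and does not use any Flink API: the class BacklogAwareSorter and its methods onBacklogStatus and processRecord are hypothetical names introduced only for illustration, the boolean argument stands in for the isBacklog information carried by RecordAttributes, and the in-memory sort stands in for a batch-only optimization such as ExternalSorter.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical operator used only for illustration; it is not a Flink class.
class BacklogAwareSorter {
    private final List<Integer> buffer = new ArrayList<>();
    private final Consumer<Integer> output;
    private boolean backlog = false;

    BacklogAwareSorter(Consumer<Integer> output) {
        this.output = output;
    }

    // Stand-in for receiving a RecordAttributes event with isBacklog information.
    void onBacklogStatus(boolean isBacklog) {
        if (backlog && !isBacklog) {
            // Backlog has ended: emit everything buffered so far, in sorted order.
            flushSorted();
        }
        backlog = isBacklog;
    }

    void processRecord(int record) {
        if (backlog) {
            // "Batch" behaviour: hold the record back so it can be sorted later.
            buffer.add(record);
        } else {
            // "Stream" behaviour: emit each record immediately.
            output.accept(record);
        }
    }

    private void flushSorted() {
        // Assumption: an in-memory sort replaces the external sorter here.
        buffer.sort(Comparator.naturalOrder());
        buffer.forEach(output);
        buffer.clear();
    }
}

class BacklogAwareSorterDemo {
    static List<Integer> run() {
        List<Integer> emitted = new ArrayList<>();
        BacklogAwareSorter operator = new BacklogAwareSorter(emitted::add);
        operator.onBacklogStatus(true);   // source reports backlog
        operator.processRecord(3);
        operator.processRecord(1);
        operator.processRecord(2);
        operator.onBacklogStatus(false);  // backlog ends, buffered records are flushed
        operator.processRecord(0);        // processed in stream mode
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the records received during backlog are emitted in sorted order only after the status flips to false, while later records pass through one at a time.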

...

1) Use DataStream#coGroup to process records from two bounded streams and emit results after both inputs have ended.

We use a user-defined function which just invokes "collector.collect(1)" so that overhead of the user-defined function is minimized. The DataStream program looks like this.

Code Block
languagejava
data1.coGroup(data2)
     .where(tuple -> tuple.f0)
     .equalTo(tuple -> tuple.f0)
     .window(EndOfStreamWindows.get())
     .apply(new CustomCoGroupFunction())
     .addSink(...);
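The body of CustomCoGroupFunction is not shown above. A minimal sketch of what such a function might look like is given below, assuming it does nothing except call collector.collect(1) once per invocation. To keep the example self-contained, the Collector and CoGroupFunction interfaces are local stand-ins that mirror the shape of the Flink interfaces of the same names; in a real job the Flink interfaces would be imported instead. The type parameters and the CoGroupDemo helper are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in mirroring the shape of Flink's Collector (assumption for self-containment).
interface Collector<T> {
    void collect(T record);
}

// Local stand-in mirroring the shape of Flink's CoGroupFunction.
interface CoGroupFunction<IN1, IN2, OUT> {
    void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<OUT> out) throws Exception;
}

// Emits a constant for every key group and ignores both inputs, so the
// cost measured by the benchmark is dominated by the framework, not the function.
class CustomCoGroupFunction<IN1, IN2> implements CoGroupFunction<IN1, IN2, Integer> {
    @Override
    public void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<Integer> out) {
        out.collect(1);
    }
}

class CoGroupDemo {
    // Invokes the function once, as the framework would for a single key.
    static List<Integer> runOnce() {
        List<Integer> results = new ArrayList<>();
        new CustomCoGroupFunction<String, String>()
                .coGroup(List.of("a"), List.of("b", "c"), results::add);
        return results;
    }

    public static void main(String[] args) {
        System.out.println(runOnce());
    }
}
```

Because the function never iterates over its inputs, the number of records per key does not affect the time spent inside it.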

...