...

8) Hybrid shuffle can be enabled by configuring ExecutionOptions.BATCH_SHUFFLE_MODE, which determines the result partition type of the isOutputOnEOF operator and its upstream operators in RuntimeExecutionMode.STREAMING.
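A minimal configuration sketch of item 8, assuming Flink 1.16+ on the classpath (where `BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL` exists) and assuming the behavior proposed here, under which BATCH_SHUFFLE_MODE also takes effect for the isOutputOnEOF part of a STREAMING job. The class name `HybridShuffleStreamingJob` is hypothetical.

```java
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical job class; requires flink-streaming-java (1.16+) on the classpath.
public class HybridShuffleStreamingJob {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // The job as a whole keeps running in STREAMING mode.
        conf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
        // Under this proposal (assumption), this option selects the result partition type
        // of the isOutputOnEOF operator and its upstream operators; HYBRID_FULL picks hybrid shuffle.
        conf.set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        System.out.println("batch shuffle mode: " + conf.get(ExecutionOptions.BATCH_SHUFFLE_MODE));
        // Build the bounded pipeline on env (e.g. the coGroup with EndOfStreamWindows
        // shown in the benchmark section), then call env.execute(...).
    }
}
```

In stock Flink, BATCH_SHUFFLE_MODE only matters for BATCH jobs; the point of item 8 is that the same option would be reused for the non-pipelined part of a STREAMING job.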

Benchmark results

...

Use DataStream#coGroup to process records from two bounded streams, each containing the number of records given in the Data Count column below, and emit results after both inputs have ended.

...

With this PR, when DataStream#coGroup is invoked with EndOfStreamWindows as the window assigner, EOFAggregationOperator is applied to sort the two inputs first and then output the coGroup results.


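```java
data1.coGroup(data2)
       .where(tuple -> tuple.f0)      // key selector for the first input
       .equalTo(tuple -> tuple.f0)    // key selector for the second input
       // EndOfStreamWindows: one window that fires only after both inputs have ended
       .window(EndOfStreamWindows.get())
       .apply(new CustomCoGroupFunction())
       .addSink(...);
```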
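To make "sort the two inputs first, then output the coGroup results" concrete, here is a framework-free sketch of a sort-merge coGroup over two bounded, fully received inputs. It is not EOFAggregationOperator's actual code: the class `SortMergeCoGroup` and its `CoGroupFn` callback are hypothetical, and a real operator would presumably sort using managed memory rather than in-heap lists.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: sort-merge coGroup, run once both bounded inputs have ended.
final class SortMergeCoGroup {

    /** Hypothetical callback, invoked once per distinct key with that key's records from each side. */
    interface CoGroupFn<K, A, B> {
        void coGroup(K key, List<A> left, List<B> right);
    }

    private SortMergeCoGroup() {}

    static <A, B, K extends Comparable<K>> void coGroup(
            List<A> input1, Function<A, K> key1,
            List<B> input2, Function<B, K> key2,
            CoGroupFn<K, A, B> fn) {
        // Step 1: sort copies of both inputs by key.
        List<A> left = new ArrayList<>(input1);
        List<B> right = new ArrayList<>(input2);
        left.sort(Comparator.comparing(key1));
        right.sort(Comparator.comparing(key2));

        // Step 2: merge-walk both sorted lists, emitting one group per distinct key in key order.
        int i = 0;
        int j = 0;
        while (i < left.size() || j < right.size()) {
            // The next key is the smaller of the two current heads (or whichever side remains).
            K key;
            if (i == left.size()) {
                key = key2.apply(right.get(j));
            } else if (j == right.size()) {
                key = key1.apply(left.get(i));
            } else {
                K a = key1.apply(left.get(i));
                K b = key2.apply(right.get(j));
                key = a.compareTo(b) <= 0 ? a : b;
            }
            // Collect the contiguous run of records with this key from each side (either may be empty).
            List<A> groupA = new ArrayList<>();
            while (i < left.size() && key1.apply(left.get(i)).compareTo(key) == 0) {
                groupA.add(left.get(i++));
            }
            List<B> groupB = new ArrayList<>();
            while (j < right.size() && key2.apply(right.get(j)).compareTo(key) == 0) {
                groupB.add(right.get(j++));
            }
            fn.coGroup(key, groupA, groupB);
        }
    }
}
```

Because both inputs are complete before sorting, each key's group is emitted exactly once, and no per-key window state has to be maintained while records are still arriving.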

We run each benchmark in CoGroupDataStream 5 times. Here are the throughput benchmark results:

| Data Count | Key Count | Current | BATCH | Optimized | Hybrid Shuffle |
|---|---|---|---|---|---|
| 2e6 | 2e6 | 61 ± 1 (115%) | 502 ± 75 (947%) | 868 ± 74 (1637%) | 899 ± 82 (1696%) |
| 5e6 | 5e6 | 60 ± 1 (113%) | 528 ± 28 (996%) | 1176 ± 39 (2218%) | 1262 ± 50 (2381%) |
| 2e7 | 2e7 | 56 ± 0 (105%) | 564 ± 34 (1064%) | 1492 ± 28 (2815%) | 1692 ± 14 (3192%) |
| 5e7 | 5e7 | 53 ± 1 (100%) | 602 ± 16 (1135%) | 1508 ± 18 (2845%) | 1712 ± 77 (3230%) |

...

This shows that, with the changes proposed above, DataStream#coGroup can be roughly 15~30X faster than its current throughput in stream mode and 1.7~2.6X faster than its current throughput in batch mode. Using hybrid shuffle in the optimized non-pipelined part can further improve performance.
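The stream-mode and batch-mode speedups claimed above can be rechecked from the mean throughputs in the benchmark table. This small sketch (the class `CoGroupSpeedups` is hypothetical) ignores the ± spread and divides column means per row. It also reproduces the parenthesized percentages, which appear to be normalized to the Current throughput at 5e7 records (53 = 100%) and truncated to an integer; that normalization is inferred from the numbers, not stated in the source.

```java
import java.util.Locale;

// Hypothetical helper: recomputes speedups from the mean throughputs in the benchmark table.
final class CoGroupSpeedups {
    static final String[] DATA_COUNTS = {"2e6", "5e6", "2e7", "5e7"};
    // Columns: Current, BATCH, Optimized, Hybrid Shuffle (means only).
    static final double[][] MEANS = {
        {61, 502, 868, 899},
        {60, 528, 1176, 1262},
        {56, 564, 1492, 1692},
        {53, 602, 1508, 1712},
    };
    static final int CURRENT = 0, BATCH = 1, OPTIMIZED = 2, HYBRID = 3;

    /** Ratio of two throughputs, e.g. optimized / current. */
    static double speedup(double value, double baseline) {
        return value / baseline;
    }

    /** Percent of Current at 5e7 records, truncated, which is what the table's percentages appear to be. */
    static long percentOfBaseline(double value) {
        return (long) (100.0 * value / MEANS[3][CURRENT]);
    }

    public static void main(String[] args) {
        for (int r = 0; r < MEANS.length; r++) {
            double[] m = MEANS[r];
            System.out.printf(Locale.ROOT,
                "%s: optimized/current=%.1fX, optimized/batch=%.1fX, hybrid/current=%.1fX%n",
                DATA_COUNTS[r],
                speedup(m[OPTIMIZED], m[CURRENT]),
                speedup(m[OPTIMIZED], m[BATCH]),
                speedup(m[HYBRID], m[CURRENT]));
        }
    }
}
```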

Compatibility, Deprecation, and Migration Plan

...