Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

In addition, after FLIP-327 is accepted, this operator should also have isInternalSorterSupported = true. This operator will sort the input before aggregate, and avoid invoking window actions, which is similar to '5)'.

Benchmark results

Generally, when operator(coGroup or aggregate) is invoked optimized with EndOfStreamWindows as the window assigner, the optimized operator EOFCoGroupOperator /EOFAggregationOperator will be applied, in which case the input(s) will be sorted first and then output the results after the input end of input. This makes the performance with our work can be mutiple times faster than before and slightly faster than batch mode because of avoiding window related work.Using hybrid shuffle in the outputEOF part can make part of operator pipeline consume records from upstream to further improve the performance.

To demonstrate our optimization improvements,we run each benchmark in different execution modes and configurations 5 times. Here are benchmark results which include average throughput (records/ms), throughput range, the magnification factor for each streaming job before our work and duration time(ms).

Environment Configuration

...

Code Block
languagejava
data1.coGroup(data2)
    .where(tuple -> tuple.f0)
    .equalTo(tuple -> tuple.f0)
    .window(EndOfStreamWindows.get())
    .apply(new CustomCoGroupFunction())
    .addSink(...);


In this benchmark, we run each job in CoGroupDataStream 5 times in 4 kinds of configuration: streaming mode, batch mode, optimized streaming mode after this PR and optimized streaming mode with hybrid shuffle after this PR. Here are the throughput benchmark results:

Data CountSTREAMINGBATCHOptimized STREAMINGWith hybrid shuffle
1e72e763 82 ± 1 (100%, 158245 241186 ms)532 422 ± 12 33 (844%514%, 18768 47370 ms)1359 1406 ± 41 6 (2157%1714%, 7357 14223 ms)1545 1602 ± 35 9 (2452%1953%, 6472 12477 ms)
3e75e760 75 ± 0 1 (100%, 497587 658373 ms)554 477 ± 23 12 (923%636%, 54112 104666 ms)1202 1452 ± 9 14 (2003%1936%, 24958 34427 ms)1334 1626 ± 17 49 (2223%2168%, 22485 30736 ms)
5e78e756 66 ± 1 (100%, 878518 1202426 ms)572 491 ± 5 (1021%743%, 87296 162731 ms)1163 1506 ± 2 10 (2076%2281%, 42961 53098 ms)1283 1677 ± 8 42 (2291%2540%, 38953 47687 ms)

This shows with the changes proposed above, DataStream#coGroup in Optimized STREAMING can be 20X 17~20X faster than STREAMING throughput and 2X 3X faster than BATCH throughput in batch mode. Using hybrid shuffle in the outputEOF part can further improve the performance about 10%.

...

Code Block
languagejava
data
      .keyBy(value -> value.f0)
      .window(EndOfStreamWindows.get())
      .aggregate(new Aggregator())
      .addSink(new CountingAndDiscardingSink());


In this benchmark, we run each job in AggregationBenchmark 5 times in 4 kinds of configuration: streaming mode, batch mode, optimized streaming mode after this PR and optimized streaming mode with hybrid shuffle after this PR. Here are the throughput benchmark results:

Data CountSTREAMINGBATCHOptimized STREAMINGWith hybrid shuffle
2e7188 ± 1 (100%, 106190 ms)1565 ± 15 (832%, 12775 ms)1671 ± 31 (888%, 11964 ms)1950 ± 8 (1037%, 10255 ms)
5e7171 ± 1 (100%, 290733 ms)1534 ± 13 (897%, 32588 ms)1712 ± 14 (1001%, 29201 ms)1988 ± 16 (1162%, 25149 ms)
8e7163 ± 0 (100%, 490478 ms)1561 ± 16 (957%, 51237 ms)1733 ± 9 (1063%, 46143 ms)1992 ± 15 (1222%, 40148 ms)

This shows with the changes proposed above, DataStream#aggregate in Optimized STREAMING can be 8~10X faster than STREAMING throughput and slightly faster to the BATCH. Using hybrid shuffle in the outputEOF part can further improve the performance about 15%.

Benchmark: OperatorDelayedDeploy

...

Code Block
languageyml
taskmanager.numberOfTaskSlots: 2

When we set the Source1Process1, Source2 and Process2 in 4 different SlotSharingGroups as above and then run this job, we could see that, before this PR, all of the tasks couldn't deploy and blocked in CREATED state because only 2 slots provided in TaskManager. With our work, Source2 and Process2 can be deplyed after Source1Process1 finished and released their slots.

OperatorDelayedDeployDataStream

Efficiency Improvement

Without setting the slot sharing group, we run OperatorDelayedDeployDataStreamInOneSlot each benchmark 5 times. Process1 will aggregate Data Count number of records from Source1, and every 100 records have the same key. Benchmark terminates after Process2 process Data Count number of records from Source2.

Here are the throughput benchmark results:

Data CountSTREAMINGOptimized STREAMINGWith hybrid shuffle
1e7183 ± 19 (100%, 54370 ms)1371 ± 43 (749%, 7289 ms)1568 ± 49 (856%, 6376 ms)
3e740 ± 10 (747765 ms)1461 ± 19 (20531 ms)1674 ± 41 (17912 ms)
5e7OOM1522 ± 11 (32843 ms)1750 ± 14 (28561 ms)

This shows that, aggregation optimization above can also be detected in this benchmark. In line 1e7, the magnificaiton is smaller than Benchmark: Aggregate because the stage of Process2 processing records from Source2 in STEAMING is more faster than Optimized STREAMING as records from Source2 already buffered.

As the records number grows, throughput in STREAMING gets decreased because of the buffer presure in Process2, and even causes OOM exception while it works well in Optimized STREAMING which makes the magnification meaningless,. Using hybrid shuffle can make Process1 to pipeline consume records from Source1 to further improve the performance.

Compatibility, Deprecation, and Migration Plan

...