...

8) Hybrid shuffle can be applied by configuring ExecutionOptions.BATCH_SHUFFLE_MODE, which determines the result partition type of the isOutputOnEOF operator and its upstream operators in RuntimeExecutionMode.STREAMING.
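As a sketch, hybrid shuffle could be enabled through the job configuration roughly as follows (the option and enum are from Flink's public configuration API; which hybrid mode to choose depends on the setup):

```java
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
// Use hybrid shuffle (instead of blocking shuffle) for the batch-executed part.
conf.set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
```

This is a configuration fragment only; the rest of the job definition is unchanged.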

Benchmark results


Environment Configuration

We run the benchmarks on a MacBook with the latest Flink 1.18-SNAPSHOT and parallelism=1. The default standalone cluster configuration is used, except:


Code Block
languageyml
jobmanager.memory.process.size: 6400m
taskmanager.memory.process.size: 6912m
taskmanager.numberOfTaskSlots: 2

Benchmark: OperatorDelayedDeploy

We have a job that pre-processes records from a bounded source (Source1) using an operator (Process1) that only emits results after its input has ended. Another operator (Process2) then needs to process records emitted by Process1 together with records from an unbounded source (Source2), and emit results with low processing latency in real time.

Code Block
languagejava
source1.keyBy(value -> value.f0)
    .window(EndOfStreamWindows.get())
    .aggregate(new MyAggregator()).name("Process1")
    .connect(source2.keyBy(value -> value.f0))
    .transform("Process2", Types.INT, new MyProcessOperator())
    .addSink(new DiscardingSink<>());

Resource Utilization

When we put Source1, Process1, Source2 and Process2 into 4 different SlotSharingGroups as above and run this job, we can see that, before this PR, none of the tasks could be deployed: they stayed blocked in the CREATED state because the TaskManager provides only 2 slots. With our work, Source2 and Process2 can be deployed after Source1 and Process1 finish and release their slots.
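For reference, the four slot sharing groups mentioned above can be assigned roughly as follows (a sketch; the group names are illustrative, and source1/source2 are assumed to be DataStreamSources so that slotSharingGroup can be set on them directly):

```java
// Put each stage in its own slot sharing group so tasks of different stages
// do not share slots, letting Source2/Process2 be deployed only after
// Source1/Process1 finish and release their slots.
source1.slotSharingGroup("source1")
    .keyBy(value -> value.f0)
    .window(EndOfStreamWindows.get())
    .aggregate(new MyAggregator()).name("Process1").slotSharingGroup("process1")
    .connect(source2.slotSharingGroup("source2").keyBy(value -> value.f0))
    .transform("Process2", Types.INT, new MyProcessOperator()).slotSharingGroup("process2")
    .addSink(new DiscardingSink<>());
```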

(Figure: OperatorDelayedDeployDataStream)

Efficiency Improvement

Without setting slot sharing groups, we run each OperatorDelayedDeployDataStreamInOneSlot benchmark 5 times, and report the average throughput (records/ms), the throughput range, and the magnification relative to the minimum throughput. Here are the benchmark results:

| Data Count | Key Count | Current Throughput | Optimized Throughput | Hybrid Shuffle Throughput |
|---|---|---|---|---|
| 5e6 | 5e4 | 205 ± 4 (149%) | 1201 ± 59 (876%) | 1381 ± 14 (1008%) |
| 5e6 | 5e5 | 145 ± 21 (105%) | 1093 ± 61 (797%) | 1282 ± 41 (935%) |
| 1e7 | 1e5 | 186 ± 19 (135%) | 1270 ± 64 (927%) | 1423 ± 41 (1038%) |
| 1e7 | 1e6 | 137 ± 10 (100%) | 1176 ± 31 (858%) | 1313 ± 33 (958%) |
| 5e7 | 5e5 | OOM | 1463 ± 7 (1067%) | 1666 ± 35 (1216%) |
| 5e7 | 5e6 | OOM | 1099 ± 28 (802%) | 1222 ± 49 (891%) |

This shows that, with the changes proposed above, the total processing time can be about 8~10X faster than the current processing time in this scenario. As the number of records grows, the job in the current version fails with an OOM exception, while the optimized version still works well. With hybrid shuffle applied in the batch execution part, the blocking edge from Source1 to Process1 becomes a hybrid shuffle edge, which further improves performance.
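The percentages in parentheses in the tables are the magnification relative to the minimum (baseline) throughput of that table; a small self-contained sketch of that arithmetic (the class name is ours, not from the benchmark code):

```java
public class Magnification {
    // Magnification = throughput / minimum throughput, as a rounded percentage.
    static long magnificationPercent(double throughput, double minThroughput) {
        return Math.round(throughput / minThroughput * 100);
    }

    public static void main(String[] args) {
        // Row "1e7 / 1e6" above: baseline 137 record/ms, optimized 1176 record/ms.
        System.out.println(magnificationPercent(137, 137) + "%");   // 100%
        System.out.println(magnificationPercent(1176, 137) + "%");  // 858%
    }
}
```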

Benchmark: CoGroup

Use DataStream#coGroup to process records from two bounded streams, each with Data Count records, and emit results after both inputs have ended. With this PR, when DataStream#coGroup is invoked with EndOfStreamWindows as the window assigner, the EOFAggregationOperator is applied to first sort the two inputs and then output the coGroup results.

Code Block
languagejava
data1.coGroup(data2)
    .where(tuple -> tuple.f0)
    .equalTo(tuple -> tuple.f0)
    .window(EndOfStreamWindows.get())
    .apply(new CustomCoGroupFunction())
    .addSink(...);


We run each benchmark in CoGroupDataStream 5 times. Here are the throughput benchmark results:

| Data Count | Key Count | Current | BATCH | Optimized | Hybrid Shuffle |
|---|---|---|---|---|---|
| 2e6 | 2e6 | 61 ± 1 (115%) | 502 ± 75 (947%) | 868 ± 74 (1637%) | 899 ± 82 (1696%) |
| 5e6 | 5e6 | 60 ± 1 (113%) | 528 ± 28 (996%) | 1176 ± 39 (2218%) | 1262 ± 50 (2381%) |
| 2e7 | 2e7 | 56 ± 0 (105%) | 564 ± 34 (1064%) | 1492 ± 28 (2815%) | 1692 ± 14 (3192%) |
| 5e7 | 5e7 | 53 ± 1 (100%) | 602 ± 16 (1135%) | 1508 ± 18 (2845%) | 1712 ± 77 (3230%) |

This shows that, with the changes proposed above, DataStream#coGroup can be 15~30X faster than its current throughput in stream mode, and 2.5X faster than its current throughput in batch mode. Using hybrid shuffle in the optimized outputEOF part can further improve the performance.

Benchmark: Aggregate

Use DataStream#aggregate to process records from a bounded stream with Data Count records evenly distributed across Key Count keys, and emit results after the input has ended.

Code Block
languagejava
data
      .keyBy(value -> value.f0)
      .window(EndOfStreamWindows.get())
      .aggregate(new Aggregator())
      .addSink(new CountingAndDiscardingSink());


We run the benchmark with 10 records per key and with 100 records per key in AggregationBenchmark, each 5 times. Here are the throughput benchmark results:

10 records per key

| Data Count | Key Count | Current | BATCH | Optimized | Hybrid Shuffle |
|---|---|---|---|---|---|
| 5e6 | 5e5 | 164 ± 1 (145%) | 1277 ± 129 (1130%) | 1365 ± 79 (1207%) | 1418 ± 154 (1254%) |
| 1e7 | 1e6 | 151 ± 2 (133%) | 1403 ± 87 (1241%) | 1511 ± 43 (1337%) | 1697 ± 175 (1501%) |
| 2e7 | 2e6 | 129 ± 4 (114%) | 1322 ± 21 (1169%) | 1411 ± 17 (1248%) | 1598 ± 13 (1414%) |
| 5e7 | 5e6 | 113 ± 1 (100%) | 1282 ± 13 (1134%) | 1155 ± 13 (1022%) | 1256 ± 22 (1111%) |

100 records per key

| Data Count | Key Count | Current | BATCH | Optimized | Hybrid Shuffle |
|---|---|---|---|---|---|
| 5e6 | 5e4 | 201 ± 3 (114%) | 1454 ± 153 (826%) | 1514 ± 43 (860%) | 1670 ± 29 (948%) |
| 1e7 | 1e5 | 207 ± 2 (117%) | 1522 ± 71 (864%) | 1620 ± 44 (920%) | 1866 ± 187 (1060%) |
| 2e7 | 2e5 | 201 ± 3 (114%) | 1607 ± 9 (913%) | 1746 ± 31 (992%) | 2038 ± 155 (1157%) |
| 5e7 | 5e5 | 176 ± 1 (100%) | 1559 ± 6 (885%) | 1723 ± 22 (978%) | 1956 ± 56 (1111%) |

This shows that, with the changes proposed above, DataStream#aggregate can be about 10X faster than its current throughput in stream mode, and slightly faster than the current throughput in batch mode. Using hybrid shuffle in the non-pipelined outputEOF part can further improve the performance.

...