...

As shown in the figure below, we might have a job that pre-processes records from a bounded source (i.e. inputA) using an operator (i.e. operatorA) which only emits results after its input has ended. Then operatorB needs to join records emitted by operatorA with records from an unbounded source, and emit results with low processing latency in real time.

Currently, supporting the above use-case requires all operators to be deployed at the start of the job. This approach wastes slot and memory resources because operatorB cannot do any useful work until operatorA's input has ended. Even worse, operatorB might use a lot of disk space only to cache and spill records received from the unbounded source to its local disk while it waits for operatorA's output.

...

Environment Configuration

We run benchmarks on a MacBook with the latest Flink 1.18-SNAPSHOT and parallelism=1. The default standalone cluster configuration is used except for:


Code Block
languageyml
jobmanager.memory.process.size: 6400m
taskmanager.memory.process.size: 6912m
taskmanager.numberOfTaskSlots: 2

...

Benchmark: OperatorDelayedDeploy

We have a job that pre-processes records from a bounded source (Source1) using an operator (Process1) which only emits results after its input has ended. Then another operator (Process2) needs to join records emitted by Process1 with records from an unbounded source (Source2), and emit results with low processing latency in real time.

Code Block
languagejava
source1.keyBy(value -> value.f0)
    .window(EndOfStreamWindows.get())
    .aggregate(new MyAggregator()).name("Process1")
    .connect(source2.keyBy(value -> value.f0))
    .transform("Process2", Types.INT, new MyProcessOperator())
    .addSink(new DiscardingSink<>());
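The MyAggregator above is not spelled out in this document. As a rough illustration of the contract such an aggregator follows (Flink's AggregateFunction: create an accumulator, add records to it, merge accumulators, extract the result), here is a dependency-free sketch; the per-key counting logic and all names are assumptions made for illustration, not the benchmark's actual implementation:

```java
// Dependency-free sketch of the createAccumulator / add / merge /
// getResult contract that a Flink AggregateFunction follows.
// MyAggregator's real logic is not given in this document; counting
// records is an assumed stand-in.
public class MyAggregatorSketch {
    // Fresh accumulator: no records seen yet.
    static long createAccumulator() { return 0L; }

    // Fold one record into the accumulator.
    static long add(int record, long acc) { return acc + 1; }

    // Merge two partial accumulators.
    static long merge(long a, long b) { return a + b; }

    // Extract the final result once the window fires.
    static long getResult(long acc) { return acc; }

    public static void main(String[] args) {
        long acc = createAccumulator();
        for (int record : new int[] {7, 7, 42}) {
            acc = add(record, acc);
        }
        System.out.println(getResult(merge(acc, createAccumulator()))); // 3
    }
}
```

Because the window assigner is EndOfStreamWindows, getResult is only called after the bounded input ends, which is exactly why Process2's deployment can be delayed until then.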

Resource Utilization

When we set Source1, Process1, Source2 and Process2 in 4 different SlotSharingGroups as above and then run this job, we can see that, before this PR, none of the tasks could be deployed: they all blocked in the CREATED state because the TaskManager provides only 2 slots. With our work, Source2 and Process2 can be deployed after Source1 and Process1 finish and release their slots.

OperatorDelayedDeployDataStream

Efficiency Improvement

Without setting the slot sharing group, we run each OperatorDelayedDeployDataStreamInOneSlot benchmark 5 times. The average throughput (records/ms), the throughput range, and the magnification relative to the minimum throughput are reported. Here are the benchmark results:

| Data Count | Key Count | Current Throughput | Optimized Throughput | Hybrid Shuffle Throughput |
|------------|-----------|--------------------|----------------------|---------------------------|
| 5e6 | 5e4 | 205 ± 4 (149%) | 1201 ± 59 (876%) | 1381 ± 14 (1008%) |
| 5e6 | 5e5 | 145 ± 21 (105%) | 1093 ± 61 (797%) | 1282 ± 41 (935%) |
| 1e7 | 1e5 | 186 ± 19 (135%) | 1270 ± 64 (927%) | 1423 ± 41 (1038%) |
| 1e7 | 1e6 | 137 ± 10 (100%) | 1176 ± 31 (858%) | 1313 ± 33 (958%) |
| 5e7 | 5e5 | OOM | 1463 ± 7 (1067%) | 1666 ± 35 (1216%) |
| 5e7 | 5e6 | OOM | 1099 ± 28 (802%) | 1222 ± 49 (891%) |

This shows that, with the changes proposed above, total processing time can be about 8~10X faster than the current processing time in this scenario. As the number of records grows, the job in the current version fails with an OOM exception, while the optimized version works well. With hybrid shuffle applied in the batch execution part, the blocking edge from Source1 to Process1 becomes a hybrid shuffle edge, which further improves performance.

Benchmark: CoGroup

Use DataStream#coGroup to process records from two bounded streams, each with Data Count records, and emit results after both inputs have ended. With this PR, when DataStream#coGroup is invoked with EndOfStreamWindows as the window assigner, the EOFAggregationOperator will be applied to sort the two inputs first and then output the coGroup results.

Code Block
languagejava
data1.coGroup(data2)
    .where(tuple -> tuple.f0)
    .equalTo(tuple -> tuple.f0)
    .window(EndOfStreamWindows.get())
    .apply(new CustomCoGroupFunction())
    .addSink(...);
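The CustomCoGroupFunction is likewise left unspecified here. The essential semantics of coGroup, independent of Flink, are: group both inputs by key, then invoke the user function once per key with all records from both sides (a key present on only one side still produces a call, with the other side empty). A self-contained sketch of those semantics, with the per-key output format chosen arbitrarily for illustration:

```java
import java.util.*;

// Self-contained sketch of coGroup semantics: group both inputs by
// key, then invoke user logic once per key with all records from
// each side. Emitting "key=leftCount,rightCount" is an arbitrary
// stand-in for the unspecified CustomCoGroupFunction.
public class CoGroupSketch {
    static List<String> coGroup(int[] left, int[] right) {
        Map<Integer, Integer> l = new TreeMap<>();
        Map<Integer, Integer> r = new TreeMap<>();
        for (int k : left) l.merge(k, 1, Integer::sum);
        for (int k : right) r.merge(k, 1, Integer::sum);
        Set<Integer> keys = new TreeSet<>(l.keySet());
        keys.addAll(r.keySet());
        List<String> out = new ArrayList<>();
        for (int key : keys) {
            // The user function sees both sides; either may be empty.
            out.add(key + "=" + l.getOrDefault(key, 0) + "," + r.getOrDefault(key, 0));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(coGroup(new int[] {1, 1, 2}, new int[] {2, 3}));
        // [1=2,0, 2=1,1, 3=0,1]
    }
}
```

Grouping the whole input by key before calling the user function is why a sort-based implementation (as in the proposed EOFAggregationOperator) fits once the inputs are known to have ended.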


We run each benchmark in CoGroupDataStream 5 times. Here are the throughput benchmark results:

| Data Count | Key Count | Current | BATCH | Optimized | Hybrid Shuffle |
|------------|-----------|---------|-------|-----------|----------------|
| 2e6 | 2e6 | 61 ± 1 (115%) | 502 ± 75 (947%) | 868 ± 74 (1637%) | 899 ± 82 (1696%) |
| 5e6 | 5e6 | 60 ± 1 (113%) | 528 ± 28 (996%) | 1176 ± 39 (2218%) | 1262 ± 50 (2381%) |
| 2e7 | 2e7 | 56 ± 0 (105%) | 564 ± 34 (1064%) | 1492 ± 28 (2815%) | 1692 ± 14 (3192%) |
| 5e7 | 5e7 | 53 ± 1 (100%) | 602 ± 16 (1135%) | 1508 ± 18 (2845%) | 1712 ± 77 (3230%) |

This shows that, with the changes proposed above, DataStream#coGroup can be 15~30X faster than its current throughput in stream mode, and 2.5X faster than its current throughput in batch mode. Using hybrid shuffle in the optimized outputEOF part can further improve performance.

Benchmark: Aggregate

Use DataStream#aggregate to process records from a bounded stream with Data Count records evenly distributed across Key Count keys, and emit results after the input has ended.

Code Block
languagejava
data.keyBy(value -> value.f0)
    .window(EndOfStreamWindows.get())
    .aggregate(new Aggregator())
    .addSink(new CountingAndDiscardingSink());
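With EndOfStreamWindows, every record of a key falls into a single window that fires only when the input ends, so the pipeline above emits exactly one result per key. A dependency-free sketch of that behavior (counting records per key is an assumed stand-in for the unspecified Aggregator, matching the "N records per key" setup of this benchmark):

```java
import java.util.*;

// Sketch of end-of-stream windowing behavior: records are
// accumulated per key while the stream runs, and one result per
// key is emitted only when the input ends. Counting records per
// key stands in for the unspecified Aggregator.
public class EndOfStreamAggregateSketch {
    static Map<Integer, Long> run(int[] keyedRecords) {
        Map<Integer, Long> acc = new TreeMap<>();
        for (int key : keyedRecords) {
            acc.merge(key, 1L, Long::sum); // accumulate; nothing emitted yet
        }
        // Input ended: the single per-key window fires exactly once.
        return acc;
    }

    public static void main(String[] args) {
        System.out.println(run(new int[] {1, 2, 1, 2, 2})); // {1=2, 2=3}
    }
}
```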


We run the AggregationBenchmark with 10 records per key and with 100 records per key, each 5 times. Here are the throughput benchmark results:

10 records per key

| Data Count | Key Count | Current | BATCH | Optimized | Hybrid Shuffle |
|------------|-----------|---------|-------|-----------|----------------|
| 5e6 | 5e5 | 164 ± 1 (145%) | 1277 ± 129 (1130%) | 1365 ± 79 (1207%) | 1418 ± 154 (1254%) |
| 1e7 | 1e6 | 151 ± 2 (133%) | 1403 ± 87 (1241%) | 1511 ± 43 (1337%) | 1697 ± 175 (1501%) |
| 2e7 | 2e6 | 129 ± 4 (114%) | 1322 ± 21 (1169%) | 1411 ± 17 (1248%) | 1598 ± 13 (1414%) |
| 5e7 | 5e6 | 113 ± 1 (100%) | 1282 ± 13 (1134%) | 1155 ± 13 (1022%) | 1256 ± 22 (1111%) |

100 records per key

| Data Count | Key Count | Current | BATCH | Optimized | Hybrid Shuffle |
|------------|-----------|---------|-------|-----------|----------------|
| 5e6 | 5e4 | 201 ± 3 (114%) | 1454 ± 153 (826%) | 1514 ± 43 (860%) | 1670 ± 29 (948%) |
| 1e7 | 1e5 | 207 ± 2 (117%) | 1522 ± 71 (864%) | 1620 ± 44 (920%) | 1866 ± 187 (1060%) |
| 2e7 | 2e5 | 201 ± 3 (114%) | 1607 ± 9 (913%) | 1746 ± 31 (992%) | 2038 ± 155 (1157%) |
| 5e7 | 5e5 | 176 ± 1 (100%) | 1559 ± 6 (885%) | 1723 ± 22 (978%) | 1956 ± 56 (1111%) |

This shows that, with the changes proposed above, DataStream#aggregate can be 10X faster than its current throughput in stream mode, and slightly faster than its current throughput in batch mode. Using hybrid shuffle in the optimized outputEOF part can further improve performance.

Compatibility, Deprecation, and Migration Plan

...