...
As shown in the figure below, we might have a job that pre-processes records from a bounded source (i.e. inputA) using an operator (i.e. operatorA) which only emits results after its input has ended. Then operatorA operatorB needs to join records emitted by operatorA with records from an unbounded source, and emit results with low processing latency in real-time.
Currently, supporting the above use-case requires all operators to be deployed at the start of the job. This approach wastes slot and memory resources because operatorB can not do any useful work until operatorA's input has ended. Even worse, operatorB might use a lot of disk space only to cache and spill records received from the unbounded source to its local disk while it is waiting for operatorA's output.
...
We run benchmarks on a mackbook with the latest Flink 1.18-snapshot and parallelism=1. Default standalone cluster configuration is used except:
Code Block | ||
---|---|---|
| ||
jobmanager.memory.process.size: 6400m taskmanager.memory.process.size: 6912m taskmanager.numberOfTaskSlots: 2 |
...
Benchmark: OperatorDelayedDeploy
We have a job that pre-processes records from a bounded source (Source1) using an operator (Process1) which only emits results after its input has ended. Then anther operator(Process2) needs to process records emitted by Process1 with records from an unbounded source, and emit results with low processing latency in real-time.
Code Block | ||
---|---|---|
| ||
source1.keyBy(value -> value.f0) .window(EndOfStreamWindows.get()) .aggregate(new MyAggregator()).name("Process1") .connect(source2.keyBy(value -> value.f0)) .transform("Process2", Types.INT, new MyProcessOperator()) .addSink(new DiscardingSink<>()); |
Resource Utilization
When we set the Source1
,Process1
, Source2
and Process2
in 4 different SlotSharingGroups as above and then run this job, we can see that, before this PR, all of the tasks couldn't deploy and blocked in CREATED
state because only 2 slots provided in TaskManager. With our work, Source2
and Process2
can be deplyed after Source1
,Process1
finished and released their slots.
Efficiency Improvement
Without setting the slot sharing group, we run each benchmark 5 times, and average throughput(record / ms), throughput range and the magnification to the minimum throughput are provided. Here are the benchmark results:
Data Count | Key Count | Current Throughput | Optimized Throughput | Hybrid Shuffle Throughput |
---|---|---|---|---|
5e6 | 5e4 | 205 ± 4 (149%) | 1201 ± 59 (876%) | 1381 ± 14 (1008%) |
5e6 | 5e5 | 145 ± 21 (105%) | 1093 ± 61 (797%) | 1282 ± 41 (935%) |
1e7 | 1e5 | 186 ± 19 (135%) | 1270 ± 64 (927%) | 1423 ± 41 (1038%) |
1e7 | 1e6 | 137 ± 10 (100%) | 1176 ± 31 (858%) | 1313 ± 33 (958%) |
5e7 | 5e5 | OOM | 1463 ± 7 (1067%) | 1666 ± 35 (1216%) |
5e7 | 5e6 | OOM | 1099 ± 28 (802%) | 1222 ± 49 (891%) |
This shows that, with the changes proposed above, the total processing time can be about 8~10X faster than its current processing time in this scene. As the records number grows, job in current version causes OOM exception while in optimized version it works well. With hybrid shuffle applied in the batch execution part, the blocking edge from Source1
to Process1
changes to hybrid shuffle edge to improve the performance.
Benchmark: CoGroup
Use DataStream#coGroup to process records from two bounded streams with Data
number of records and emit results after both inputs have ended. With this PR, when DataStream#coGroup is invoked with EndOfStreamWindows as the window assigner, EOFAggregationOperator
will be applied to sort two inputs first, and then output the coGroup results.
Code Block | ||
---|---|---|
| ||
data1.coGroup(data2) .where(tuple -> tuple.f0) .equalTo(tuple -> tuple.f0) .window(EndOfStreamWindows.get()) .apply(new CustomCoGroupFunction()) .addSink(...); |
We run each benchmark in 5 times, here are the throughput benchmark results:
Data Count | Key Count | Current | BATCH | Optimized | Hybrid Shuffle |
---|---|---|---|---|---|
2e6 | 2e6 | 61 ± 1 (115%) | 502 ± 75 (947%) | 868 ± 74 (1637%) | 899 ± 82 (1696%) |
5e6 | 5e6 | 60 ± 1 (113%) | 528 ± 28 (996%) | 1176 ± 39 (2218%) | 1262 ± 50 (2381%) |
2e7 | 2e7 | 56 ± 0 (105%) | 564 ± 34 (1064%) | 1492 ± 28 (2815%) | 1692 ± 14 (3192%) |
5e7 | 5e7 | 53 ± 1 (100%) | 602 ± 16 (1135%) | 1508 ± 18 (2845%) | 1712 ± 77 (3230%) |
This shows with the changes proposed above, DataStream#coGroup can be 15~30X faster than its current throughput in stream mode, and 2.5X faster than its current throughput in batch mode. Using hybrid shuffle in the optimized outputEOF part can further improve the performance.
Benchmark: Aggregate
Use DataStream#aggregate to process records from a bounded streams with Data
number of records evenly distributed in Key Count
number of keys, and emit results after input have ended.
Code Block | ||
---|---|---|
| ||
data .keyBy(value -> value.f0) .window(EndOfStreamWindows.get()) .aggregate(new Aggregator()) .addSink(new CountingAndDiscardingSink()); |
We run benchmark in 10 records per key and 100 records per key in , and each for 5 times, here are the throughput benchmark results:
10 records per key
Data Count | Key Count | Current | BATCH | Optimized | Hybrid Shuffle |
---|---|---|---|---|---|
5e6 | 5e5 | 164 ± 1 (145%) | 1277 ± 129 (1130%) | 1365 ± 79 (1207%) | 1418 ± 154 (1254%) |
1e7 | 1e6 | 151 ± 2 (133%) | 1403 ± 87 (1241%) | 1511 ± 43 (1337%) | 1697 ± 175 (1501%) |
2e7 | 2e6 | 129 ± 4 (114%) | 1322 ± 21 (1169%) | 1411 ± 17 (1248%) | 1598 ± 13 (1414%) |
5e7 | 5e6 | 113 ± 1 (100%) | 1282 ± 13 (1134%) | 1155 ± 13 (1022%) | 1256 ± 22 (1111%) |
100 records per key
Data Count | Key Count | Current | BATCH | Optimized | Hybrid Shuffle |
---|---|---|---|---|---|
5e6 | 5e4 | 201 ± 3 (114%) | 1454 ± 153 (826%) | 1514 ± 43 (860%) | 1670 ± 29 (948%) |
1e7 | 1e5 | 207 ± 2 (117%) | 1522 ± 71 (864%) | 1620 ± 44 (920%) | 1866 ± 187 (1060%) |
2e7 | 2e5 | 201 ± 3 (114%) | 1607 ± 9 (913%) | 1746 ± 31 (992%) | 2038 ± 155 (1157%) |
5e7 | 5e5 | 176 ± 1 (100%) | 1559 ± 6 (885%) | 1723 ± 22 (978%) | 1956 ± 56 (1111%) |
Compatibility, Deprecation, and Migration Plan
...