...
- If a Transformation has isOutputOnEOF == true:
- The results of this operator as well as its upstream operators have blocking (by default) or hybrid shuffle partition type, which is controlled by configuring ExecutionOptions.BATCH_SHUFFLE_MODE.
- This operator as well as its upstream operators will be executed in batch mode (e.g. checkpointing is disabled while these operators are running).
- If all Transformations have isOutputOnCheckpoint == false:
- In FLIP-325, the JM will not trigger an extra flush() before triggering a checkpoint.
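To make the attributes concrete, here is a minimal sketch of how an operator could declare these flags. The builder and setter names follow the FLIP-325/331 proposals and should be read as assumptions, not a finalized API:

```java
// Hypothetical sketch, modeled on the OperatorAttributes proposal:
// outputOnEOF = true lets the runtime give this operator (and its upstream
// operators) blocking/hybrid shuffle results and run them in batch mode;
// outputOnCheckpoint = false tells the JM (FLIP-325) that no extra flush()
// is needed before triggering a checkpoint.
OperatorAttributes attributes =
    new OperatorAttributesBuilder()
        .setOutputOnEOF(true)
        .setOutputOnCheckpoint(false)
        .build();
```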
...
In addition, after FLIP-327 is accepted, this operator should also have isInternalSorterSupported = true. This operator will sort the input before aggregating, and avoid invoking window actions, similar to 5).
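A sketch of how such an operator could additionally advertise its internal sorter, again assuming the attribute and builder names from the FLIP-327/331 proposals:

```java
// Hypothetical sketch: an EOF-aggregation style operator that sorts its input
// internally declares both attributes, so the runtime can skip the external
// sort and avoid window-related actions while it runs.
@Override
public OperatorAttributes getOperatorAttributes() {
    return new OperatorAttributesBuilder()
        .setOutputOnEOF(true)
        .setInternalSorterSupported(true)
        .build();
}
```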
7) Before all of the non-pipelined tasks have finished, the CheckpointCoordinator will get a null checkpoint plan, and it resumes working normally after all of the non-pipelined tasks finish.
8) ExecutionOptions.BATCH_SHUFFLE_MODE
...
Benchmark results
Environment Configuration
```yaml
jobmanager.memory.process.size: 6400m
taskmanager.memory.process.size: 6912m
taskmanager.numberOfTaskSlots: 2
```
...
Use DataStream#coGroup to process records from two bounded streams with Data Count records and emit results after both inputs have ended. With this PR, when DataStream#coGroup is invoked with EndOfStreamWindows as the window assigner, the EOFCoGroupOperator will be applied, in which both inputs are sorted by key first and the co-group results are emitted after all records have been received.
```java
data1.coGroup(data2)
    .where(tuple -> tuple.f0)
    .equalTo(tuple -> tuple.f0)
    .window(EndOfStreamWindows.get())
    .apply(new CustomCoGroupFunction())
    .addSink(new DiscardingSink<>());
```
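For reference, a minimal sketch of what the CustomCoGroupFunction above could look like (the body is an assumption; the benchmark only needs a function that consumes every matched record):

```java
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Hypothetical body: count the records on both sides of each key and emit the sum.
public class CustomCoGroupFunction
        implements CoGroupFunction<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Integer> {
    @Override
    public void coGroup(
            Iterable<Tuple2<Integer, Double>> first,
            Iterable<Tuple2<Integer, Double>> second,
            Collector<Integer> out) {
        int count = 0;
        for (Tuple2<Integer, Double> ignored : first) count++;
        for (Tuple2<Integer, Double> ignored : second) count++;
        out.collect(count);
    }
}
```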
Resource Utilization
When we put Source1, Process1, Source2 and Process2 into 4 different SlotSharingGroups as above and then run this job, we can see that, before this PR, none of the tasks could be deployed: they all stayed in the CREATED state because the TaskManager provides only 2 slots. With our work, Source2 and Process2 can be deployed after Source1 and Process1 have finished and released their slots.
Efficiency Improvement
Without setting the slot sharing group, we run each benchmark 5 times and report the average throughput (records/ms), the throughput range, and the ratio to the minimum throughput. Here are the benchmark results:
Data Count | Key Count | Current Throughput | Optimized Throughput | Hybrid Shuffle Throughput |
---|---|---|---|---|
5e6 | 5e4 | 205 ± 4 (149%) | 1201 ± 59 (876%) | 1381 ± 14 (1008%) |
5e6 | 5e5 | 145 ± 21 (105%) | 1093 ± 61 (797%) | 1282 ± 41 (935%) |
1e7 | 1e5 | 186 ± 19 (135%) | 1270 ± 64 (927%) | 1423 ± 41 (1038%) |
1e7 | 1e6 | 137 ± 10 (100%) | 1176 ± 31 (858%) | 1313 ± 33 (958%) |
5e7 | 5e5 | OOM | 1463 ± 7 (1067%) | 1666 ± 35 (1216%) |
5e7 | 5e6 | OOM | 1099 ± 28 (802%) | 1222 ± 49 (891%) |
This shows that, with the changes proposed above, the total processing time can be about 8~10X faster than the current processing time in this scenario. As the number of records grows, the job in the current version fails with an OOM exception, while the optimized version still works well. With hybrid shuffle applied in the batch execution part, the blocking edge from Source1 to Process1 becomes a hybrid shuffle edge, which further improves the performance.
Benchmark: CoGroup
...
...);
In this benchmark, we run each job 5 times in 4 kinds of configurations: streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR. Here are the throughput benchmark results:
Data Count | STREAMING | BATCH | Optimized STREAMING | With hybrid shuffle |
---|---|---|---|---|
2e6 | 61 ± 1 (100%) | 502 ± 75 (823%) | 868 ± 74 (1423%) | 899 ± 82 (1474%) |
5e6 | 60 ± 1 (100%) | 528 ± 28 (880%) | 1176 ± 39 (1960%) | 1262 ± 50 (2103%) |
2e7 | 56 ± 0 (100%) | 564 ± 34 (1007%) | 1492 ± 28 (2664%) | 1692 ± 14 (3021%) |
5e7 | 53 ± 1 (100%) | 602 ± 16 (1135%) | 1508 ± 18 (2845%) | 1712 ± 77 (3230%) |
This shows that, with the changes proposed above, DataStream#coGroup can be 14~30X faster than its current throughput in stream mode, and up to 2.5X faster than its current throughput in batch mode. Using hybrid shuffle in the outputEOF part can further improve the performance.
Benchmark: Aggregate
Use DataStream#aggregate to process records from a bounded stream which has Data Count records, where every 100 records share the same key. Emit results after the input has ended. With this PR, when DataStream#aggregate is invoked with EndOfStreamWindows as the window assigner, the EOFAggregationOperator will be applied, in which the input is sorted first, then aggregated per key, and the results are emitted.
```java
data.keyBy(value -> value.f0)
    .window(EndOfStreamWindows.get())
    .aggregate(new Aggregator())
    .addSink(new CountingAndDiscardingSink());
```
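For reference, a minimal sketch of the Aggregator used above (the body is an assumption; a simple per-key count is enough for the benchmark):

```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Hypothetical body: count the records per key with Flink's AggregateFunction.
public class Aggregator implements AggregateFunction<Tuple2<Integer, Double>, Long, Long> {
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(Tuple2<Integer, Double> value, Long accumulator) {
        return accumulator + 1;
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }

    @Override
    public Long merge(Long a, Long b) {
        return a + b;
    }
}
```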
...
In this benchmark, we run each job 5 times in 4 kinds of configurations: streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR. Here are the throughput benchmark results:
Data Count | STREAMING | BATCH | Optimized STREAMING | With hybrid shuffle |
---|---|---|---|---|
5e6 | 201 ± 3 (100%) | 1454 ± 153 (723%) | 1514 ± 43 (753%) | 1670 ± 29 (831%) |
1e7 | 207 ± 2 (100%) | 1522 ± 71 (735%) | 1620 ± 44 (782%) | 1866 ± 187 (901%) |
2e7 | 201 ± 3 (100%) | 1607 ± 9 (800%) | 1746 ± 31 (867%) | 2038 ± 155 (1014%) |
5e7 | 176 ± 1 (100%) | 1559 ± 6 (885%) | 1723 ± 22 (978%) | 1956 ± 56 (1111%) |
This shows that, with the changes proposed above, DataStream#aggregate can be 7~10X faster than its current throughput in stream mode, and slightly faster than its current throughput in batch mode. Using hybrid shuffle in the outputEOF part can further improve the performance.
Benchmark: OperatorDelayedDeploy
We have a job that pre-processes records from a bounded source (Source1) using an operator (Process1) which only emits results after its input has ended. Then another operator (Process2) needs to process records emitted by Process1 together with records from an unbounded source, and emit results with low processing latency in real-time.
```java
source1.keyBy(value -> value.f0)
    .window(EndOfStreamWindows.get())
    .aggregate(new MyAggregator()).name("Process1")
    .connect(source2.keyBy(value -> value.f0))
    .transform("Process2", Types.INT, new MyProcessOperator())
    .addSink(new DiscardingSink<>());
```
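A minimal sketch of what the MyProcessOperator above could look like (the body is an assumption; any TwoInputStreamOperator that forwards records from both inputs fits this benchmark):

```java
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Hypothetical body: forward the pre-aggregated results from Process1 (input 1)
// and the records from the unbounded source (input 2) with low latency.
public class MyProcessOperator extends AbstractStreamOperator<Integer>
        implements TwoInputStreamOperator<Integer, Integer, Integer> {

    @Override
    public void processElement1(StreamRecord<Integer> element) {
        output.collect(element);
    }

    @Override
    public void processElement2(StreamRecord<Integer> element) {
        output.collect(element);
    }
}
```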
We run the benchmark with 10 records per key and with 100 records per key, each for 5 times. Here are the throughput benchmark results:
10 records per key
Data Count | Key Count | Current | BATCH | Optimized | Hybrid Shuffle |
---|---|---|---|---|---|
5e6 | 5e5 | 164 ± 1 (145%) | 1277 ± 129 (1130%) | 1365 ± 79 (1207%) | 1418 ± 154 (1254%) |
1e7 | 1e6 | 151 ± 2 (133%) | 1403 ± 87 (1241%) | 1511 ± 43 (1337%) | 1697 ± 175 (1501%) |
2e7 | 2e6 | 129 ± 4 (114%) | 1322 ± 21 (1169%) | 1411 ± 17 (1248%) | 1598 ± 13 (1414%) |
5e7 | 5e6 | 113 ± 1 (100%) | 1282 ± 13 (1134%) | 1155 ± 13 (1022%) | 1256 ± 22 (1111%) |
100 records per key
Data Count | Key Count | Current | BATCH | Optimized | Hybrid Shuffle |
---|---|---|---|---|---|
5e6 | 5e4 | 201 ± 3 (114%) | 1454 ± 153 (826%) | 1514 ± 43 (860%) | 1670 ± 29 (948%) |
1e7 | 1e5 | 207 ± 2 (117%) | 1522 ± 71 (864%) | 1620 ± 44 (920%) | 1866 ± 187 (1060%) |
2e7 | 2e5 | 201 ± 3 (114%) | 1607 ± 9 (913%) | 1746 ± 31 (992%) | 2038 ± 155 (1157%) |
5e7 | 5e5 | 176 ± 1 (100%) | 1559 ± 6 (885%) | 1723 ± 22 (978%) | 1956 ± 56 (1111%) |
...
Resource Utilization
Suppose we set an extra standalone cluster configuration:
```yaml
taskmanager.numberOfTaskSlots: 2
```
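A sketch (with assumed group names) of how the four vertices can be placed into four different slot sharing groups for this experiment, using the standard slotSharingGroup() call of the DataStream API:

```java
// Assumed group names; each vertex gets its own slot sharing group so that
// Source2/Process2 can only be deployed once Source1/Process1 release slots.
source1.slotSharingGroup("group-source1")
    .keyBy(value -> value.f0)
    .window(EndOfStreamWindows.get())
    .aggregate(new MyAggregator()).name("Process1").slotSharingGroup("group-process1")
    .connect(source2.slotSharingGroup("group-source2").keyBy(value -> value.f0))
    .transform("Process2", Types.INT, new MyProcessOperator()).slotSharingGroup("group-process2")
    .addSink(new DiscardingSink<>());
```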
When we put Source1, Process1, Source2 and Process2 into 4 different SlotSharingGroups as above and then run this job, we can see that, before this PR, none of the tasks could be deployed: they all stayed in the CREATED state because the TaskManager provides only 2 slots. With our work, Source2 and Process2 can be deployed after Source1 and Process1 have finished and released their slots.
Efficiency Improvement
Without setting the slot sharing group, we run each benchmark 5 times. Here are the throughput benchmark results:
Data Count | STREAMING | Optimized STREAMING | With hybrid shuffle |
---|---|---|---|
5e6 | 205 ± 4 (100%) | 1201 ± 59 (586%) | 1381 ± 14 (674%) |
1e7 | 186 ± 19 (100%) | 1270 ± 64 (683%) | 1423 ± 41 (765%) |
5e7 | OOM | 1463 ± 7 | 1666 ± 35 |
Compatibility, Deprecation, and Migration Plan
...