...
- If a Transformation has isOutputOnEOF == true:
  - Operator chaining can still be performed. After that, the results of its operator chain, as well as those of its upstream operators, have blocking (by default) or hybrid shuffle, as selected by ExecutionOptions.BATCH_SHUFFLE_MODE.
  - This operator, as well as its upstream operators, will be executed in batch mode (e.g. checkpointing is disabled while these operators are running).
- If all Transformations have isOutputOnCheckpoint == false:
  - In FLIP-325, JM will not trigger an extra flush() before triggering a checkpoint.
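
The isOutputOnEOF rule above can be read as a backward propagation over the job graph: every operator that outputs on EOF, plus everything upstream of it, runs in batch mode. The following is a minimal sketch of that reading. `Node` and `BatchModeMarker` are hypothetical stand-ins invented for illustration; they are not Flink's Transformation classes or its actual planner logic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a Transformation (assumption, not Flink's class).
final class Node {
    final String name;
    final boolean outputOnEOF;
    final List<Node> inputs = new ArrayList<>();

    Node(String name, boolean outputOnEOF, Node... upstream) {
        this.name = name;
        this.outputOnEOF = outputOnEOF;
        this.inputs.addAll(Arrays.asList(upstream));
    }
}

final class BatchModeMarker {
    // Collects the names of all nodes that output on EOF themselves
    // or are (transitively) upstream of such a node.
    static Set<String> batchModeNodes(List<Node> allNodes) {
        Set<String> result = new LinkedHashSet<>();
        Deque<Node> pending = new ArrayDeque<>();
        for (Node n : allNodes) {
            if (n.outputOnEOF) {
                pending.push(n);
            }
        }
        while (!pending.isEmpty()) {
            Node n = pending.pop();
            // Visit each node once, then walk towards its inputs.
            if (result.add(n.name)) {
                for (Node in : n.inputs) {
                    pending.push(in);
                }
            }
        }
        return result;
    }
}
```

For a graph shaped like the OperatorDelayedDeploy job (Source1 feeding Process1 with isOutputOnEOF == true, and Process2 reading from Process1 and Source2), this sketch marks only Source1 and Process1 as batch-mode operators.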
...
EndOfStreamWindows as the window assigner and trigger and evictor are not assigned, the optimized operator EOFCoGroupOperator / EOFAggregationOperator will be applied, in which case the input(s) will be sorted first and the results will be emitted only after the end of input. This makes our work multiple times faster than before and slightly faster than batch mode, because window-related work is avoided. Using hybrid shuffle in the outputEOF part lets part of the operators consume records from upstream in a pipelined fashion, which further improves performance.
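
To illustrate the sort-then-emit idea, here is a minimal sketch that buffers records, sorts them by key once the input ends, and then aggregates each key in a single pass without any window state. It is not the actual EOFAggregationOperator: the class `SortedEofSum`, its method names, and the choice of summing values per key are assumptions made for illustration only.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical operator sketch: sums values per key, emitting only at end of input.
final class SortedEofSum {
    private final List<Map.Entry<Integer, Long>> buffered = new ArrayList<>();

    // Called once per input record; nothing is emitted here.
    void processElement(int key, long value) {
        buffered.add(new SimpleEntry<>(key, value));
    }

    // Called once after the input has ended; returns one (key, sum) pair per key.
    List<Map.Entry<Integer, Long>> endInput() {
        // Sorting groups equal keys together, so one accumulator is enough.
        buffered.sort(Map.Entry.comparingByKey());
        List<Map.Entry<Integer, Long>> out = new ArrayList<>();
        Integer currentKey = null;
        long sum = 0;
        for (Map.Entry<Integer, Long> e : buffered) {
            if (currentKey != null && !currentKey.equals(e.getKey())) {
                out.add(new SimpleEntry<>(currentKey, sum));
                sum = 0;
            }
            currentKey = e.getKey();
            sum += e.getValue();
        }
        if (currentKey != null) {
            out.add(new SimpleEntry<>(currentKey, sum));
        }
        buffered.clear();
        return out;
    }
}
```

Because records with the same key are adjacent after sorting, the sketch needs only one running accumulator at a time instead of per-key window state.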
...
Data Count | STREAMING | BATCH | Optimized STREAMING | With hybrid shuffle |
---|---|---|---|---|
2e7 | 82 ± 1 (100%, 241186 ms) | 422 ± 33 (514%, 47370 ms) | 1406 ± 6 (1714%, 14223 ms) | 1602 ± 9 (1953%, 12477 ms) |
5e7 | 75 ± 1 (100%, 658373 ms) | 477 ± 12 (636%, 104666 ms) | 1452 ± 14 (1936%, 34427 ms) | 1626 ± 49 (2168%, 30736 ms) |
8e7 | 66 ± 1 (100%, 1202426 ms) | 491 ± 5 (743%, 162731 ms) | 1506 ± 10 (2281%, 53098 ms) | 1677 ± 42 (2540%, 47687 ms) |
This shows that, with the changes proposed above, DataStream#coGroup in Optimized STREAMING can reach roughly 17~23X the throughput of STREAMING (1714% to 2281% in the table above) and about 3X the throughput of BATCH.
Benchmark: Aggregate
Data Count

    data
        .keyBy(value -> value.f0)
        .window(EndOfStreamWindows.get())
        .aggregate(new Aggregator())
        .addSink(new CountingAndDiscardingSink());

5 times in each of 4 configurations: streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR. Here are the throughput benchmark results:
Data Count | STREAMING | BATCH | Optimized STREAMING | With hybrid shuffle |
---|---|---|---|---|
2e7 | 188 ± 1 (100%, 106190 ms) | 1565 ± 15 (832%, 12775 ms) | 1671 ± 31 (888%, 11964 ms) | 1950 ± 8 (1037%, 10255 ms) |
5e7 | 171 ± 1 (100%, 290733 ms) | 1534 ± 13 (897%, 32588 ms) | 1712 ± 14 (1001%, 29201 ms) | 1988 ± 16 (1162%, 25149 ms) |
8e7 | 163 ± 0 (100%, 490478 ms) | 1561 ± 16 (957%, 51237 ms) | 1733 ± 9 (1063%, 46143 ms) | 1992 ± 15 (1222%, 40148 ms) |
This shows that, with the changes proposed above, DataStream#aggregate in Optimized STREAMING can be 8~10X faster than STREAMING in throughput and slightly faster than BATCH. Using hybrid shuffle in the outputEOF part can further improve performance by about 15%.
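
The cells in these tables can be cross-checked with simple arithmetic. The sketch below assumes, as an inference from the numbers rather than something stated in the text, that throughput is the record count divided by the duration in milliseconds and that the percentage is the throughput relative to the STREAMING column, both truncated to integers. The class and method names are illustrative only.

```java
// Hypothetical helper for reading the benchmark tables (not part of the benchmark code).
final class BenchmarkMath {
    // Records per millisecond, truncated towards zero.
    static long throughput(long recordCount, long durationMs) {
        return recordCount / durationMs;
    }

    // Throughput as a percentage of the baseline (STREAMING) throughput, truncated.
    static long relativePercent(long throughput, long baselineThroughput) {
        return throughput * 100 / baselineThroughput;
    }
}
```

For the 2e7 row of the aggregate table, `throughput(20_000_000, 11964)` gives 1671 and `relativePercent(1671, 188)` gives 888, which matches the Optimized STREAMING cell.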
Benchmark: OperatorDelayedDeploy
We have a job that pre-processes records from a bounded source (Source1) using an operator (Process1) that emits results only after its input has ended. Another operator (Process2) then needs to process the records emitted by Process1 together with records from an unbounded source (Source2), and emit results in real time with low processing latency.

    source1.keyBy(value -> value.f0)
        .window(EndOfStreamWindows.get())
        .aggregate(new MyAggregator()).name("Process1")
        .connect(source2.keyBy(value -> value.f0))
        .transform("Process2", Types.INT, new MyProcessOperator())
        .addSink(new DiscardingSink<>());

Resource Utilization
Suppose we set an extra standalone cluster configuration:

    taskmanager.numberOfTaskSlots: 2

With Source1, Process1, Source2 and Process2 placed in 4 different SlotSharingGroups and the job run with the configuration above, we can see that, before this PR, none of the tasks could be deployed; they stayed blocked in the CREATED state because the TaskManager provides only 2 slots. With our work, Source2 and Process2 can be deployed after Source1 and Process1 have finished and released their slots.
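
The deployment behavior described above can be mimicked with a toy model. This is only a sketch under simplifying assumptions: each task needs exactly one slot (because every task sits in its own SlotSharingGroup), tasks are grouped into hypothetical "stages" whose members must hold slots at the same time, and a stage releases all its slots before the next one starts. It does not use or reproduce Flink's scheduler.

```java
import java.util.List;

// Hypothetical slot model (assumption for illustration, not Flink's scheduler).
final class SlotModel {
    // Returns true if every stage fits into the available slots,
    // given that stages run one after another and free their slots on finish.
    static boolean canDeploy(int availableSlots, List<List<String>> stages) {
        for (List<String> stage : stages) {
            if (stage.size() > availableSlots) {
                return false;
            }
        }
        return true;
    }
}
```

With 2 slots, a single stage containing all four tasks cannot be deployed, while the two consecutive stages (Source1, Process1) and (Source2, Process2) can.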
Efficiency Improvement
Without setting the slot sharing group, we run each benchmark 5 times. Process1 will aggregate Data Count number of records from Source1, and every 100 records have the same key. The benchmark terminates after Process2 has processed Data Count number of records from Source2.
Here are the throughput benchmark results:
Data Count | STREAMING | Optimized STREAMING | With hybrid shuffle |
---|---|---|---|
1e7 | 183 ± 19 (100%, 54370 ms) | 1371 ± 43 (749%, 7289 ms) | 1568 ± 49 (856%, 6376 ms) |
3e7 | 40 ± 10 (747765 ms) | 1461 ± 19 (20531 ms) | 1674 ± 41 (17912 ms) |
5e7 | OOM | 1522 ± 11 (32843 ms) | 1750 ± 14 (28561 ms) |
This shows that the aggregation optimization above can also be observed in this benchmark. In the 1e7 row, the speedup is smaller than in Benchmark: Aggregate because the stage in which Process2 processes records from Source2 is faster in STREAMING than in Optimized STREAMING, as the records from Source2 are already buffered.
...
Without setting the slot sharing group, we run the job in STREAMING, Optimized STREAMING and Optimized STREAMING with hybrid shuffle. As the number of records in the job grows, throughput in STREAMING decreases because of the buffer pressure in Process2, and the job eventually fails with an OOM exception, while it works well in Optimized STREAMING
...
Using hybrid shuffle lets Process1 consume records from Source1 in a pipelined fashion.
Compatibility, Deprecation, and Migration Plan
...