...
In addition, after FLIP-327 is accepted, suppose this API is called without any explicitly-assigned trigger or evictor, then this operator should also have isInternalSorterSupported = true, so that it can achieve higher throughput by using the optimizations currently done in batch mode.
This operator will use the following optimization to achieve much higher throughput than the existing DataStream#coGroup in both stream and batch mode:
...
More specifically, this operator will sort the input before aggregation, and avoid invoking window actions, which is similar to '5)'.
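The idea can be sketched in plain Java (class and method names below are illustrative, not part of Flink): once the input has ended, the operator sorts the buffered records by key and aggregates each consecutive run of the same key in a single pass, so no window triggers or per-key window state are needed.

```java
import java.util.*;

// Hypothetical sketch of sort-then-aggregate at end of input.
// With EndOfStreamWindows, every record belongs to one window that
// fires at EOF, so sorting by key and summing each run is equivalent
// to the windowed aggregation but avoids per-key window state.
public class SortedAggregationSketch {

    // Aggregate (key, value) pairs: sort by key, then sum each consecutive run.
    static Map<String, Integer> sumByKeySorted(List<Map.Entry<String, Integer>> records) {
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(records);
        sorted.sort(Map.Entry.comparingByKey());   // sort once, at end of input

        Map<String, Integer> results = new LinkedHashMap<>();
        String currentKey = null;
        int sum = 0;
        for (Map.Entry<String, Integer> r : sorted) {
            if (!r.getKey().equals(currentKey)) {
                if (currentKey != null) {
                    results.put(currentKey, sum);  // emit the finished key
                }
                currentKey = r.getKey();
                sum = 0;
            }
            sum += r.getValue();
        }
        if (currentKey != null) {
            results.put(currentKey, sum);
        }
        return results;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = List.of(
            Map.entry("b", 2), Map.entry("a", 1), Map.entry("b", 3), Map.entry("a", 4));
        System.out.println(sumByKeySorted(records)); // {a=5, b=5}
    }
}
```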
Benchmark results
When EndOfStreamWindows is used as the window assigner and no trigger or evictor is assigned, the optimized operator EOFCoGroupOperator / EOFAggregationOperator will be applied. In this case the input(s) will be sorted first, and the results will be output after the end of input. This makes the performance with our work multiple times faster than before, and slightly faster than batch mode, because window-related work is avoided. Using hybrid shuffle in the outputEOF part allows part of the operator pipeline to consume records from upstream as they are produced, which further improves the performance.
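For reference, recent Flink releases expose hybrid shuffle through the execution.batch-shuffle-mode option; the sketch below shows how it can be enabled cluster-wide. How exactly hybrid shuffle is wired into the outputEOF part is a design point of this FLIP, not existing behavior.

```yaml
# Hybrid shuffle: downstream tasks may start consuming a result
# partition before the upstream task has finished writing it.
execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL
```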
...
To demonstrate our optimization improvements, we run each benchmark in different execution modes and configurations. Each result is measured by taking the average execution time across 5 runs with the given configuration.
...
```
jobmanager.memory.process.size: 6400m
taskmanager.memory.process.size: 6912m
```
Data Count: the number of records in the input, with a unique key for each record.
```
data1.coGroup(data2)
    .where(tuple -> tuple.f0)
    .equalTo(tuple -> tuple.f0)
    .window(EndOfStreamWindows.get())
    .apply(new CustomCoGroupFunction())
    .addSink(...);
```
...
The following result shows the throughput (records/sec) when the benchmark is executed in streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR.
...
The result shows that DataStream#coGroup in optimized streaming mode can be 22X as fast as streaming mode and 3X as fast as batch mode.
...
| Data Count | STREAMING | BATCH | Optimized STREAMING | With hybrid shuffle |
| --- | --- | --- | --- | --- |
| 8e7 | 66 ± 1 (100%, 1202426 ms) | 491 ± 5 (743%, 162731 ms) | 1506 ± 10 (2281%, 53098 ms) | 1677 ± 42 (2540%, 47687 ms) |
This shows that, with the changes proposed above, the throughput of DataStream#coGroup in Optimized STREAMING can be 22X that of STREAMING and 3X that of BATCH.
Benchmark: Aggregate
2) Execute DataStream#aggregate
Data Count
```
data.keyBy(value -> value.f0)
    .window(EndOfStreamWindows.get())
    .aggregate(new Aggregator())
    .addSink(new CountingAndDiscardingSink());
```
...
The following result shows the throughput (records/sec) when the benchmark is executed in streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR.
...
...
| STREAMING | BATCH | Optimized STREAMING | With hybrid shuffle |
| --- | --- | --- | --- |
| 163 ± 0 (100%, 490478 ms) | 1561 ± 16 (957%, 51237 ms) | 1733 ± 9 (1063%, 46143 ms) | 1992 ± 15 (1222%, 40148 ms) |
This shows that, with the changes proposed above, the throughput of DataStream#aggregate in Optimized STREAMING can be 10X that of STREAMING and slightly higher than that of BATCH.
Benchmark: OperatorDelayedDeploy
...
3) Execute a program that needs to fully process data from a bounded source before processing data from another unbounded source.
The following program demonstrates the scenario described in the motivation section. The program needs to pre-process records from a bounded source (Source1) using an operator (Process1) which only emits results after its input has ended. Then another operator (Process2) needs to process records emitted by Process1 together with records from an unbounded source (Source2), and emit results with low processing latency in real time.
```
source1.keyBy(value -> value.f0)
    .window(EndOfStreamWindows.get())
    .aggregate(new MyAggregator()).name("Process1")
    .connect(source2.keyBy(value -> value.f0))
    .transform("Process2", Types.INT, new MyProcessOperator())
    .addSink(new DiscardingSink<>());
```
Resource Utilization
Suppose we additionally set the following standalone cluster configuration:
```
taskmanager.numberOfTaskSlots: 2
```
...
We can use this program to demonstrate that the program requires fewer slot resources. More specifically, suppose we configure the standalone cluster with taskmanager.numberOfTaskSlots = 2, and place Source1, Process1, Source2 and Process2 in 4 different SlotSharingGroups
...
Then the program will fail to be deployed before this FLIP, while it can be deployed successfully after this FLIP. This is because Source2 and Process2 can be deployed after Source1 and Process1 have finished and released their slots.
Additionally, we can use this program to demonstrate that it can achieve higher performance, because Process2 will not need to keep buffering records emitted by Source2 in its memory while Process1 has not reached EOF. More specifically, the program can fail with OOM before this FLIP when the number of records in the inputs is high, whereas it can finish successfully without OOM after this FLIP.
...
Compatibility, Deprecation, and Migration Plan
...