...
Code Block |
---|
|
package org.apache.flink.streaming.api.operators;
/** The builder class for {@link OperatorAttributes}. */
@PublicEvolving
@Experimental
public class OperatorAttributesBuilder {
@Nullable private Boolean outputOnEOF = null;
@Nullable private Boolean outputOnCheckpoint = null;
public OperatorAttributesBuilder() {...}
public OperatorAttributesBuilder setOutputOnEOF(boolean outputOnEOF) {...}
public OperatorAttributesBuilder setOutputOnCheckpoint(boolean outputOnCheckpoint) {...}
/**
 * If any operator attribute is null, we will log it at DEBUG level and use the following
 * default values:
 *
 * <ul>
 *   <li>outputOnEOF defaults to false
 *   <li>outputOnCheckpoint defaults to false
 * </ul>
 */
public OperatorAttributes build() {...}
} |
...
Code Block |
---|
|
package org.apache.flink.streaming.api.operators;
/**
 * OperatorAttributes provides the JobManager with information that can be
 * used to optimize job performance.
 */
@PublicEvolving
@Experimental
public class OperatorAttributes {
/**
* Returns true iff the operator can only emit records after inputs have reached EOF.
*
* <p>Here are the implications when it is true:
*
* <ul>
* <li> The results of this operator as well as its upstream operators have blocking partition type.
* <li> This operator as well as its upstream operators will be executed in batch mode.
* </ul>
*/
public boolean isOutputOnEOF() {...}
/**
 * Returns true iff the operator can only emit records when a checkpoint is triggered.
*/
public boolean isOutputOnCheckpoint() {...}
} |
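The defaulting behavior described in the builder's javadoc can be sketched with a small, self-contained stand-in. Only the class names and method signatures come from the proposal above; the bodies are assumptions for illustration:

```java
// Self-contained sketch of the proposed builder semantics: attributes left
// unset (null) fall back to false when build() is called. Bodies are
// illustrative; only names and signatures come from the proposal above.
class OperatorAttributes {
    private final boolean outputOnEOF;
    private final boolean outputOnCheckpoint;

    OperatorAttributes(boolean outputOnEOF, boolean outputOnCheckpoint) {
        this.outputOnEOF = outputOnEOF;
        this.outputOnCheckpoint = outputOnCheckpoint;
    }

    public boolean isOutputOnEOF() { return outputOnEOF; }

    public boolean isOutputOnCheckpoint() { return outputOnCheckpoint; }
}

class OperatorAttributesBuilder {
    private Boolean outputOnEOF = null;
    private Boolean outputOnCheckpoint = null;

    public OperatorAttributesBuilder setOutputOnEOF(boolean outputOnEOF) {
        this.outputOnEOF = outputOnEOF;
        return this;
    }

    public OperatorAttributesBuilder setOutputOnCheckpoint(boolean outputOnCheckpoint) {
        this.outputOnCheckpoint = outputOnCheckpoint;
        return this;
    }

    public OperatorAttributes build() {
        // A real implementation would log any unset attribute at DEBUG level here.
        return new OperatorAttributes(
                outputOnEOF != null && outputOnEOF,
                outputOnCheckpoint != null && outputOnCheckpoint);
    }
}
```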
...
3) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.
Code Block |
---|
|
@PublicEvolving
@Experimental
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
...
default OperatorAttributes getOperatorAttributes() {
return new OperatorAttributesBuilder().build();
}
}
@PublicEvolving
@Experimental
public interface StreamOperatorFactory<OUT> extends Serializable {
...
default OperatorAttributes getOperatorAttributes() {
return new OperatorAttributesBuilder().build();
}
} |
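As an example of how an operator would use this hook, an operator that buffers all input and only emits results at EOF could override the default method. The sketch below uses simplified stand-ins for the Flink interfaces so that it is self-contained, and the operator class is hypothetical:

```java
// Hypothetical example of an operator declaring outputOnEOF = true via the
// proposed default method. StreamOperator and the attribute classes are
// simplified stand-ins so the sketch compiles on its own.
class OperatorAttributes {
    private final boolean outputOnEOF;

    OperatorAttributes(boolean outputOnEOF) { this.outputOnEOF = outputOnEOF; }

    public boolean isOutputOnEOF() { return outputOnEOF; }
}

class OperatorAttributesBuilder {
    private Boolean outputOnEOF = null;

    public OperatorAttributesBuilder setOutputOnEOF(boolean v) {
        this.outputOnEOF = v;
        return this;
    }

    public OperatorAttributes build() {
        return new OperatorAttributes(outputOnEOF != null && outputOnEOF);
    }
}

interface StreamOperator<OUT> {
    // Same shape as the proposed default method: unset attributes default to false.
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

// An operator that only emits results after its inputs reach EOF declares this
// so the runtime can choose blocking partitions and batch-style execution.
class EndOfInputOperator implements StreamOperator<Integer> {
    @Override
    public OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().setOutputOnEOF(true).build();
    }
}
```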
...
In addition, after FLIP-327 is accepted, this operator should also have isInternalSorterSupported = true. The operator will sort its input before aggregating and avoid invoking window actions, similar to '5)'.
Benchmark results
To demonstrate the effect of our optimizations, we run each benchmark 5 times in different execution modes and configurations. The benchmark results below include the average throughput (records/ms), the throughput range, and the speedup factor relative to the streaming job before our work.
Environment Configuration
We run the benchmarks on a MacBook with the latest Flink 1.18-SNAPSHOT and parallelism=1. The default standalone cluster configuration is used except for:
Code Block |
---|
|
jobmanager.memory.process.size: 6400m
taskmanager.memory.process.size: 6912m |
Benchmark: CoGroup
Use DataStream#coGroup to process records from two bounded streams, each with Data Count number of records, and emit results after both inputs have ended. With this PR, when DataStream#coGroup is invoked with EndOfStreamWindows as the window assigner, EOFCoGroupOperator will be applied, in which the two inputs are sorted first and then the coGroup results are emitted.
Code Block |
---|
|
data1.coGroup(data2)
.where(tuple -> tuple.f0)
.equalTo(tuple -> tuple.f0)
.window(EndOfStreamWindows.get())
.apply(new CustomCoGroupFunction())
.addSink(...); |
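The benchmark's CustomCoGroupFunction is not shown in this document. As a hedged stand-in, the core of such a function could look like the following, which illustrates the CoGroupFunction contract (once both inputs reach EOF, all records sharing a key arrive together and can be combined); a List replaces Flink's Collector so the sketch is self-contained:

```java
import java.util.ArrayList;
import java.util.List;

// Hedged stand-in for the benchmark's CustomCoGroupFunction (its body is not
// shown in this document). Mirrors CoGroupFunction#coGroup(Iterable, Iterable,
// Collector), with a List in place of Flink's Collector.
class CustomCoGroupFunctionSketch {
    static List<String> coGroup(Iterable<Integer> first, Iterable<Integer> second) {
        List<String> out = new ArrayList<>();
        for (Integer left : first) {
            for (Integer right : second) {
                out.add(left + ":" + right); // emit one joined record per pair
            }
        }
        return out;
    }
}
```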
In this benchmark, we run each job in CoGroupDataStream 5 times under 4 configurations: streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR. Here are the throughput benchmark results:
Data Count | STREAMING | BATCH | Optimized STREAMING | With hybrid shuffle |
---|
2e6 | 61 ± 1 (100%) | 502 ± 75 (823%) | 868 ± 74 (1423%) | 899 ± 82 (1474%) |
5e6 | 60 ± 1 (100%) | 528 ± 28 (880%) | 1176 ± 39 (1960%) | 1262 ± 50 (2103%) |
2e7 | 56 ± 0 (100%) | 564 ± 34 (1007%) | 1492 ± 28 (2664%) | 1692 ± 14 (3021%) |
5e7 | 53 ± 1 (100%) | 602 ± 16 (1135%) | 1508 ± 18 (2845%) | 1712 ± 77 (3230%) |
This shows that, with the changes proposed above, DataStream#coGroup can be 14~30X faster than its current throughput in streaming mode, and up to 2.5X faster than its current throughput in batch mode. Using hybrid shuffle in the outputEOF part can further improve the performance.
Benchmark: Aggregate
Use DataStream#aggregate to process records from a bounded stream with Data Count number of records, where every 100 records share the same key, and emit results after the input has ended. With this PR, when DataStream#aggregate is invoked with EndOfStreamWindows as the window assigner, EOFAggregationOperator will be applied, in which the input is sorted first, then aggregated per key, and the results are emitted.
Code Block |
---|
|
data
.keyBy(value -> value.f0)
.window(EndOfStreamWindows.get())
.aggregate(new Aggregator())
.addSink(new CountingAndDiscardingSink()); |
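The Aggregator used in the benchmark is likewise not shown here. A minimal stand-in following the shape of Flink's AggregateFunction contract (createAccumulator/add/merge/getResult) might compute a per-key average:

```java
// Hedged stand-in for the benchmark's Aggregator (its body is not shown in
// this document). Follows the shape of Flink's AggregateFunction contract,
// computing an average over the values seen for a key.
class AggregatorSketch {
    // Accumulator holds {sum, count}.
    static long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    static long[] add(long value, long[] acc) {
        acc[0] += value; // running sum
        acc[1] += 1;     // record count
        return acc;
    }

    static long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }

    static double getResult(long[] acc) {
        return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
    }
}
```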
In this benchmark, we run each job in AggregationBenchmark 5 times under 4 configurations: streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR. Here are the throughput benchmark results:
Data Count | STREAMING | BATCH | Optimized STREAMING | With hybrid shuffle |
---|
5e6 | 201 ± 3 (100%) | 1454 ± 153 (723%) | 1514 ± 43 (753%) | 1670 ± 29 (831%) |
1e7 | 207 ± 2 (100%) | 1522 ± 71 (735%) | 1620 ± 44 (782%) | 1866 ± 187 (901%) |
2e7 | 201 ± 3 (100%) | 1607 ± 9 (800%) | 1746 ± 31 (867%) | 2038 ± 155 (1014%) |
5e7 | 176 ± 1 (100%) | 1559 ± 6 (885%) | 1723 ± 22 (978%) | 1956 ± 56 (1111%) |
This shows that, with the changes proposed above, DataStream#aggregate can be 7~10X faster than its current throughput in streaming mode, and slightly faster than its current throughput in batch mode. Using hybrid shuffle in the outputEOF part can further improve the performance.
Benchmark: OperatorDelayedDeploy
We have a job that pre-processes records from a bounded source (Source1) using an operator (Process1) which only emits results after its input has ended. Then another operator (Process2) needs to process records emitted by Process1 together with records from an unbounded source (Source2), and emit results with low processing latency in real-time.
Code Block |
---|
|
source1.keyBy(value -> value.f0)
.window(EndOfStreamWindows.get())
.aggregate(new MyAggregator()).name("Process1")
.connect(source2.keyBy(value -> value.f0))
.transform("Process2", Types.INT, new MyProcessOperator())
.addSink(new DiscardingSink<>()); |
Resource Utilization
Suppose we add the following standalone cluster configuration:
Code Block |
---|
|
taskmanager.numberOfTaskSlots: 2 |
When we put Source1, Process1, Source2, and Process2 in 4 different SlotSharingGroups as above and then run this job, we can see that, before this PR, none of the tasks could be deployed: they all blocked in the CREATED state because the TaskManager provides only 2 slots. With our work, Source2 and Process2 can be deployed after Source1 and Process1 have finished and released their slots.
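The slot arithmetic behind this observation can be sketched as follows (illustrative only, not Flink code): with 4 slot sharing groups at parallelism 1, the job needs 4 slots at once in the old mode, but only 2 at a time once Source1/Process1 can finish and release their slots before Source2/Process2 are deployed.

```java
// Illustrative slot arithmetic for the scenario above (not Flink code).
// Each entry in slotsPerWave is the number of slots one scheduling wave
// needs; the job can run iff every wave fits into the available slots.
class SlotWaveSketch {
    static boolean fits(int[] slotsPerWave, int availableSlots) {
        for (int wave : slotsPerWave) {
            if (wave > availableSlots) {
                return false;
            }
        }
        return true;
    }
}
```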
Efficiency Improvement
Without setting the slot sharing group, we run each benchmark in OperatorDelayedDeployDataStreamInOneSlot 5 times. Here are the throughput benchmark results:
Data Count | STREAMING | Optimized STREAMING | With hybrid shuffle |
---|
5e6 | 205 ± 4 (100%) | 1201 ± 59 (586%) | 1381 ± 14 (674%) |
1e7 | 186 ± 19 (100%) | 1270 ± 64 (683%) | 1423 ± 41 (765%) |
5e7 | OOM | 1463 ± 7 | 1666 ± 35 |
This shows that, as the number of records grows, the job in streaming mode fails with an OOM exception, while it works well with the changes proposed above.
Compatibility, Deprecation, and Migration Plan
...