...
```java
DataStream<Tuple2<Integer, Double>> data1 = env.fromCollection(
        new DataGenerator(dataNum), Types.TUPLE(Types.INT, Types.DOUBLE));
DataStream<Tuple2<Integer, Double>> data2 = env.fromCollection(
        new DataGenerator(dataNum), Types.TUPLE(Types.INT, Types.DOUBLE));
data1.coGroup(data2)
        .where(tuple -> tuple.f0)
        .equalTo(tuple -> tuple.f0)
        // Assigns every record to a single window that fires only when the input ends.
        .window(EndOfStreamWindows.get())
        .apply(new CustomCoGroupFunction())
        .addSink(...);

// Emits one record per coGroup() call, i.e. one record per key.
private static class CustomCoGroupFunction
        extends RichCoGroupFunction<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Integer> {
    @Override
    public void coGroup(
            Iterable<Tuple2<Integer, Double>> iterableA,
            Iterable<Tuple2<Integer, Double>> iterableB,
            Collector<Integer> collector) {
        collector.collect(1);
    }
}
```
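The semantics the benchmark relies on can be illustrated without Flink. The following is a minimal in-memory sketch, not Flink's implementation: it assumes that `coGroup` over `EndOfStreamWindows` buffers both inputs, waits until both have ended, and then calls the user function once per key seen on either side. `CoGroupAtEndOfStreamSketch`, `CoGroupFn` and `coGroupAtEndOfStream` are hypothetical names, and `Map.Entry<Integer, Double>` stands in for `Tuple2<Integer, Double>`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical in-memory model of
// data1.coGroup(data2).where(f0).equalTo(f0).window(EndOfStreamWindows.get()).apply(fn).
// Not Flink code: Map.Entry<Integer, Double> stands in for Tuple2<Integer, Double>.
class CoGroupAtEndOfStreamSketch {

    // Stand-in for a CoGroupFunction: receives every left and right element of one key.
    interface CoGroupFn<L, R, O> {
        void coGroup(List<L> left, List<R> right, List<O> out);
    }

    static <L, R, K, O> List<O> coGroupAtEndOfStream(
            List<L> left, Function<L, K> leftKey,
            List<R> right, Function<R, K> rightKey,
            CoGroupFn<L, R, O> fn) {
        // Buffer both inputs by key; nothing is emitted while records arrive.
        Map<K, List<L>> leftByKey = new LinkedHashMap<>();
        for (L l : left) {
            leftByKey.computeIfAbsent(leftKey.apply(l), k -> new ArrayList<>()).add(l);
        }
        Map<K, List<R>> rightByKey = new LinkedHashMap<>();
        for (R r : right) {
            rightByKey.computeIfAbsent(rightKey.apply(r), k -> new ArrayList<>()).add(r);
        }
        // Both inputs have ended: call fn once for every key seen on either side.
        Set<K> keys = new LinkedHashSet<>(leftByKey.keySet());
        keys.addAll(rightByKey.keySet());
        List<O> out = new ArrayList<>();
        for (K key : keys) {
            fn.coGroup(
                    leftByKey.getOrDefault(key, Collections.emptyList()),
                    rightByKey.getOrDefault(key, Collections.emptyList()),
                    out);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, Double>> data1 =
                List.of(Map.entry(1, 0.5), Map.entry(1, 1.5), Map.entry(2, 2.0));
        List<Map.Entry<Integer, Double>> data2 =
                List.of(Map.entry(2, 3.0), Map.entry(3, 4.0));
        // Like CustomCoGroupFunction: emit 1 per key, so the output size equals the key count.
        List<Integer> out = coGroupAtEndOfStream(
                data1, Map.Entry::getKey, data2, Map.Entry::getKey,
                (a, b, collector) -> collector.add(1));
        System.out.println(out); // [1, 1, 1]
    }
}
```

Because all state is held until the end of input, the sketch also shows why this pattern produces no intermediate results, which is what lets an end-of-stream window be evaluated in a batch-like way.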
The following result shows the throughput (records/sec) when the benchmark is executed in streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR.
The results show that DataStream#coGroup in optimized streaming mode is about 22X as fast as in streaming mode and 3X as fast as in batch mode.
STREAMING | BATCH | Optimized STREAMING | Optimized STREAMING + hybrid shuffle |
---|---|---|---|
66 ± 1 (100%, 1202426 ms) | 491 ± 5 (743%, 162731 ms) | 1506 ± 10 (2281%, 53098 ms) | 1677 ± 42 (2540%, 47687 ms) |
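The speedups quoted above can be rechecked from the table's throughput column. The short sketch below (a hypothetical helper, not part of the benchmark) divides the optimized throughput by each baseline. The percentage column appears to match the ratio to STREAMING truncated to an integer, e.g. 491 / 66 = 7.439 gives 743%; that truncation is an assumption inferred from the numbers.

```java
import java.util.Locale;

// Recomputes the relative throughput figures of the DataStream#coGroup table.
// Hypothetical helper; the input numbers are copied from the table above.
class CoGroupSpeedupCheck {

    // Ratio between two throughput values (records/sec).
    static double speedup(double throughput, double baseline) {
        return throughput / baseline;
    }

    // Throughput as a percentage of the baseline, truncated like the table's percentage column.
    static int percentOfBaseline(double throughput, double baseline) {
        return (int) (100 * throughput / baseline);
    }

    public static void main(String[] args) {
        double streaming = 66, batch = 491, optimized = 1506, hybrid = 1677;
        System.out.printf(Locale.ROOT, "optimized vs. streaming: %.1fx%n", speedup(optimized, streaming)); // 22.8x
        System.out.printf(Locale.ROOT, "optimized vs. batch: %.1fx%n", speedup(optimized, batch)); // 3.1x
        System.out.println(percentOfBaseline(batch, streaming) + "%"); // 743%
        System.out.println(percentOfBaseline(optimized, streaming) + "%"); // 2281%
        System.out.println(percentOfBaseline(hybrid, streaming) + "%"); // 2540%
    }
}
```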
2) Execute DataStream#Aggregate
```java
DataStreamSource<Tuple2<Long, Double>> source = env.fromCollection(
        new DataGenerator(dataNum, keyNum), Types.TUPLE(Types.LONG, Types.DOUBLE));
source
        .keyBy(value -> value.f0)
        // Assigns every record to a single window that fires only when the input ends.
        .window(EndOfStreamWindows.get())
        .aggregate(new Aggregator())
        .addSink(...);

// Sums the f1 field of all records that share a key; f0 of the accumulator is unused.
public static class Aggregator
        implements AggregateFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Double> {
    @Override
    public Tuple2<Long, Double> createAccumulator() {
        return new Tuple2<>(0L, 0.0);
    }

    @Override
    public Tuple2<Long, Double> add(Tuple2<Long, Double> myData, Tuple2<Long, Double> accData) {
        accData.f1 = accData.f1 + myData.f1;
        return accData;
    }

    @Override
    public Double getResult(Tuple2<Long, Double> result) {
        return result.f1;
    }

    @Override
    public Tuple2<Long, Double> merge(Tuple2<Long, Double> acc1, Tuple2<Long, Double> acc2) {
        acc1.f1 = acc1.f1 + acc2.f1;
        return acc1;
    }
}
```
```java
data
        .keyBy(value -> value.f0)
        .window(EndOfStreamWindows.get())
        .aggregate(new Aggregator())
        .addSink(new CountingAndDiscardingSink());
```
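To make the `AggregateFunction` contract used by `Aggregator` concrete, the sketch below models what `keyBy(...).window(EndOfStreamWindows.get()).aggregate(new Aggregator())` computes, under the simplifying assumption that the window keeps one accumulator per key and emits one result per key once the input has ended. `SimpleAggregate` is a hypothetical interface that only mirrors the four methods of Flink's `AggregateFunction`, and the accumulator is a plain `Double` instead of the `Tuple2` used above. `merge` is typically only needed for merging windows such as session windows, so the sketch defines it but never calls it.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory model of keyBy(f0).window(EndOfStreamWindows.get()).aggregate(...).
// SimpleAggregate mirrors the shape of Flink's AggregateFunction but is not that interface.
class KeyedAggregateSketch {

    interface SimpleAggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Same logic as Aggregator: sum the Double field of every record with the same key.
    static final SimpleAggregate<Map.Entry<Long, Double>, Double, Double> SUM =
            new SimpleAggregate<Map.Entry<Long, Double>, Double, Double>() {
                @Override public Double createAccumulator() { return 0.0; }
                @Override public Double add(Map.Entry<Long, Double> value, Double acc) {
                    return acc + value.getValue();
                }
                @Override public Double getResult(Double acc) { return acc; }
                @Override public Double merge(Double a, Double b) { return a + b; }
            };

    static <IN, K, ACC, OUT> Map<K, OUT> aggregateAtEndOfStream(
            List<IN> input, Function<IN, K> keyFn, SimpleAggregate<IN, ACC, OUT> agg) {
        // One accumulator per key, updated incrementally as records arrive.
        Map<K, ACC> accumulators = new LinkedHashMap<>();
        for (IN value : input) {
            K key = keyFn.apply(value);
            ACC acc = accumulators.containsKey(key) ? accumulators.get(key) : agg.createAccumulator();
            accumulators.put(key, agg.add(value, acc));
        }
        // End of input: the window fires once and emits one result per key.
        Map<K, OUT> results = new LinkedHashMap<>();
        accumulators.forEach((key, acc) -> results.put(key, agg.getResult(acc)));
        return results;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, Double>> source =
                List.of(Map.entry(1L, 1.0), Map.entry(2L, 5.0), Map.entry(1L, 2.5));
        System.out.println(aggregateAtEndOfStream(source, Map.Entry::getKey, SUM)); // {1=3.5, 2=5.0}
    }
}
```

Keeping only a small accumulator per key, rather than buffering every record as the coGroup case must, is the usual reason to prefer `aggregate` over a full window function.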
The following result shows the throughput (records/sec) when the benchmark is executed in streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR.
...
```java
source1.keyBy(value -> value.f0)
        .window(EndOfStreamWindows.get())
        .aggregate(new MyAggregator()).name("Process1")
        .connect(source2.keyBy(value -> value.f0))
        .transform("Process2", Types.INT, new MyProcessOperator())
        .addSink(new DiscardingSink<>()...);
```
We can use this program to demonstrate that it requires fewer slots after this FLIP. More specifically, suppose we configure a standalone cluster with taskmanager.numberOfTaskSlots = 2 and put Source1, Process1, Source2 and Process2 into 4 different SlotSharingGroups. Before this FLIP, the program fails to be deployed; after this FLIP, it is deployed successfully. This is because Source2 and Process2 can be deployed after Source1 and Process1 have finished and released their slots.
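The slot argument can be made concrete with a toy model. The sketch below assumes parallelism 1 and one SlotSharingGroup per operator (so each running operator holds one slot), groups operators connected by pipelined edges into regions that must be deployed together, and assumes, as the paragraph above describes, that after this FLIP the edge from Process1 to Process2 no longer forces both sides to run at the same time (modelled here as a BLOCKING edge) and that a region releases its slots before the next one starts. `SlotDemandSketch`, `EdgeType`, `Edge`, `peakSlots` and `job` are hypothetical names; this is not Flink's scheduler.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of slot demand; not Flink's scheduler. Assumes parallelism 1 and one
// SlotSharingGroup per operator, so every running operator needs its own slot.
class SlotDemandSketch {

    enum EdgeType { PIPELINED, BLOCKING }

    static final class Edge {
        final String from;
        final String to;
        final EdgeType type;

        Edge(String from, String to, EdgeType type) {
            this.from = from;
            this.to = to;
            this.type = type;
        }
    }

    // Union-find lookup: follows parent links to the representative of an operator's region.
    static String find(Map<String, String> parent, String op) {
        while (!parent.get(op).equals(op)) {
            op = parent.get(op);
        }
        return op;
    }

    // Operators joined by PIPELINED edges form one region that must hold all its slots at once.
    // Assuming regions run one after another, peak demand is the size of the largest region.
    static int peakSlots(List<String> operators, List<Edge> edges) {
        Map<String, String> parent = new HashMap<>();
        for (String op : operators) {
            parent.put(op, op);
        }
        for (Edge e : edges) {
            if (e.type == EdgeType.PIPELINED) {
                parent.put(find(parent, e.from), find(parent, e.to));
            }
        }
        Map<String, Integer> regionSizes = new HashMap<>();
        for (String op : operators) {
            regionSizes.merge(find(parent, op), 1, Integer::sum);
        }
        return Collections.max(regionSizes.values());
    }

    // The example job; only the type of the Process1 -> Process2 edge varies.
    static List<Edge> job(EdgeType process1ToProcess2) {
        return List.of(
                new Edge("Source1", "Process1", EdgeType.PIPELINED),
                new Edge("Process1", "Process2", process1ToProcess2),
                new Edge("Source2", "Process2", EdgeType.PIPELINED));
    }

    public static void main(String[] args) {
        List<String> ops = List.of("Source1", "Process1", "Source2", "Process2");
        int slots = 2; // taskmanager.numberOfTaskSlots = 2
        int before = peakSlots(ops, job(EdgeType.PIPELINED));
        int after = peakSlots(ops, job(EdgeType.BLOCKING));
        System.out.println("before: " + before + " slots, deployable=" + (before <= slots));
        System.out.println("after: " + after + " slots, deployable=" + (after <= slots));
    }
}
```

With all edges pipelined, the four operators form a single region that needs 4 slots, more than the 2 available; with the blocking edge, the job splits into two regions of 2 slots each.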
...