Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Below is the DataStream program code snippet.

Code Block
languagejava
DataStream<Tuple2<Integer, Double>> data1  = env.coGroupfromCollection(data2)
    .where(tuple -> tuple.f0)
    .equalTo(tuple -> tuple.f0)
    .window(EndOfStreamWindows.get())
    .apply(new CustomCoGroupFunction())
    .addSink(...);

The following result shows the throughput (records/sec) when the benchmark is executed in streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR.

The result shows that DataStream#coGroup in optimized streaming mode can be 22X as fast as streaming mode and 3X as fast as batch mode. Hybrid shuffle can further improve throughput by 11%.

STREAMINGBATCHOptimized STREAMINGWith hybrid shuffle
66 ± 1 (100%, 1202426 ms)491 ± 5 (743%, 162731 ms)1506 ± 10 (2281%, 53098 ms)1677 ± 42 (2540%, 47687 ms)

2) Execute DataStream#Aggregate

This benchmark uses DataStream#aggregate to process 8*10^7 records. These records are evenly distributed across 8*10^5 keys, i.e. each unique key has 100 records.

Below is the DataStream program code snippet.


                new DataGenerator(dataNum), Types.TUPLE(Types.INT, Types.DOUBLE));
DataStream<Tuple2<Integer, Double>> data2  = env.fromCollection(
                new DataGenerator(dataNum), Types.TUPLE(Types.INT, Types.DOUBLE));

data1.coGroup(data2)
    .where(tuple -> tuple.f0)
    .equalTo(tuple -> tuple.f0)
    .window(EndOfStreamWindows.get())
    .apply(new CustomCoGroupFunction())
    .addSink(...);

private static class CustomCoGroupFunction
            extends RichCoGroupFunction<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Integer> {
        @Override
        public void coGroup(
                Iterable<Tuple3<Integer, Integer, Double>> iterableA,
                Iterable<Tuple3<Integer, Integer, Double>> iterableB,
                Collector<Integer> collector) {
            collector.collect(1);
        }
    }


The following result shows the throughput (records/sec) when the benchmark is executed in streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR.

The result shows that DataStream#coGroup in optimized streaming mode can be 22X as fast as streaming mode and 3X as fast as batch mode. Hybrid shuffle can further improve throughput by 11%.

STREAMINGBATCHOptimized STREAMINGWith hybrid shuffle
66 ± 1 (100%, 1202426 ms)491 ± 5 (743%, 162731 ms)1506 ± 10 (2281%, 53098 ms)1677 ± 42 (2540%, 47687 ms)


2) Execute DataStream#Aggregate

This benchmark uses DataStream#aggregate to process 8*10^7 records. These records are evenly distributed across 8*10^5 keys, i.e. each unique key has 100 records which are generated separately.

Below is the DataStream program code snippet.

Code Block
languagejava
DataStreamSource<Tuple2<Long, Double>> source  = env.fromCollection(
                        new DataGenerator(dataNum, keyNum), Types.TUPLE(Types.LONG, Types.DOUBLE));
source
      .keyBy(value -> value.f0)
      .window(EndOfStreamWindows.get())
      .aggregate(new Aggregator())
      .addSink(...);

public static class Aggregator implements AggregateFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Double> {
        @Override
        public Tuple2<Long, Double> createAccumulator() {
            return new Tuple2<Long, Double>(0L, 0.0);
        }

        @Override
        public Tuple2<Long, Double> add(Tuple2<Long, Double> myData, Tuple2<Long, Double> accData) {
            accData.f1 = accData.f1 + myData.f1;
            return accData;
        }

        @Override
        public Double getResult(Tuple2<Long, Double> result) {
            return result.f1;
        }

        @Override
        public Tuple2<Long, Double> merge(Tuple2<Long, Double> acc1, Tuple2<Long, Double> acc2) {
            acc1.f1 = acc1.f1 + acc2.f1;
            return acc1;
        }
    }
Code Block
languagejava
data
      .keyBy(value -> value.f0)
      .window(EndOfStreamWindows.get())
      .aggregate(new Aggregator())
      .addSink(new CountingAndDiscardingSink());


The following result shows the throughput (records/sec) when the benchmark is executed in streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR.

...

Code Block
languagejava
 source1.keyBy(value -> value.f0)
  .window(EndOfStreamWindows.get())
  .aggregate(new MyAggregator()).name("Process1")
  .connect(source2.keyBy(value -> value.f0))
  .transform("Process2", Types.INT, new MyProcessOperator())
  .addSink(new DiscardingSink<>()...); 


We can use this program to demonstrate that the program requires less slot resources. More specifically, suppose we configure the standalone cluster with taskmanager.numberOfTaskSlots = 2, and set the Source1,Process1, Source2 and Process2 in 4 different SlotSharingGroups, the program will fail to be deployed before this FLIP. And the program can be deployed successfully after this FLIP. This is because Source2 and Process2 can be deplyed after Source1Process1 finished and released their slots.

...