...

This benchmark uses DataStream#coGroup to process records from two bounded inputs. Each input generates 8*10^7 records, each with a unique key: (key = i, value = i) for i from 1 to 8*10^7.

Below is the DataStream program code snippet.

DataStream<Tuple2<Integer, Double>> source1 = env.fromCollection(
        new DataGenerator(dataNum), Types.TUPLE(Types.INT, Types.DOUBLE));
DataStream<Tuple2<Integer, Double>> source2 = env.fromCollection(
        new DataGenerator(dataNum), Types.TUPLE(Types.INT, Types.DOUBLE));

source1.coGroup(source2)
    .where(tuple -> tuple.f0)
    .equalTo(tuple -> tuple.f0)
    .window(EndOfStreamWindows.get())
    .apply(new CustomCoGroupFunction())
    .addSink(...);

private static class CustomCoGroupFunction
        extends RichCoGroupFunction<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Integer> {
    @Override
    public void coGroup(
            Iterable<Tuple2<Integer, Double>> iterableA,
            Iterable<Tuple2<Integer, Double>> iterableB,
            Collector<Integer> collector) {
        // Emit a constant for each invocation; the grouped records are not inspected.
        collector.collect(1);
    }
}
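
The DataGenerator used above is not shown on this page. As a rough illustration of the input it is described as producing, (key = i, value = i) for i from 1 to dataNum, the following is a minimal sketch in plain Java. The class name UniqueKeyGenerator is hypothetical, and Map.Entry stands in for Flink's Tuple2 so that the sketch has no Flink dependency; the real generator may differ.

```java
import java.io.Serializable;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical stand-in for the benchmark's DataGenerator (not the actual class).
// Emits (key = i, value = i) for i from 1 to dataNum, so every key is unique.
final class UniqueKeyGenerator
        implements Iterator<Map.Entry<Integer, Double>>, Serializable {
    private static final long serialVersionUID = 1L;

    private final int dataNum;
    private int next = 1; // next key to emit

    UniqueKeyGenerator(int dataNum) {
        this.dataNum = dataNum;
    }

    @Override
    public boolean hasNext() {
        return next <= dataNum;
    }

    @Override
    public Map.Entry<Integer, Double> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        int i = next++;
        return Map.entry(i, (double) i);
    }
}
```

The sketch implements Serializable because an iterator handed to env.fromCollection has to be serializable.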

...

This benchmark uses DataStream#aggregate to process 8*10^7 records. These records are evenly distributed across 8*10^5 keys, i.e. each unique key has 100 records. More specifically, the source generates records with (key = i, value = i) for i from 1 to 8*10^5 and repeats this process 100 times.

Below is the DataStream program code snippet.

DataStreamSource<Tuple2<Long, Double>> source = env.fromCollection(
        new DataGenerator(dataNum, keyNum), Types.TUPLE(Types.LONG, Types.DOUBLE));
source
    .keyBy(value -> value.f0)
    .window(EndOfStreamWindows.get())
    .aggregate(new Aggregator())
    .addSink(...);

public static class Aggregator
        implements AggregateFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Double> {
    @Override
    public Tuple2<Long, Double> createAccumulator() {
        return new Tuple2<>(0L, 0.0);
    }

    // Adds the record's value to the running sum held in the accumulator.
    @Override
    public Tuple2<Long, Double> add(Tuple2<Long, Double> myData, Tuple2<Long, Double> accData) {
        accData.f1 = accData.f1 + myData.f1;
        return accData;
    }

    @Override
    public Double getResult(Tuple2<Long, Double> result) {
        return result.f1;
    }

    // Combines two partial sums into the first accumulator.
    @Override
    public Tuple2<Long, Double> merge(Tuple2<Long, Double> acc1, Tuple2<Long, Double> acc2) {
        acc1.f1 = acc1.f1 + acc2.f1;
        return acc1;
    }
}
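
The two-argument DataGenerator(dataNum, keyNum) is likewise not shown on this page. As a sketch of the described pattern, where keys 1 to keyNum are emitted in order and the whole pass is repeated until dataNum records have been produced, the plain-Java iterator below may help. RepeatingKeyGenerator is a hypothetical name, Map.Entry stands in for Tuple2, and the sketch assumes dataNum is a multiple of keyNum, as it is in the benchmark (8*10^7 records over 8*10^5 keys).

```java
import java.io.Serializable;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical stand-in for DataGenerator(dataNum, keyNum) (not the actual class).
// Emits (key = i, value = i) for i from 1 to keyNum, repeating the pass
// until dataNum records have been produced.
final class RepeatingKeyGenerator
        implements Iterator<Map.Entry<Long, Double>>, Serializable {
    private static final long serialVersionUID = 1L;

    private final long dataNum;
    private final long keyNum;
    private long emitted = 0; // number of records produced so far

    RepeatingKeyGenerator(long dataNum, long keyNum) {
        if (keyNum <= 0) {
            throw new IllegalArgumentException("keyNum must be positive");
        }
        this.dataNum = dataNum;
        this.keyNum = keyNum;
    }

    @Override
    public boolean hasNext() {
        return emitted < dataNum;
    }

    @Override
    public Map.Entry<Long, Double> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        // Position within the current pass, shifted so that keys start at 1.
        long key = (emitted % keyNum) + 1;
        emitted++;
        return Map.entry(key, (double) key);
    }
}
```

With this pattern every key i appears dataNum / keyNum times with value i, so a per-key sum such as the one computed by the Aggregator above would come out to i * (dataNum / keyNum) for key i.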

...