Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
// Two input streams of (Integer, Double) tuples, each fed by its own DataGenerator.
DataStream<Tuple2<Integer, Double>> data1 =
        env.fromCollection(new DataGenerator(dataNum), Types.TUPLE(Types.INT, Types.DOUBLE));
DataStream<Tuple2<Integer, Double>> data2 =
        env.fromCollection(new DataGenerator(dataNum), Types.TUPLE(Types.INT, Types.DOUBLE));

// Co-group the two streams on the first tuple field (f0) and hand every group to
// CustomCoGroupFunction.
// NOTE(review): EndOfStreamWindows presumably assigns all records to one window that
// fires once the inputs end -- confirm against its definition.
data1.coGroup(data2)
        .where(left -> left.f0)
        .equalTo(right -> right.f0)
        .window(EndOfStreamWindows.get())
        .apply(new CustomCoGroupFunction())
        .addSink(...);

private static class CustomCoGroupFunction
            extends RichCoGroupFunction<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Integer> {
        @Override
        public void coGroup(
                Iterable<Tuple3<Integer, IntegerIterable<Tuple2<Integer, Double>> iterableA,
                Iterable<Tuple3<Integer, IntegerIterable<Tuple2<Integer, Double>> iterableB,
                Collector<Integer> collector) {
            collector.collect(1);
        }
    }

...