THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
DataStream<Tuple2<Integer, Double>> data1 = env.fromCollection( new DataGenerator(dataNum), Types.TUPLE(Types.INT, Types.DOUBLE)); DataStream<Tuple2<Integer, Double>> data2 = env.fromCollection( new DataGenerator(dataNum), Types.TUPLE(Types.INT, Types.DOUBLE)); data1.coGroup(data2) .where(tuple -> tuple.f0) .equalTo(tuple -> tuple.f0) .window(EndOfStreamWindows.get()) .apply(new CustomCoGroupFunction()) .addSink(...); private static class CustomCoGroupFunction extends RichCoGroupFunction<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Integer> { @Override public void coGroup( Iterable<Tuple3<Integer, IntegerIterable<Tuple2<Integer, Double>> iterableA, Iterable<Tuple3<Integer, IntegerIterable<Tuple2<Integer, Double>> iterableB, Collector<Integer> collector) { collector.collect(1); } } |
...