...

In addition, this FLIP adds EndOfStreamTrigger to GlobalWindows, whose default trigger, NeverTrigger, never fires. GlobalWindows with EndOfStreamTrigger can be used with the DataStream API to specify that the window ends only at the end of its inputs. DataStream programs (e.g. coGroup and aggregate) can take advantage of this information to significantly increase throughput and reduce resource utilization.

...

3) Improve usability. For use cases that need to invoke DataStream APIs (e.g. KeyedStream#window) with a window assigner that covers all input data, users can use the off-the-shelf GlobalWindows with the EndOfStreamTrigger provided in this FLIP, instead of writing tens of lines of code to define such a WindowAssigner subclass.


Public Interfaces

1) Add the EndOfStreamTrigger in GlobalWindows, which allows users of the DataStream API to create GlobalWindows with EndOfStreamTrigger to specify that the computation (e.g. co-group, aggregate) should emit data only after end-of-input.

Code Block
languagejava
/**
 * A {@link WindowAssigner} that assigns all elements to the same {@link GlobalWindow}.
 *
 * <p>Use this if you want to use a {@link Trigger} and {@link
 * org.apache.flink.streaming.api.windowing.evictors.Evictor} to do flexible, policy based windows.
 */
@PublicEvolving
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
	···
    private boolean isUsingEndOfStreamTrigger = false;
    ···
    @Override
    public Trigger<Object, GlobalWindow> getDefaultTrigger() {
        if (isUsingEndOfStreamTrigger) {
            return new EndOfStreamTrigger();
        } else {
            return new NeverTrigger();
        }
    }
    ···
    public static GlobalWindows createWithEndOfStreamTrigger() {
        GlobalWindows globalWindows = new GlobalWindows();
        globalWindows.isUsingEndOfStreamTrigger = true;
        return globalWindows;
    }
    ···
    /** A trigger that fires when the input ends. */
    @Internal
    public static class EndOfStreamTrigger extends Trigger<Object, GlobalWindow> {
        private static final long serialVersionUID = 1L;

        @Override
        public TriggerResult onElement(
                Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}

        @Override
        public void onMerge(GlobalWindow window, OnMergeContext ctx) {}
    }
	···
}
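
The behaviour of EndOfStreamTrigger can be illustrated with a small plain-Java model. This is only a sketch: the class EndOfStreamTriggerSketch, its Result enum and the run helper are hypothetical names invented for illustration and are not Flink classes. It assumes that the runtime invokes the event-time callback with Long.MAX_VALUE (the value of GlobalWindow#maxTimestamp) once the input has ended, and that earlier callbacks carry smaller timestamps.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of an end-of-stream trigger. None of these types are Flink classes. */
final class EndOfStreamTriggerSketch {

    /** Stand-in for the two TriggerResult values used by the trigger above. */
    public enum Result { CONTINUE, FIRE }

    /** Assumption: the global window's max timestamp is Long.MAX_VALUE. */
    public static final long MAX_TIMESTAMP = Long.MAX_VALUE;

    /** Elements never fire the window on their own. */
    public static Result onElement(long element) {
        return Result.CONTINUE;
    }

    /** Fires only when the callback time equals the window's max timestamp. */
    public static Result onEventTime(long time) {
        return time == MAX_TIMESTAMP ? Result.FIRE : Result.CONTINUE;
    }

    /**
     * Hypothetical driver: sums all values into one global window, then replays
     * the given event-time callbacks and records the sum each time the trigger fires.
     */
    public static List<Long> run(long[] values, long[] eventTimes) {
        List<Long> emitted = new ArrayList<>();
        long sum = 0;
        for (long value : values) {
            sum += value;
            if (onElement(value) == Result.FIRE) {
                emitted.add(sum);
            }
        }
        for (long time : eventTimes) {
            if (onEventTime(time) == Result.FIRE) {
                emitted.add(sum);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // The final callback at Long.MAX_VALUE models end-of-input: one result is emitted.
        System.out.println(run(new long[] {1, 2, 3}, new long[] {10, 20, Long.MAX_VALUE})); // prints [6]
        // Without that final callback the window never fires.
        System.out.println(run(new long[] {1, 2, 3}, new long[] {10, 20})); // prints []
    }
}
```

In this model the aggregate result is produced exactly once, after all input has been consumed, which is the property that the computation can rely on when GlobalWindows is created with createWithEndOfStreamTrigger().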

...

Code Block
languagejava
DataStream<Tuple2<Integer, Double>> source1 = env.fromCollection(
        new DataGenerator(dataNum), Types.TUPLE(Types.INT, Types.DOUBLE));
DataStream<Tuple2<Integer, Double>> source2 = env.fromCollection(
        new DataGenerator(dataNum), Types.TUPLE(Types.INT, Types.DOUBLE));

source1.coGroup(source2)
    .where(tuple -> tuple.f0)
    .equalTo(tuple -> tuple.f0)
    .window(GlobalWindows.createWithEndOfStreamTrigger())
    .apply(new CustomCoGroupFunction())
    .addSink(...);

private static class CustomCoGroupFunction
        extends RichCoGroupFunction<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Integer> {
    @Override
    public void coGroup(
            Iterable<Tuple2<Integer, Double>> iterableA,
            Iterable<Tuple2<Integer, Double>> iterableB,
            Collector<Integer> collector) {
        collector.collect(1);
    }
}

...

Code Block
languagejava
DataStreamSource<Tuple2<Long, Double>> source = env.fromCollection(
        new DataGenerator(dataNum, keyNum), Types.TUPLE(Types.LONG, Types.DOUBLE));
source.keyBy(value -> value.f0)
      .window(GlobalWindows.createWithEndOfStreamTrigger())
      .aggregate(new Aggregator())
      .addSink(...);

public static class Aggregator
        implements AggregateFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Double> {
    @Override
    public Tuple2<Long, Double> createAccumulator() {
        return new Tuple2<Long, Double>(0L, 0.0);
    }

    @Override
    public Tuple2<Long, Double> add(Tuple2<Long, Double> myData, Tuple2<Long, Double> accData) {
        accData.f1 = accData.f1 + myData.f1;
        return accData;
    }

    @Override
    public Double getResult(Tuple2<Long, Double> result) {
        return result.f1;
    }

    @Override
    public Tuple2<Long, Double> merge(Tuple2<Long, Double> acc1, Tuple2<Long, Double> acc2) {
        acc1.f1 = acc1.f1 + acc2.f1;
        return acc1;
    }
}

...

Code Block
languagejava
source1.keyBy(value -> value.f0)
  .window(GlobalWindows.createWithEndOfStreamTrigger())
  .aggregate(new MyAggregator()).name("Process1")
  .connect(source2.keyBy(value -> value.f0))
  .transform("Process2", Types.INT, new MyProcessOperator())
  .addSink(...);

...