...

[This FLIP proposal is joint work between Ran Jinhao and Dong Lin]


Motivation

As shown in the figure below, we might have a job that pre-processes records from a bounded source (i.e. inputA) using an operator (i.e. operatorA) which only emits results after its input has ended. Then operatorB needs to join records emitted by operatorA with records from an unbounded source, and emit results with low processing latency in real-time.

...

In addition, this FLIP changes the default trigger of GlobalWindows from NeverTrigger, which never fires, to EndOfStreamTrigger. GlobalWindows with the new default trigger can be used with the DataStream API to specify that the window ends only at the end of its inputs. DataStream programs (e.g. coGroup and aggregate) can take advantage of this information to significantly increase throughput and reduce resource utilization.


Objectives

The APIs provided in this FLIP achieve the following objectives / benefits:

...

3) Improve usability. For use cases that need to invoke DataStream APIs (e.g. KeyedStream#window) with a window assigner that covers all input data, users can use the off-the-shelf GlobalWindows with the new default trigger provided in this FLIP, instead of writing tens of lines of code to define such a WindowAssigner subclass.


Public Interfaces

1) Change the default trigger of GlobalWindows from GlobalWindows#NeverTrigger to GlobalWindows#EndOfStreamTrigger. This window assigner with the new default trigger allows users of the DataStream API to specify that the computation (e.g. co-group, aggregate) should emit data only after end-of-input.

Code Block
languagejava
/**
 * A {@link WindowAssigner} that assigns all elements to the same {@link GlobalWindow}.
 *
 * <p>Use this if you want to use a {@link Trigger} and {@link
 * org.apache.flink.streaming.api.windowing.evictors.Evictor} to do flexible, policy based windows.
 */
@PublicEvolving
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
    ···
    /** A trigger that fires when input ends, as default Trigger for GlobalWindows. */
    @Internal
    public static class EndOfStreamTrigger extends Trigger<Object, GlobalWindow> {
        private static final long serialVersionUID = 1L;

        // Elements alone never fire the window.
        @Override
        public TriggerResult onElement(
                Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        // Fire only when event time reaches the window's max timestamp, i.e. at end of input.
        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}

        @Override
        public void onMerge(GlobalWindow window, OnMergeContext ctx) {}
    }
    ···
}
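
The firing rule of EndOfStreamTrigger can be illustrated without a Flink dependency. The following is a minimal sketch, not the FLIP's implementation: Result, MAX_TIMESTAMP and replay are hypothetical stand-ins, and it assumes that the runtime invokes the event-time callback with Long.MAX_VALUE once a bounded input has ended.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch (no Flink dependency) of the end-of-stream firing rule. */
final class EndOfStreamTriggerSketch {

    /** Hypothetical stand-in for Flink's TriggerResult. */
    enum Result { CONTINUE, FIRE }

    /** Stand-in for GlobalWindow#maxTimestamp(), which is Long.MAX_VALUE. */
    static final long MAX_TIMESTAMP = Long.MAX_VALUE;

    /** Elements never fire the window on their own. */
    static Result onElement(Object element) {
        return Result.CONTINUE;
    }

    /** Fires only when the callback time equals the window's max timestamp. */
    static Result onEventTime(long time) {
        return time == MAX_TIMESTAMP ? Result.FIRE : Result.CONTINUE;
    }

    /** Replays a sequence of event-time callbacks and records each decision. */
    static List<Result> replay(long... times) {
        List<Result> results = new ArrayList<>();
        for (long time : times) {
            results.add(onEventTime(time));
        }
        return results;
    }

    public static void main(String[] args) {
        // Assumption: the last value models the callback seen at end of input.
        System.out.println(replay(1_000L, 2_000L, Long.MAX_VALUE));
        // prints [CONTINUE, CONTINUE, FIRE]
    }
}
```

Under these assumptions the window produces exactly one firing, and only after all input has been seen, which is the property the runtime can exploit.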


2) Add OperatorAttributesBuilder and OperatorAttributes for operator developers to specify operator attributes that Flink runtime can use to optimize the job performance.

...

Code Block
languagejava
@Experimental
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...
 
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
 
@Experimental
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...
 
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
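
To illustrate why a default method keeps existing operators source-compatible while letting new operators opt in, here is a minimal sketch of the builder-plus-default-method pattern. All names in it (Attributes, AttributesBuilder, setOutputOnEOF, Operator, LegacyOperator, EndOfInputOperator) are hypothetical stand-ins chosen for illustration; only the idea of an isOutputOnEOF attribute comes from this FLIP, and the actual OperatorAttributes API may differ.

```java
/** Hypothetical stand-ins; names and fields are illustrative, not the FLIP's API. */
final class OperatorAttributesSketch {

    /** Immutable attribute holder; outputOnEOF mirrors the isOutputOnEOF flag. */
    static final class Attributes {
        private final boolean outputOnEOF;

        Attributes(boolean outputOnEOF) {
            this.outputOnEOF = outputOnEOF;
        }

        boolean isOutputOnEOF() {
            return outputOnEOF;
        }
    }

    /** Builder whose defaults describe an ordinary streaming operator. */
    static final class AttributesBuilder {
        private boolean outputOnEOF = false;

        AttributesBuilder setOutputOnEOF(boolean outputOnEOF) {
            this.outputOnEOF = outputOnEOF;
            return this;
        }

        Attributes build() {
            return new Attributes(outputOnEOF);
        }
    }

    /** Operator interface with a default method returning default attributes. */
    interface Operator {
        default Attributes getAttributes() {
            return new AttributesBuilder().build();
        }
    }

    /** Existing operator: compiles unchanged and reports default attributes. */
    static final class LegacyOperator implements Operator {}

    /** Operator that only emits after its input ends opts in explicitly. */
    static final class EndOfInputOperator implements Operator {
        @Override
        public Attributes getAttributes() {
            return new AttributesBuilder().setOutputOnEOF(true).build();
        }
    }

    public static void main(String[] args) {
        System.out.println(new LegacyOperator().getAttributes().isOutputOnEOF());     // prints false
        System.out.println(new EndOfInputOperator().getAttributes().isOutputOnEOF()); // prints true
    }
}
```

The design choice worth noting is that the default lives in the builder, so adding a further attribute later would not require touching operators that never override the method.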


Proposed Changes

1) Add APIs to the Transformation interface to get the corresponding operator attributes.

...

6) When DataStream#aggregate is invoked with GlobalWindows and its default trigger as the window assigner and trigger, Flink should generate an operator with isOutputOnEOF = true.

...

More specifically, this operator will sort the input before aggregation and avoid invoking window actions, similar to '5)'.
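
The sort-then-aggregate idea can be sketched in plain Java as follows. This is an illustration under stated assumptions, not the operator Flink generates: KeyedValue and sumPerKey are hypothetical names, the whole input is held in a list rather than an external sorter, and a simple sum stands in for the user's AggregateFunction. The point it shows is that once the input is sorted by key, only one accumulator needs to be live at a time and each key's result is emitted exactly once.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Plain-Java sketch of aggregating a bounded input after sorting it by key. */
final class SortedAggregationSketch {

    /** Hypothetical keyed record used only for this illustration. */
    static final class KeyedValue {
        final long key;
        final double value;

        KeyedValue(long key, double value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return key + "=" + value;
        }
    }

    /** Sorts by key, then sums each key's values with a single live accumulator. */
    static List<KeyedValue> sumPerKey(List<KeyedValue> input) {
        List<KeyedValue> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparingLong(item -> item.key));

        List<KeyedValue> output = new ArrayList<>();
        if (sorted.isEmpty()) {
            return output;
        }

        long currentKey = sorted.get(0).key;
        double accumulator = 0.0;
        for (KeyedValue item : sorted) {
            if (item.key != currentKey) {
                // Key changed: the previous key is complete, so emit its result.
                output.add(new KeyedValue(currentKey, accumulator));
                currentKey = item.key;
                accumulator = 0.0;
            }
            accumulator += item.value;
        }
        // Emit the result for the last key once the input is exhausted.
        output.add(new KeyedValue(currentKey, accumulator));
        return output;
    }

    public static void main(String[] args) {
        List<KeyedValue> input = new ArrayList<>();
        input.add(new KeyedValue(2L, 1.0));
        input.add(new KeyedValue(1L, 2.0));
        input.add(new KeyedValue(2L, 3.0));
        input.add(new KeyedValue(1L, 4.0));
        System.out.println(sumPerKey(input)); // prints [1=6.0, 2=4.0]
    }
}
```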

Analysis of APIs affected by this FLIP

This FLIP can potentially benefit all DataStream APIs that take WindowAssigner as the parameter.

More specifically, the following DataStream APIs can benefit from using GlobalWindows with the default trigger.

  • DataStream#windowAll
  • KeyedStream#window
  • CoGroupedStreams#Where#EqualTo#window
  • JoinedStreams#Where#EqualTo#window

Benchmark results

To demonstrate our optimization improvements, we run each benchmark in different execution modes and configurations. Each result is measured by taking the average execution time across 5 runs with the given configuration.

...

Code Block
languagejava
DataStream<Tuple2<Integer, Double>> source1  = env.fromCollection(
                new DataGenerator(dataNum), Types.TUPLE(Types.INT, Types.DOUBLE));
DataStream<Tuple2<Integer, Double>> source2  = env.fromCollection(
                new DataGenerator(dataNum), Types.TUPLE(Types.INT, Types.DOUBLE));

source1.coGroup(source2)
    .where(tuple -> tuple.f0)
    .equalTo(tuple -> tuple.f0)
    .window(GlobalWindows.create())
    .apply(new CustomCoGroupFunction())
    .addSink(...);

private static class CustomCoGroupFunction
            extends RichCoGroupFunction<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Integer> {
        @Override
        public void coGroup(
                Iterable<Tuple2<Integer, Double>> iterableA,
                Iterable<Tuple2<Integer, Double>> iterableB,
                Collector<Integer> collector) {
            collector.collect(1);
        }
    }

...

Code Block
languagejava
DataStreamSource<Tuple2<Long, Double>> source  = env.fromCollection(
                        new DataGenerator(dataNum, keyNum), Types.TUPLE(Types.LONG, Types.DOUBLE));
source.keyBy(value -> value.f0)
      .window(GlobalWindows.create())
      .aggregate(new Aggregator())
      .addSink(...);

public static class Aggregator implements AggregateFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Double> {
        @Override
        public Tuple2<Long, Double> createAccumulator() {
            return new Tuple2<Long, Double>(0L, 0.0);
        }

        @Override
        public Tuple2<Long, Double> add(Tuple2<Long, Double> myData, Tuple2<Long, Double> accData) {
            accData.f1 = accData.f1 + myData.f1;
            return accData;
        }

        @Override
        public Double getResult(Tuple2<Long, Double> result) {
            return result.f1;
        }

        @Override
        public Tuple2<Long, Double> merge(Tuple2<Long, Double> acc1, Tuple2<Long, Double> acc2) {
            acc1.f1 = acc1.f1 + acc2.f1;
            return acc1;
        }
    }

...

Code Block
languagejava
source1.keyBy(value -> value.f0)
  .window(GlobalWindows.create())
  .aggregate(new MyAggregator()).name("Process1")
  .connect(source2.keyBy(value -> value.f0))
  .transform("Process2", Types.INT, new MyProcessOperator())
  .addSink(...);

...

Additionally, we can use this program to demonstrate that this FLIP achieves higher performance, because Process2 no longer needs to buffer records emitted by source2 in memory while Process1 has not reached EOF. More specifically, before this FLIP the program can fail with OOM when the number of input records is high, whereas after this FLIP it finishes successfully without OOM.

Compatibility, Deprecation, and Migration Plan

The changes made in this FLIP are backward compatible. No deprecation or migration plan is needed.

Future Work

It would be useful to add an ExecutionState (e.g. Finishing) to specify whether the task has reached EOF for all its inputs. This allows JM to deploy its downstream tasks and possibly apply hybrid shuffle to increase job throughput.

...