
...

[This FLIP proposal is a joint work of Ran Jinhao, Dong Lin and Xuannan Su.]



Motivation

...

1) Add EndOfStreamTrigger to GlobalWindows. This allows users of the DataStream API to create GlobalWindows with EndOfStreamTrigger, specifying that a computation (e.g. co-group, aggregate) should emit data only after the end of its input.

Code Block
languagejava
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

/**
 * A {@link WindowAssigner} that assigns all elements to the same {@link GlobalWindow}.
 *
 * <p>Use this if you want to use a {@link Trigger} and {@link
 * org.apache.flink.streaming.api.windowing.evictors.Evictor} to do flexible, policy based windows.
 */
@PublicEvolving
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
    // ...

    private final Trigger<Object, GlobalWindow> defaultTrigger;

    private GlobalWindows(Trigger<Object, GlobalWindow> defaultTrigger) {
        this.defaultTrigger = defaultTrigger;
    }

    // ...

    @Override
    public Trigger<Object, GlobalWindow> getDefaultTrigger() {
        return defaultTrigger;
    }

    // ...

    /** Creates a GlobalWindows assigner whose default trigger never fires. */
    public static GlobalWindows create() {
        return new GlobalWindows(new NeverTrigger());
    }

    /** Creates a GlobalWindows assigner whose default trigger fires once, when input ends. */
    public static GlobalWindows createWithEndOfStreamTrigger() {
        return new GlobalWindows(new EndOfStreamTrigger());
    }

    // ...

    /** A trigger that fires when input ends. */
    @Internal
    public static class EndOfStreamTrigger extends Trigger<Object, GlobalWindow> {
        private static final long serialVersionUID = 1L;

        @Override
        public TriggerResult onElement(
                Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
            // Register a timer at the end of the global window (Long.MAX_VALUE). Without a
            // registered timer, onEventTime() is never invoked. The timer can only fire once
            // the watermark reaches Long.MAX_VALUE, i.e. when the input ends.
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}

        @Override
        public void onMerge(GlobalWindow window, OnMergeContext ctx) {}
    }

    // ...
}
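The firing rule can be exercised without a Flink cluster. The sketch below is a hypothetical, Flink-free model (TriggerDecision and EndOfStreamTriggerModel are illustrative names, not Flink APIs). It assumes that each element registers an event-time timer at GlobalWindow.maxTimestamp(), which is Long.MAX_VALUE, and that a bounded input advances the watermark to Long.MAX_VALUE when it ends, so intermediate watermarks never fire the window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical stand-in for Flink's TriggerResult (only the values used here).
enum TriggerDecision { CONTINUE, FIRE }

// Hypothetical, Flink-free model of EndOfStreamTrigger acting on a single global window.
class EndOfStreamTriggerModel {
    // Mirrors GlobalWindow.maxTimestamp(), which is Long.MAX_VALUE.
    static final long WINDOW_MAX_TIMESTAMP = Long.MAX_VALUE;

    // Registered event-time timers; a set, so re-registering the same time is a no-op.
    private final TreeSet<Long> timers = new TreeSet<>();
    private final List<String> emitted = new ArrayList<>();
    private int bufferedElements = 0;

    // Like onElement: buffer the element, register the end-of-window timer, never fire.
    TriggerDecision onElement(Object element) {
        bufferedElements++;
        timers.add(WINDOW_MAX_TIMESTAMP);
        return TriggerDecision.CONTINUE;
    }

    // Like onEventTime: only a timer at the window's max timestamp fires.
    TriggerDecision onEventTime(long time) {
        return time == WINDOW_MAX_TIMESTAMP ? TriggerDecision.FIRE : TriggerDecision.CONTINUE;
    }

    // Advances the watermark and fires every registered timer at or below it.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long time = timers.pollFirst();
            if (onEventTime(time) == TriggerDecision.FIRE) {
                emitted.add("fired with " + bufferedElements + " elements");
            }
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        EndOfStreamTriggerModel model = new EndOfStreamTriggerModel();
        model.onElement("a");
        model.onElement("b");
        model.advanceWatermark(5_000L);         // ordinary watermark: nothing fires
        System.out.println(model.emitted());    // []
        model.advanceWatermark(Long.MAX_VALUE); // end of input
        System.out.println(model.emitted());    // [fired with 2 elements]
    }
}
```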


2) Add OperatorAttributesBuilder and OperatorAttributes, which operator developers can use to specify operator attributes that the Flink runtime can use to optimize job performance.

...
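The definitions themselves are elided above. As an illustration of the builder pattern only, here is a hypothetical sketch with a single attribute modeled on the isOutputOnEOF flag used later in this FLIP. The Sketch-suffixed class names, the setOutputOnEOF setter, and the default value of false are assumptions, not the FLIP's actual definitions.

```java
// Simplified, hypothetical stand-in for the FLIP's OperatorAttributes: immutable once built.
final class OperatorAttributesSketch {
    private final boolean outputOnEOF;

    OperatorAttributesSketch(boolean outputOnEOF) {
        this.outputOnEOF = outputOnEOF;
    }

    // True if the operator emits records only after all of its inputs have ended.
    boolean isOutputOnEOF() {
        return outputOnEOF;
    }
}

// Simplified, hypothetical stand-in for the FLIP's OperatorAttributesBuilder.
final class OperatorAttributesBuilderSketch {
    // Assumed default: an operator that sets nothing keeps its current behavior.
    private boolean outputOnEOF = false;

    OperatorAttributesBuilderSketch setOutputOnEOF(boolean outputOnEOF) {
        this.outputOnEOF = outputOnEOF;
        return this; // return the builder so calls can be chained
    }

    OperatorAttributesSketch build() {
        return new OperatorAttributesSketch(outputOnEOF);
    }

    public static void main(String[] args) {
        System.out.println(new OperatorAttributesBuilderSketch().build().isOutputOnEOF()); // false
        System.out.println(
                new OperatorAttributesBuilderSketch().setOutputOnEOF(true).build().isOutputOnEOF()); // true
    }
}
```

Keeping the built object immutable means the runtime can read the attributes at any time without worrying that the operator changes them later.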

3) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.

Code Block
languagejava
@Experimental
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    // ... existing methods ...

    // By default, return attributes built without setting any field.
    @Experimental
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

@Experimental
public interface StreamOperatorFactory<OUT> extends Serializable {
    // ... existing methods ...

    // By default, return attributes built without setting any field.
    @Experimental
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
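Because getOperatorAttributes() is a default method, operators and factories written before this FLIP keep compiling and simply report default attributes. The hypothetical sketch below illustrates this with trimmed-down stand-ins (Attributes, OperatorSketch, OperatorFactorySketch, LegacyOperator and WrappingFactory are not Flink classes). It also assumes that a factory wrapping an existing operator instance forwards that operator's attributes, and that an empty builder yields isOutputOnEOF = false; both are illustrative choices rather than statements of this FLIP.

```java
import java.io.Serializable;

// Minimal, hypothetical stand-in for OperatorAttributes with one flag.
final class Attributes implements Serializable {
    private static final long serialVersionUID = 1L;
    final boolean outputOnEOF;

    Attributes(boolean outputOnEOF) {
        this.outputOnEOF = outputOnEOF;
    }
}

// Hypothetical analogue of StreamOperator: the default keeps existing operators unchanged.
interface OperatorSketch extends Serializable {
    default Attributes getOperatorAttributes() {
        return new Attributes(false); // assumed equivalent to an empty builder's build()
    }
}

// Hypothetical analogue of StreamOperatorFactory with the same default.
interface OperatorFactorySketch extends Serializable {
    default Attributes getOperatorAttributes() {
        return new Attributes(false);
    }
}

// A factory wrapping a ready-made operator; delegating to it is an assumed design choice.
final class WrappingFactory implements OperatorFactorySketch {
    private static final long serialVersionUID = 1L;
    private final OperatorSketch operator;

    WrappingFactory(OperatorSketch operator) {
        this.operator = operator;
    }

    @Override
    public Attributes getOperatorAttributes() {
        return operator.getOperatorAttributes();
    }
}

// An operator written before getOperatorAttributes() existed still compiles unchanged.
final class LegacyOperator implements OperatorSketch {
    private static final long serialVersionUID = 1L;
}

class DefaultAttributesDemo {
    public static void main(String[] args) {
        OperatorFactorySketch factory = new WrappingFactory(new LegacyOperator());
        System.out.println(factory.getOperatorAttributes().outputOnEOF); // false
    }
}
```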

...

  • Operator chaining is still performed as usual. After chaining, the results produced by the operator's chain are set to blocking.
  • The operator is executed in batch mode.

    More specifically, checkpointing is disabled while these operators are running, so that they can perform operations that are incompatible with checkpoints (e.g. sorting inputs). After a failover, these operators re-read their data from the upstream blocking edges or sources.
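The two bullets above can be modeled as a planning step that runs after chaining. The sketch is hypothetical (EdgeType, ChainedOperator and ChainPlanner are not Flink classes) and assumes that a single operator with isOutputOnEOF = true is enough to run its whole chain in batch mode.

```java
import java.util.List;

// Hypothetical edge types; Flink's real result partition types are richer.
enum EdgeType { PIPELINED, BLOCKING }

// Hypothetical description of one operator inside an already-formed chain.
final class ChainedOperator {
    final String name;
    final boolean outputOnEOF;

    ChainedOperator(String name, boolean outputOnEOF) {
        this.name = name;
        this.outputOnEOF = outputOnEOF;
    }
}

// Hypothetical planning step that runs after operator chaining has been done.
final class ChainPlanner {
    // Assumption: one output-on-EOF operator puts the whole chain in batch mode.
    static boolean runsInBatchMode(List<ChainedOperator> chain) {
        return chain.stream().anyMatch(op -> op.outputOnEOF);
    }

    // Batch-mode chains write their results to blocking edges.
    static EdgeType outputEdgeType(List<ChainedOperator> chain) {
        return runsInBatchMode(chain) ? EdgeType.BLOCKING : EdgeType.PIPELINED;
    }

    // Checkpointing is off while a batch-mode chain runs; recovery re-reads inputs instead.
    static boolean checkpointingEnabledWhileRunning(List<ChainedOperator> chain) {
        return !runsInBatchMode(chain);
    }

    public static void main(String[] args) {
        List<ChainedOperator> streaming = List.of(new ChainedOperator("map", false));
        List<ChainedOperator> batch =
                List.of(new ChainedOperator("map", false), new ChainedOperator("aggregate", true));
        System.out.println(outputEdgeType(streaming)); // PIPELINED
        System.out.println(outputEdgeType(batch));     // BLOCKING
    }
}
```

Because the decision is made after chaining, the chaining logic itself does not need to know about isOutputOnEOF.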


4) A blocking input edge with pending records is treated the same as a source with isBacklog=true when an operator determines the RecordAttributes it sends to downstream nodes.

This is needed for this FLIP to work with FLIP-327. More specifically, once both FLIP-327 and FLIP-331 are accepted, we need a way to determine the backlog status of an input whose edge type is blocking.
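Rule 4) can be written as a per-input predicate. In this hypothetical sketch (InputKind, InputState and BacklogRule are illustrative names), sources and pipelined edges keep whatever backlog status they report under FLIP-327, while a blocking edge counts as backlog exactly while it still has records to deliver. The operator-level combination, reporting backlog if any input is backlog, is an assumption made for illustration.

```java
import java.util.List;

// Hypothetical kinds of operator inputs.
enum InputKind { SOURCE, PIPELINED_EDGE, BLOCKING_EDGE }

// Hypothetical snapshot of one input of an operator.
final class InputState {
    final InputKind kind;
    // SOURCE: the source's isBacklog; PIPELINED_EDGE: the last RecordAttributes received.
    final boolean reportedBacklog;
    // BLOCKING_EDGE: whether records remain to be read from the edge.
    final boolean hasPendingRecords;

    InputState(InputKind kind, boolean reportedBacklog, boolean hasPendingRecords) {
        this.kind = kind;
        this.reportedBacklog = reportedBacklog;
        this.hasPendingRecords = hasPendingRecords;
    }
}

final class BacklogRule {
    // Rule 4): a blocking edge with pending records behaves like a source with isBacklog=true.
    static boolean isBacklog(InputState input) {
        switch (input.kind) {
            case BLOCKING_EDGE:
                return input.hasPendingRecords;
            default:
                return input.reportedBacklog;
        }
    }

    // Assumption for illustration: the operator is backlog if any of its inputs is.
    static boolean operatorIsBacklog(List<InputState> inputs) {
        return inputs.stream().anyMatch(BacklogRule::isBacklog);
    }

    public static void main(String[] args) {
        InputState source = new InputState(InputKind.SOURCE, false, false);
        InputState pending = new InputState(InputKind.BLOCKING_EDGE, false, true);
        InputState drained = new InputState(InputKind.BLOCKING_EDGE, false, false);
        System.out.println(operatorIsBacklog(List.of(source, pending))); // true
        System.out.println(operatorIsBacklog(List.of(source, drained))); // false
    }
}
```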

...

5) When DataStream#coGroup is invoked with GlobalWindows and EndOfStreamTrigger as the window assigner and trigger, Flink should generate an operator with isOutputOnEOF = true.
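The observable contract in 5) is that nothing is emitted until both inputs have ended, after which each key's groups are emitted exactly once. The hypothetical model below (EndOfInputCoGroup is not a Flink class) buffers groups in an in-memory map purely to show that contract; it does not describe how the generated operator is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a keyed coGroup whose only trigger is end of input.
final class EndOfInputCoGroup {
    // Per key: index 0 holds left-input values, index 1 holds right-input values.
    private final Map<String, List<List<Integer>>> groups = new TreeMap<>();
    private final List<String> output = new ArrayList<>();

    private List<List<Integer>> groupsFor(String key) {
        List<List<Integer>> sides = groups.get(key);
        if (sides == null) {
            sides = new ArrayList<>();
            sides.add(new ArrayList<>()); // left input
            sides.add(new ArrayList<>()); // right input
            groups.put(key, sides);
        }
        return sides;
    }

    void processLeft(String key, int value) {
        groupsFor(key).get(0).add(value); // buffered; nothing is emitted yet
    }

    void processRight(String key, int value) {
        groupsFor(key).get(1).add(value);
    }

    // Called once both inputs have ended: each key's groups are emitted exactly once.
    void endOfInput() {
        for (Map.Entry<String, List<List<Integer>>> e : groups.entrySet()) {
            output.add(e.getKey() + ": left=" + e.getValue().get(0) + " right=" + e.getValue().get(1));
        }
        groups.clear();
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        EndOfInputCoGroup op = new EndOfInputCoGroup();
        op.processLeft("a", 1);
        op.processLeft("b", 2);
        op.processRight("a", 10);
        op.processLeft("a", 3);
        System.out.println(op.output()); // []
        op.endOfInput();
        System.out.println(op.output()); // [a: left=[1, 3] right=[10], b: left=[2] right=[]]
    }
}
```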

...

6) When DataStream#aggregate is invoked with GlobalWindows and EndOfStreamTrigger as the window assigner and trigger, Flink should generate an operator with isOutputOnEOF = true.

...

More specifically, this operator sorts the input before aggregation and avoids invoking window actions, similar to 5).
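To see why sorting lets the operator avoid per-window state and window actions, consider a sum aggregation: once the input is sorted by key, all records of a key are adjacent, so a single running accumulator suffices and each key's result can be emitted as soon as the next key begins. The sketch below is a simplified, in-memory illustration assuming a sum over String keys; SortedAggregation is a hypothetical name, and a real batch operator would typically need an external sort that can spill to disk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of sort-based aggregation over a bounded, fully available input.
final class SortedAggregation {
    // Sums values per key. Sorting first makes records with equal keys adjacent,
    // so only one running accumulator is needed at any time.
    static List<Map.Entry<String, Long>> sumByKey(List<Map.Entry<String, Long>> input) {
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(input);
        sorted.sort(Map.Entry.comparingByKey());

        List<Map.Entry<String, Long>> result = new ArrayList<>();
        String currentKey = null;
        long sum = 0;
        for (Map.Entry<String, Long> record : sorted) {
            if (currentKey != null && !currentKey.equals(record.getKey())) {
                result.add(Map.entry(currentKey, sum)); // previous key is complete: emit it
                sum = 0;
            }
            currentKey = record.getKey();
            sum += record.getValue();
        }
        if (currentKey != null) {
            result.add(Map.entry(currentKey, sum)); // emit the last key
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> input =
                List.of(Map.entry("b", 2L), Map.entry("a", 1L), Map.entry("b", 3L), Map.entry("a", 4L));
        System.out.println(sumByKey(input)); // [a=5, b=5]
    }
}
```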

7) Any existing operator can redefine its operator attributes (e.g. set isOutputOnEOF = true) by overriding getOperatorAttributes(), in order to use the new features mentioned in 3).
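For item 7), opting in amounts to overriding the default method from 3). The hypothetical sketch below (Attrs, SketchOperator, FullSortOperator and PassThroughOperator are illustrative, not Flink classes) shows an operator that already emits only after its input ends declaring that fact, while an operator that emits per record keeps the inherited default.

```java
// Hypothetical stand-in for OperatorAttributes with a single flag.
final class Attrs {
    final boolean outputOnEOF;

    Attrs(boolean outputOnEOF) {
        this.outputOnEOF = outputOnEOF;
    }
}

// Hypothetical analogue of StreamOperator carrying the default method added in 3).
interface SketchOperator {
    default Attrs getOperatorAttributes() {
        return new Attrs(false);
    }
}

// An existing operator that already emits only after its input ends (e.g. a full sort)
// opts in by overriding the default; nothing else in the operator has to change.
class FullSortOperator implements SketchOperator {
    @Override
    public Attrs getOperatorAttributes() {
        return new Attrs(true);
    }
}

// An operator that emits per record keeps the inherited default.
class PassThroughOperator implements SketchOperator {}

class OverrideDemo {
    public static void main(String[] args) {
        System.out.println(new FullSortOperator().getOperatorAttributes().outputOnEOF);    // true
        System.out.println(new PassThroughOperator().getOperatorAttributes().outputOnEOF); // false
    }
}
```

Declaring isOutputOnEOF = true is only appropriate for operators that genuinely emit nothing before the end of input, since the runtime may then execute them in batch mode behind blocking edges.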

Analysis of APIs affected by this FLIP

...