...

3) Improve usability. For use cases that need to invoke DataStream APIs (e.g. KeyedStream#window) with a window assigner that covers all input data, users can use the off-the-shelf GlobalWindows with the new EndOfStreamTrigger provided in this FLIP (via GlobalWindows#createWithEndOfStreamTrigger), instead of writing tens of lines of code to define such a WindowAssigner subclass. Existing operators can also redefine their operator attributes to achieve this objective.


Public Interfaces

1) Add the EndOfStreamTrigger and GlobalWindows#createWithEndOfStreamTrigger, which allow users of the DataStream API to create GlobalWindows with an EndOfStreamTrigger so that the computation (e.g. co-group, aggregate) emits data only after end-of-input.

Code Block
languagejava

@PublicEvolving
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
    ...
   public static GlobalWindows create() {
        return new GlobalWindows(new NeverTrigger());
    }

    public static GlobalWindows createWithEndOfStreamTrigger() {
        return new GlobalWindows(new EndOfStreamTrigger());
    }
}
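
The body of EndOfStreamTrigger is not spelled out above. As a rough, non-authoritative sketch (the actual implementation may differ), such a trigger can rely on the fact that Flink advances the watermark to Long.MAX_VALUE at end of input, which equals GlobalWindow#maxTimestamp():

Code Block
languagejava

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Sketch only: fires once per key/window when the watermark reaches
// GlobalWindow#maxTimestamp() (i.e. Long.MAX_VALUE, reached at end of input).
public class EndOfStreamTrigger extends Trigger<Object, GlobalWindow> {

    @Override
    public TriggerResult onElement(
            Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
        // Registering the same timestamp repeatedly is effectively a no-op after the
        // first registration, so this is safe to do for every element.
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}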

...

3) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.

Code Block
languagejava

public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...
    @Experimental
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
 

public interface StreamOperatorFactory<OUT> extends Serializable {
    ...
    @Experimental
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
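
For illustration, the sketch below shows how a custom one-input operator that only emits results at end-of-input could override this method. The operator itself is hypothetical, and the sketch assumes that the OperatorAttributesBuilder introduced by this FLIP provides a setOutputOnEOF(boolean) setter for the isOutputOnEOF attribute:

Code Block
languagejava

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
// Imports for OperatorAttributes/OperatorAttributesBuilder omitted; their package is
// defined by this FLIP.

// Hypothetical operator used only to illustrate the new API: it buffers its input and
// emits a single result when the input ends (state handling omitted for brevity).
public class MyEndOfInputCountOperator extends AbstractStreamOperator<Long>
        implements OneInputStreamOperator<String, Long>, BoundedOneInput {

    private final List<String> buffer = new ArrayList<>();

    @Override
    public void processElement(StreamRecord<String> element) {
        // No output here: results are only produced after end-of-input.
        buffer.add(element.getValue());
    }

    @Override
    public void endInput() {
        // All inputs have reached EOF; emit the final result.
        output.collect(new StreamRecord<>((long) buffer.size()));
    }

    @Override
    public OperatorAttributes getOperatorAttributes() {
        // Assumed setter name; declares that this operator only emits records after EOF.
        return new OperatorAttributesBuilder().setOutputOnEOF(true).build();
    }
}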

...

  • Operator chaining can still be performed. After chaining, the output edges of the resulting operator chain will be set to blocking.
  • This operator will be executed in batch mode.

    More specifically, checkpointing is disabled while these operators are running, so that they can perform operations that are not compatible with checkpoints (e.g. sorting inputs). After a failover, these operators should re-read the data from the upstream blocking edges or sources.


4) A blocking input edge with pending records is treated the same as a source with isBacklog=true when an operator determines its RecordAttributes for downstream nodes.

This is needed in order for this FLIP to work with FLIP-327. More specifically, once both FLIP-327 and FLIP-331 are accepted, we need a way to determine the backlog status for inputs with a blocking edge type.



5) When DataStream#coGroup is invoked with GlobalWindows and EndOfStreamTrigger as the window assigner and trigger, Flink should generate an operator with isOutputOnEOF = true.

In addition, after FLIP-327 is accepted, if this API is invoked without any explicitly assigned trigger or evictor, the generated operator should also have isInternalSorterSupported = true, so that it achieves higher throughput by using the optimizations currently done in batch mode.
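
For illustration, a minimal usage sketch of this co-group case is shown below; the input streams and the CoGroupFunction are made up for this example, and the relevant part is the window() call:

Code Block
languagejava

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.util.Collector;

public class CoGroupEndOfStreamExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> first = env.fromElements("a", "b", "c");
        DataStream<String> second = env.fromElements("b", "c", "d");

        first.coGroup(second)
                .where(value -> value)
                .equalTo(value -> value)
                // Emit co-grouped results only after end-of-input; Flink can set
                // isOutputOnEOF = true for the generated operator.
                .window(GlobalWindows.createWithEndOfStreamTrigger())
                .apply(new CoGroupFunction<String, String, String>() {
                    @Override
                    public void coGroup(
                            Iterable<String> left, Iterable<String> right, Collector<String> out) {
                        out.collect(left + " <-> " + right);
                    }
                })
                .print();

        env.execute("co-group at end of input");
    }
}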

The following optimizations will be used to achieve higher throughput than the existing DataStream#coGroup in both stream and batch mode:

  • It will instantiate two internal sorters to sort records from its two inputs separately, and then pull the sorted records from these two sorters. This can be done without wrapping input records with TaggedUnion<...>. In comparison, the existing DataStream#coGroup needs to wrap input records with TaggedUnion<...> before sorting them using one external sorter, which introduces higher overhead.
  • It will not invoke WindowAssigner#assignWindows or triggerContext#onElement for input records. In comparison, the existing WindowOperator#processElement invokes these methods for every record.

6) When DataStream#aggregate is invoked with GlobalWindows and EndOfStreamTrigger as the window assigner and trigger, Flink should generate an operator with isOutputOnEOF = true.

In addition, after FLIP-327 is accepted, if this API is invoked without any explicitly assigned trigger or evictor, the generated operator should also have isInternalSorterSupported = true, so that it achieves higher throughput by using the optimizations currently done in batch mode.

More specifically, this operator will sort the input before aggregation and avoid invoking window actions, similar to 5).
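
Analogously, a minimal usage sketch of this aggregate case (with a made-up input stream and a simple counting AggregateFunction) could look as follows:

Code Block
languagejava

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;

public class AggregateEndOfStreamExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> words = env.fromElements("a", "b", "a", "c", "b", "a");

        words.keyBy(word -> word)
                // Per-key counts are emitted only once the input has ended.
                .window(GlobalWindows.createWithEndOfStreamTrigger())
                .aggregate(new AggregateFunction<String, Long, Long>() {
                    @Override
                    public Long createAccumulator() {
                        return 0L;
                    }

                    @Override
                    public Long add(String value, Long accumulator) {
                        return accumulator + 1;
                    }

                    @Override
                    public Long getResult(Long accumulator) {
                        return accumulator;
                    }

                    @Override
                    public Long merge(Long a, Long b) {
                        return a + b;
                    }
                })
                .print();

        env.execute("aggregate at end of input");
    }
}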

7) All existing operators can redefine their operator attributes (e.g. isOutputOnEOF = true) to use the new features mentioned in 3).

Analysis of APIs affected by this FLIP

...

More specifically, the following DataStream APIs can benefit from using GlobalWindows with the EndOfStreamTrigger:

  • DataStream#windowAll
  • KeyedStream#window
  • CoGroupedStreams#Where#EqualTo#window
  • JoinedStreams#Where#EqualTo#window

...

The changes made in this FLIP are backward compatible. No deprecation or migration plan is needed.

Future Work

  • It would be useful to add an ExecutionState (e.g. Finishing) to specify whether a task has reached EOF for all its inputs. This allows the JM to deploy its downstream tasks and possibly apply hybrid shuffle to increase job throughput.

...

  • Running the upstream operators of an EOF operator in batch mode will also increase the job throughput, and hybrid shuffle mode can also be used in the batch-mode part to further improve performance when there are sufficient slot resources.
  • Developing frequently used EOF operators, such as aggregate/CoGroup/Join/..., can achieve higher throughput by using the optimizations currently done in batch mode. We can instantiate an internal sorter in the operator, and Flink will not invoke WindowAssigner#assignWindows or triggerContext#onElement for each input record, which reduces overhead.