...

[This FLIP proposal is a joint work of Ran Jinhao, Dong Lin and Xuannan Su]



Motivation

As shown in the figure below, a job might pre-process records from a bounded source (inputA) using an operator (operatorA) that only emits results after its input has ended; such an operator is known as a dam operator. Then operatorB needs to join the records emitted by operatorA with records from an unbounded source and emit results with low processing latency in real time.

...

In addition, this FLIP adds EndOfStreamTrigger to GlobalWindows. With GlobalWindows and EndOfStreamTrigger, DataStream API users can specify that a window ends only when its inputs end. DataStream programs (e.g. coGroup and aggregate) can take advantage of this information to significantly increase throughput and reduce resource utilization.


Objectives

The APIs provided in this FLIP achieve the following objectives / benefits:

...

3) Introduce operator attributes that let Flink know whether an operator only outputs results after its input has ended. All operators defined in Flink should set these attributes accordingly to achieve the benefits above, and user-defined operators should be able to set them as well so that they can benefit too.


Public Interfaces

1) Add a createWithEndOfStreamTrigger method to GlobalWindows, which allows DataStream API users to create GlobalWindows with a window trigger that fires only when the input ends.
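To make the intended trigger semantics concrete, here is a minimal, self-contained Java sketch that does not depend on Flink. SimpleTriggerResult, SimpleTrigger, EndOfStreamTriggerSketch, GlobalWindowSketch and EndOfStreamTriggerDemo are hypothetical stand-ins invented for illustration; they are not Flink's GlobalWindows, Trigger or EndOfStreamTrigger classes. The trigger never fires for individual elements and fires exactly once when the input ends, so the single global window emits all buffered records at that point.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a trigger's decision (not Flink's TriggerResult).
enum SimpleTriggerResult { CONTINUE, FIRE }

// Hypothetical, reduced trigger contract: one callback per element and one at end of input.
interface SimpleTrigger<T> {
    SimpleTriggerResult onElement(T element);

    SimpleTriggerResult onEndOfInput();
}

// Never fires for individual elements; fires once when the input ends.
final class EndOfStreamTriggerSketch<T> implements SimpleTrigger<T> {
    @Override
    public SimpleTriggerResult onElement(T element) {
        return SimpleTriggerResult.CONTINUE;
    }

    @Override
    public SimpleTriggerResult onEndOfInput() {
        return SimpleTriggerResult.FIRE;
    }
}

// A single global window: every element goes into the same buffer, and the
// buffer contents are emitted whenever the trigger returns FIRE.
final class GlobalWindowSketch<T> {
    private final SimpleTrigger<T> trigger;
    private final List<T> buffer = new ArrayList<>();
    private final List<List<T>> emitted = new ArrayList<>();

    GlobalWindowSketch(SimpleTrigger<T> trigger) {
        this.trigger = trigger;
    }

    void processElement(T element) {
        buffer.add(element);
        if (trigger.onElement(element) == SimpleTriggerResult.FIRE) {
            emitted.add(new ArrayList<>(buffer));
        }
    }

    void endInput() {
        if (trigger.onEndOfInput() == SimpleTriggerResult.FIRE) {
            emitted.add(new ArrayList<>(buffer));
        }
    }

    List<List<T>> emitted() {
        return emitted;
    }
}

final class EndOfStreamTriggerDemo {
    public static void main(String[] args) {
        GlobalWindowSketch<Integer> window = new GlobalWindowSketch<>(new EndOfStreamTriggerSketch<>());
        for (int i = 1; i <= 3; i++) {
            window.processElement(i);
        }
        System.out.println(window.emitted()); // prints []
        window.endInput();
        System.out.println(window.emitted()); // prints [[1, 2, 3]]
    }
}
```

Because nothing can be emitted before the end of input, an operator using this combination behaves as a dam operator, which is the property the operator attributes in this FLIP are meant to expose.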

...

```java
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...

    /**
     * Get the {@link OperatorAttributes} of the operator that the Flink runtime can use to optimize
     * the job performance.
     *
     * @return OperatorAttributes of the operator.
     */
    @Experimental
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

public interface StreamOperatorFactory<OUT> extends Serializable {
    ...

    /**
     * Get the {@link OperatorAttributes} of the operator created by the factory. Flink runtime can
     * use it to optimize the job performance.
     *
     * @return OperatorAttributes of the operator created by the factory.
     */
    @Experimental
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
```
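The following self-contained sketch shows how a user-defined operator could declare itself a dam operator by overriding getOperatorAttributes(). OperatorAttributes and OperatorAttributesBuilder here are simplified stand-ins for the Flink classes of the same name, and the setter setOutputOnlyAfterEndOfStream(boolean) is an assumption about the builder API; SimpleStreamOperator, DoublingOperator and SumAtEndOperator are hypothetical. Operators that do not override the method keep the default attributes, so isOutputOnlyAfterEndOfStream() returns false for them.

```java
import java.util.List;

// Simplified stand-in for OperatorAttributes (not the Flink class).
final class OperatorAttributes {
    private final boolean outputOnlyAfterEndOfStream;

    OperatorAttributes(boolean outputOnlyAfterEndOfStream) {
        this.outputOnlyAfterEndOfStream = outputOnlyAfterEndOfStream;
    }

    boolean isOutputOnlyAfterEndOfStream() {
        return outputOnlyAfterEndOfStream;
    }
}

// Simplified stand-in for OperatorAttributesBuilder; the setter name is assumed.
final class OperatorAttributesBuilder {
    // Defaults to false so that operators that do not override
    // getOperatorAttributes() keep their existing behavior.
    private boolean outputOnlyAfterEndOfStream = false;

    OperatorAttributesBuilder setOutputOnlyAfterEndOfStream(boolean value) {
        this.outputOnlyAfterEndOfStream = value;
        return this;
    }

    OperatorAttributes build() {
        return new OperatorAttributes(outputOnlyAfterEndOfStream);
    }
}

// Hypothetical, heavily reduced operator interface with the same default method as above.
interface SimpleStreamOperator<IN, OUT> {
    void processElement(IN value, List<OUT> out);

    // Called once when the input has ended.
    default void endInput(List<OUT> out) {}

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

// Emits one result per record, so it keeps the default attributes.
final class DoublingOperator implements SimpleStreamOperator<Long, Long> {
    @Override
    public void processElement(Long value, List<Long> out) {
        out.add(value * 2);
    }
}

// A dam operator: it only emits the sum once its input has ended,
// so it declares isOutputOnlyAfterEndOfStream = true.
final class SumAtEndOperator implements SimpleStreamOperator<Long, Long> {
    private long sum;

    @Override
    public void processElement(Long value, List<Long> out) {
        sum += value; // accumulate only; nothing is emitted yet
    }

    @Override
    public void endInput(List<Long> out) {
        out.add(sum);
    }

    @Override
    public OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().setOutputOnlyAfterEndOfStream(true).build();
    }
}
```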


Proposed Changes

1) Add APIs to PhysicalTransformation to get the attributes of the corresponding operator.
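As a rough illustration of Proposed Change 1, the sketch below models a transformation that does not compute attributes itself but forwards whatever the wrapped operator factory reports. All types (Attributes, OperatorFactorySketch, PhysicalTransformationSketch, TransformationScan) are hypothetical and much smaller than Flink's PhysicalTransformation and StreamOperatorFactory; damTransformations only shows how a caller could query the attributes, not how the Flink runtime actually uses them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for OperatorAttributes.
final class Attributes {
    final boolean outputOnlyAfterEndOfStream;

    Attributes(boolean outputOnlyAfterEndOfStream) {
        this.outputOnlyAfterEndOfStream = outputOnlyAfterEndOfStream;
    }
}

// Hypothetical stand-in for StreamOperatorFactory, keeping only the default method.
interface OperatorFactorySketch {
    default Attributes getOperatorAttributes() {
        return new Attributes(false);
    }
}

// Hypothetical stand-in for PhysicalTransformation: it forwards the attributes
// of the operator created by its factory instead of computing its own.
final class PhysicalTransformationSketch {
    private final String name;
    private final OperatorFactorySketch factory;

    PhysicalTransformationSketch(String name, OperatorFactorySketch factory) {
        this.name = name;
        this.factory = factory;
    }

    String getName() {
        return name;
    }

    Attributes getOperatorAttributes() {
        return factory.getOperatorAttributes();
    }
}

final class TransformationScan {
    // Hypothetical caller: collects the names of transformations whose operators
    // only emit results after the end of their input.
    static List<String> damTransformations(List<PhysicalTransformationSketch> transformations) {
        List<String> names = new ArrayList<>();
        for (PhysicalTransformationSketch t : transformations) {
            if (t.getOperatorAttributes().outputOnlyAfterEndOfStream) {
                names.add(t.getName());
            }
        }
        return names;
    }
}
```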

...

5) When a DataStream API such as DataStream#coGroup is invoked with GlobalWindows as the window assigner and EndOfStreamTrigger as the trigger, WindowOperator checks whether the window assigner is an instance of GlobalWindows and the trigger is an instance of EndOfStreamTrigger. If both hold, the operator returns OperatorAttributes with isOutputOnlyAfterEndOfStream = true, so that it can achieve higher throughput by using the optimizations applied in batch mode.
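The check in Proposed Change 5 boils down to two instanceof tests. The sketch below uses hypothetical stub types in place of Flink's WindowAssigner and Trigger hierarchies (GlobalWindowsStub, TumblingWindowsStub, EndOfStreamTriggerStub, CountTriggerStub) and a reduced WindowOperatorSketch; it only illustrates the decision rule, not WindowOperator's actual code.

```java
// Hypothetical stand-ins for Flink's WindowAssigner and Trigger hierarchies.
interface WindowAssignerStub {}

final class GlobalWindowsStub implements WindowAssignerStub {}

final class TumblingWindowsStub implements WindowAssignerStub {}

interface TriggerStub {}

final class EndOfStreamTriggerStub implements TriggerStub {}

final class CountTriggerStub implements TriggerStub {}

// Reduced model of WindowOperator that only keeps what the check needs.
final class WindowOperatorSketch {
    private final WindowAssignerStub assigner;
    private final TriggerStub trigger;

    WindowOperatorSketch(WindowAssignerStub assigner, TriggerStub trigger) {
        this.assigner = assigner;
        this.trigger = trigger;
    }

    // True only for GlobalWindows combined with EndOfStreamTrigger, i.e. when
    // the window can fire solely at the end of the input.
    boolean isOutputOnlyAfterEndOfStream() {
        return assigner instanceof GlobalWindowsStub
                && trigger instanceof EndOfStreamTriggerStub;
    }
}
```

Both conditions are required: GlobalWindows with any other trigger (for example a count-based one) may fire before the input ends, so the operator must not claim the attribute in that case.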

Analysis of APIs affected by this FLIP

This FLIP can potentially benefit all DataStream APIs that take a WindowAssigner as a parameter.

...

  • DataStream#windowAll
  • KeyedStream#window
  • CoGroupedStreams#Where#EqualTo#window
  • JoinedStreams#Where#EqualTo#window

Compatibility, Deprecation, and Migration Plan

The changes made in this FLIP are backward compatible. No deprecation or migration plan is needed.

...

As mentioned in Proposed Change 4), we will go through all existing operators defined in Flink and set isOutputOnlyAfterEndOfStream accordingly. The behavior of user-defined operators stays the same as before. If a user-defined operator only outputs results at the end of its input, it must set isOutputOnlyAfterEndOfStream to true to enable the optimization.

Future Work

  • It would be useful to add an ExecutionState (e.g. Finishing) to specify whether the task has reached the end of all its inputs. This would allow the JobManager (JM) to deploy its downstream tasks and possibly apply hybrid shuffle to increase job throughput.
  • Running the upstream operators of the dam operator in batch mode would also increase job throughput, and hybrid shuffle could be used in the batch-mode part of the job to further improve performance when there are sufficient slot resources.
  • Frequently used dam operators, such as aggregate/CoGroup/Join/..., could achieve higher throughput by reusing the optimizations currently applied in batch mode. For example, such an operator could instantiate an internal sorter so that it does not have to invoke WindowAssigner#assignWindows or triggerContext#onElement for each input record.
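The sorter idea can be sketched as a keyed sum that only buffers records while the input is running and, at end of input, sorts the buffer by key once and aggregates each run of equal keys. SortBasedKeyedSum is a hypothetical class, and an in-memory List.sort stands in for a real sorter that could spill to disk; this is not the planned implementation, only an illustration of avoiding per-record window assignment and trigger calls.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sort-based keyed sum; an in-memory sort stands in for a real
// (possibly spilling) sorter.
final class SortBasedKeyedSum {
    private final List<Map.Entry<String, Long>> buffer = new ArrayList<>();

    // While the input is running, records are only appended: no window is
    // assigned and no trigger is consulted per record.
    void processElement(String key, long value) {
        buffer.add(new AbstractMap.SimpleEntry<String, Long>(key, value));
    }

    // At end of input: sort once by key, then sum each run of equal keys.
    List<Map.Entry<String, Long>> endInput() {
        buffer.sort((a, b) -> a.getKey().compareTo(b.getKey()));
        List<Map.Entry<String, Long>> results = new ArrayList<>();
        String currentKey = null;
        long sum = 0L;
        for (Map.Entry<String, Long> record : buffer) {
            if (currentKey != null && !currentKey.equals(record.getKey())) {
                // The run for currentKey is complete; emit its total.
                results.add(new AbstractMap.SimpleEntry<String, Long>(currentKey, sum));
                sum = 0L;
            }
            currentKey = record.getKey();
            sum += record.getValue();
        }
        if (currentKey != null) {
            results.add(new AbstractMap.SimpleEntry<String, Long>(currentKey, sum));
        }
        buffer.clear();
        return results;
    }
}
```

All aggregation work happens in one pass over the sorted buffer, and the results come out in key order.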

...