...

Proposed Changes

1) Add an API to PhysicalTransformation to get the corresponding operator attribute.

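```java
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.dag.Transformation;

@Internal
public abstract class PhysicalTransformation<T> extends Transformation<T> {
    // ... existing constructor and members omitted ...

    // True if the operator emits output only after all of its inputs reach end-of-stream.
    public boolean isOutputOnlyAfterEndOfStream() {
        return false;
    }
}
```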


2) Update the following Transformation subclasses to override the newly added method using the OperatorAttributes obtained from the underlying operator:

  • OneInputTransformation
  • TwoInputTransformation
  • AbstractMultipleInputTransformation
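The override in step 2) can be sketched as below. This is a minimal, self-contained sketch, not the actual implementation: OperatorAttributes, StreamOperator, PhysicalTransformation and OneInputTransformation are simplified stand-ins named after the classes in this proposal, and the operator is handed to the transformation directly for brevity (the real transformation may obtain it through an operator factory).

```java
// Simplified stand-ins for the classes named in this proposal; not Flink's real code.
final class OperatorAttributes {
    private final boolean outputOnlyAfterEndOfStream;

    OperatorAttributes(boolean outputOnlyAfterEndOfStream) {
        this.outputOnlyAfterEndOfStream = outputOnlyAfterEndOfStream;
    }

    boolean isOutputOnlyAfterEndOfStream() {
        return outputOnlyAfterEndOfStream;
    }
}

interface StreamOperator {
    // Default: no special attributes, i.e. the operator may emit records at any time.
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributes(false);
    }
}

abstract class PhysicalTransformation {
    public boolean isOutputOnlyAfterEndOfStream() {
        return false;
    }
}

class OneInputTransformation extends PhysicalTransformation {
    private final StreamOperator operator;

    OneInputTransformation(StreamOperator operator) {
        this.operator = operator;
    }

    @Override
    public boolean isOutputOnlyAfterEndOfStream() {
        // Delegate to the attributes declared by the underlying operator.
        return operator.getOperatorAttributes().isOutputOnlyAfterEndOfStream();
    }
}
```

TwoInputTransformation and AbstractMultipleInputTransformation would follow the same delegation pattern; only the transformation, not the JobGraph compiler, needs to know how to reach the operator.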


3) Update the JM to make use of the following operator attribute when compiling the Transformation graph into the JobGraph.

If a Transformation has isOutputOnlyAfterEndOfStream == true:

  • Operator chaining can still be applied. After chaining, the results produced by its operator chain will be set to blocking.
  • This operator will be executed in batch mode.

    More specifically, checkpointing is disabled while these operators are running, so that they can perform operations that are not compatible with checkpoints (e.g. sorting inputs). After a failover, these operators should re-read their data from the upstream blocking edges or sources.
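The compilation rule in step 3) can be sketched as follows. This is a hypothetical, simplified model (ChainedOperator, OperatorChain and ResultPartitionType here are illustrative stand-ins, not the JobGraph generator's real types): chaining has already happened, and a chain that contains an operator with isOutputOnlyAfterEndOfStream == true gets blocking results and runs without checkpointing.

```java
import java.util.List;

// Hypothetical stand-ins; not Flink's JobGraph compilation code.
enum ResultPartitionType { PIPELINED, BLOCKING }

record ChainedOperator(String name, boolean outputOnlyAfterEndOfStream) {}

record OperatorChain(List<ChainedOperator> operators) {

    // Chaining is already done; we only inspect the operators of the resulting chain.
    boolean containsOutputOnlyAfterEndOfStream() {
        return operators.stream().anyMatch(ChainedOperator::outputOnlyAfterEndOfStream);
    }

    // The chain's results become blocking, so consumers start after the chain finishes.
    ResultPartitionType outputPartitionType() {
        return containsOutputOnlyAfterEndOfStream()
                ? ResultPartitionType.BLOCKING
                : ResultPartitionType.PIPELINED;
    }

    // Such a chain runs in batch mode: no checkpoints while it runs; after failover it
    // re-reads its input from upstream blocking edges or sources instead.
    boolean checkpointingEnabledWhileRunning() {
        return !containsOutputOnlyAfterEndOfStream();
    }
}
```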

...

4) Enumerate all existing operators defined in Flink and set isOutputOnlyAfterEndOfStream accordingly for those operators that may only output at the end of the input, to enable the optimization mentioned in 3):

  • WindowOperator: when used with GlobalWindows as the window assigner and EndOfStreamTrigger as the trigger
  • StreamSortOperator
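To illustrate why an operator like StreamSortOperator qualifies, here is a minimal sketch of a sort-at-end operator. SortAtEndOperator and its methods are hypothetical and much simpler than Flink's StreamSortOperator; the point is only that it buffers every record and emits nothing until end of input.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical operator in the spirit of StreamSortOperator; not Flink's implementation.
class SortAtEndOperator<T extends Comparable<T>> {
    private final List<T> buffer = new ArrayList<>();
    private final Consumer<T> output;

    SortAtEndOperator(Consumer<T> output) {
        this.output = output;
    }

    // Declared attribute: nothing is emitted before the input ends.
    boolean isOutputOnlyAfterEndOfStream() {
        return true;
    }

    // Records are only buffered here; no output is produced.
    void processElement(T record) {
        buffer.add(record);
    }

    // At end of input, sort the buffered records and emit them in order.
    void endInput() {
        Collections.sort(buffer);
        buffer.forEach(output);
        buffer.clear();
    }
}
```

Because a full sort needs all records before the first one can be emitted, such an operator gains nothing from pipelined output or checkpoints, which is what makes it a candidate for the batch-mode execution in 3).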


5) When a DataStream API such as DataStream#coGroup is invoked with GlobalWindows and EndOfStreamTrigger as the window assigner and trigger, the window operator will check whether the window assigner is an instance of GlobalWindows and the trigger is an instance of EndOfStreamTrigger. If so, the operator will report OperatorAttributes with isOutputOnlyAfterEndOfStream = true, so that it can achieve higher throughput by using the batch-mode optimizations.
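The check in step 5) can be sketched with a pair of instanceof tests. WindowAssigner, GlobalWindows, Trigger, EndOfStreamTrigger and WindowOperator below are simplified, hypothetical stand-ins; Flink's real WindowAssigner and Trigger are generic abstract classes with many more members.

```java
// Hypothetical, simplified stand-ins for the window classes named above.
abstract class WindowAssigner {}
final class GlobalWindows extends WindowAssigner {}

abstract class Trigger {}
final class EndOfStreamTrigger extends Trigger {}

class WindowOperator {
    private final WindowAssigner assigner;
    private final Trigger trigger;

    WindowOperator(WindowAssigner assigner, Trigger trigger) {
        this.assigner = assigner;
        this.trigger = trigger;
    }

    // Output happens only at end of input when every record lands in the single
    // global window and that window fires only once the stream has ended.
    boolean isOutputOnlyAfterEndOfStream() {
        return assigner instanceof GlobalWindows && trigger instanceof EndOfStreamTrigger;
    }
}
```

Both conditions are needed: GlobalWindows with another trigger may fire early, and EndOfStreamTrigger with another assigner still produces several windows whose firing is not tied to a single end-of-input event.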

...

As mentioned in Proposed Change 4), we will enumerate all the existing operators defined in Flink and set isOutputOnlyAfterEndOfStream accordingly. For user-defined operators, the behavior stays the same as before. If a user-defined operator only outputs at the end of its input, it must set isOutputOnlyAfterEndOfStream to true to enable the optimization.
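The opt-in for user-defined operators can be sketched as follows. OperatorAttributes and AbstractUserOperator are simplified stand-ins, not Flink's actual classes, and SumAtEndOperator and LegacyMapOperator are hypothetical user operators: an operator that overrides nothing keeps the default (false), while one that only emits at end of input declares the attribute explicitly.

```java
// Simplified stand-ins; not Flink's actual OperatorAttributes or operator base class.
final class OperatorAttributes {
    final boolean outputOnlyAfterEndOfStream;

    OperatorAttributes(boolean outputOnlyAfterEndOfStream) {
        this.outputOnlyAfterEndOfStream = outputOnlyAfterEndOfStream;
    }
}

abstract class AbstractUserOperator {
    // Existing user-defined operators inherit this default, so their behavior is unchanged.
    OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributes(false);
    }
}

// Hypothetical user-defined operator that emits a single total at end of input.
class SumAtEndOperator extends AbstractUserOperator {
    private long sum;

    void processElement(long value) {
        sum += value;
    }

    long endInput() {
        return sum;
    }

    // Opting in: the operator must declare the attribute to enable the optimization.
    @Override
    OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributes(true);
    }
}

// Hypothetical legacy operator that overrides nothing and keeps the default.
class LegacyMapOperator extends AbstractUserOperator {}
```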

...