...

Code Block
languagejava
package org.apache.flink.streaming.api.operators;
 
/** The builder class for {@link OperatorAttributes}. */
@Experimental
public class OperatorAttributesBuilder {
    @Nullable private Boolean outputOnEOF = null;
    @Nullable private Boolean outputOnCheckpoint = null;
 
    public OperatorAttributesBuilder() {...}
 
    public OperatorAttributesBuilder setOutputOnEOF(boolean outputOnEOF) {...}
 
    public OperatorAttributesBuilder setOutputOnCheckpoint(boolean outputOnCheckpoint) {...}
  
    /**
     * If any operator attribute is null, we will log it at DEBUG level and use the following
     * default values.
     * - outputOnEOF defaults to false
     * - outputOnCheckpoint defaults to false
     */
    public OperatorAttributes build() {...}
}
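
As a rough illustration of the defaulting behaviour described in the Javadoc of build(), the following is a minimal sketch, not the actual Flink implementation. The class names AttributesSketch and AttributesBuilderSketch and the helper orDefault are hypothetical stand-ins, and java.util.logging's FINE level is assumed as the closest analogue of DEBUG.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for the proposed OperatorAttributes class. */
final class AttributesSketch {
    private final boolean outputOnEOF;
    private final boolean outputOnCheckpoint;

    AttributesSketch(boolean outputOnEOF, boolean outputOnCheckpoint) {
        this.outputOnEOF = outputOnEOF;
        this.outputOnCheckpoint = outputOnCheckpoint;
    }

    boolean isOutputOnEOF() {
        return outputOnEOF;
    }

    boolean isOutputOnCheckpoint() {
        return outputOnCheckpoint;
    }
}

/** Hypothetical stand-in for the proposed OperatorAttributesBuilder class. */
final class AttributesBuilderSketch {
    private static final Logger LOG =
            Logger.getLogger(AttributesBuilderSketch.class.getName());

    // null means "not set by the operator author".
    private Boolean outputOnEOF = null;
    private Boolean outputOnCheckpoint = null;

    AttributesBuilderSketch setOutputOnEOF(boolean outputOnEOF) {
        this.outputOnEOF = outputOnEOF;
        return this;
    }

    AttributesBuilderSketch setOutputOnCheckpoint(boolean outputOnCheckpoint) {
        this.outputOnCheckpoint = outputOnCheckpoint;
        return this;
    }

    /** Unset attributes are logged and replaced by their default value (false). */
    AttributesSketch build() {
        return new AttributesSketch(
                orDefault("outputOnEOF", outputOnEOF, false),
                orDefault("outputOnCheckpoint", outputOnCheckpoint, false));
    }

    private static boolean orDefault(String name, Boolean value, boolean defaultValue) {
        if (value == null) {
            // Assumption: FINE plays the role of DEBUG in this sketch.
            LOG.log(Level.FINE, "{0} is not set, using default value {1}",
                    new Object[] {name, defaultValue});
            return defaultValue;
        }
        return value;
    }
}
```

With this sketch, new AttributesBuilderSketch().build() yields both attributes as false, while calling setOutputOnEOF(true) first changes only that attribute.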

...

Code Block
languagejava
package org.apache.flink.streaming.api.operators;
 
/**
 * OperatorAttributes element provides Job Manager with information that can be
 * used to optimize the job performance.
 */
@Experimental
public class OperatorAttributes {
    /**
     * Returns true iff the operator can only emit records after inputs have reached EOF.
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li> The results of this operator as well as its upstream operators have blocking partition type.
     *   <li> This operator as well as its upstream operators will be executed in batch mode.
     * </ul>
     */
    public boolean isOutputOnEOF() {...}
 
    /**
     * Returns true iff the operator can only emit records when checkpoint is triggered.
     */
    public boolean isOutputOnCheckpoint() {...}
}


3) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.

...

1) Add APIs to the Transformation class to get the corresponding operator attributes.

Code Block
languagejava
@Internal
public abstract class Transformation<T> {
    public boolean isOutputOnEOF() {
        return false;
    }
 
    public boolean isOutputOnCheckpoint() {
        return false;
    }
}


2) Update Transformation subclasses (e.g. OneInputTransformation and TwoInputTransformation) to override the newly added methods using the OperatorAttributes obtained from the underlying Operator.
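
One possible shape of such an override is sketched below. It is only an illustration under stated assumptions: OpAttrs, OperatorFactorySketch, TransformationSketch and OneInputTransformationSketch are hypothetical stand-ins for OperatorAttributes, StreamOperatorFactory, Transformation and OneInputTransformation, and the real classes carry far more state.

```java
/** Hypothetical stand-in for OperatorAttributes. */
final class OpAttrs {
    private final boolean outputOnEOF;
    private final boolean outputOnCheckpoint;

    OpAttrs(boolean outputOnEOF, boolean outputOnCheckpoint) {
        this.outputOnEOF = outputOnEOF;
        this.outputOnCheckpoint = outputOnCheckpoint;
    }

    boolean isOutputOnEOF() {
        return outputOnEOF;
    }

    boolean isOutputOnCheckpoint() {
        return outputOnCheckpoint;
    }
}

/** Hypothetical stand-in for an operator factory exposing getOperatorAttributes(). */
interface OperatorFactorySketch {
    // Assumption: operators that do not override this report both attributes as false.
    default OpAttrs getOperatorAttributes() {
        return new OpAttrs(false, false);
    }
}

/** Mirrors the proposed base class: both attributes default to false. */
abstract class TransformationSketch {
    public boolean isOutputOnEOF() {
        return false;
    }

    public boolean isOutputOnCheckpoint() {
        return false;
    }
}

/** Analogue of OneInputTransformation: delegates to the underlying operator. */
final class OneInputTransformationSketch extends TransformationSketch {
    private final OperatorFactorySketch operatorFactory;

    OneInputTransformationSketch(OperatorFactorySketch operatorFactory) {
        this.operatorFactory = operatorFactory;
    }

    @Override
    public boolean isOutputOnEOF() {
        return operatorFactory.getOperatorAttributes().isOutputOnEOF();
    }

    @Override
    public boolean isOutputOnCheckpoint() {
        return operatorFactory.getOperatorAttributes().isOutputOnCheckpoint();
    }
}
```

Transformations that do not wrap an operator would keep the base-class defaults in this sketch.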

...

3) Update JM to make use of the following operator attributes when compiling the Transformation graph into the JobGraph.

If a Transformation has isOutputOnEOF == true:

  • Operator chaining can still be performed. After that, the results of its operator chain as well as its upstream operators will be set to the blocking (by default) or hybrid shuffle partition type, which can be controlled by configuring ExecutionOptions.BATCH_SHUFFLE_MODE.
  • This operator as well as its upstream operators will be executed in batch mode (e.g. checkpointing is disabled when these operators are running).

If all Transformations have isOutputOnCheckpoint == false:

  • In FLIP-325, JM will not trigger an extra flush() before triggering a checkpoint.
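
The rules above can be pictured as a small graph traversal. The sketch below is a simplified illustration rather than the JobGraph compilation logic: NodeSketch and GraphCompilerSketch are hypothetical names, operator chaining and the choice between blocking and hybrid shuffle are ignored, and each transformation is modelled as a node that knows its direct inputs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified node of a transformation graph. */
final class NodeSketch {
    final String name;
    final boolean outputOnEOF;
    final boolean outputOnCheckpoint;
    final List<NodeSketch> inputs = new ArrayList<>();

    NodeSketch(String name, boolean outputOnEOF, boolean outputOnCheckpoint) {
        this.name = name;
        this.outputOnEOF = outputOnEOF;
        this.outputOnCheckpoint = outputOnCheckpoint;
    }

    /** Registers the direct upstream nodes and returns this node for chaining. */
    NodeSketch from(NodeSketch... upstream) {
        for (NodeSketch u : upstream) {
            inputs.add(u);
        }
        return this;
    }
}

final class GraphCompilerSketch {
    /**
     * Returns the names of the nodes that run in batch mode with blocking-style
     * results: every outputOnEOF node plus everything upstream of it.
     */
    static Set<String> batchModeNodes(List<NodeSketch> allNodes) {
        Set<String> batch = new HashSet<>();
        Deque<NodeSketch> pending = new ArrayDeque<>();
        for (NodeSketch node : allNodes) {
            if (node.outputOnEOF) {
                pending.push(node);
            }
        }
        while (!pending.isEmpty()) {
            NodeSketch node = pending.pop();
            // Visit each node once, then walk towards the sources.
            if (batch.add(node.name)) {
                for (NodeSketch input : node.inputs) {
                    pending.push(input);
                }
            }
        }
        return batch;
    }

    /** The extra flush() before a checkpoint can be skipped iff no node outputs on checkpoint. */
    static boolean canSkipFlushBeforeCheckpoint(List<NodeSketch> allNodes) {
        for (NodeSketch node : allNodes) {
            if (node.outputOnCheckpoint) {
                return false;
            }
        }
        return true;
    }
}
```

For a pipeline source -> agg -> sink where only agg has outputOnEOF == true, this sketch places source and agg in batch mode and leaves sink in streaming mode.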


4) A blocking input edge with pending records is treated the same as a source with isBacklog=true when an operator determines its RecordAttributes for downstream nodes.
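
A minimal sketch of this rule follows. InputSketch and BacklogSketch are hypothetical names, and the way multiple inputs are combined (downstream isBacklog is true if any input counts as backlog) is an assumption made for illustration, not something this section specifies.

```java
import java.util.List;

/** Hypothetical view of one operator input. */
final class InputSketch {
    private final boolean sourceBacklog;      // isBacklog=true reported from an upstream source
    private final boolean blockingEdge;       // input arrives over a blocking edge
    private final boolean hasPendingRecords;  // blocking edge still has unread records

    private InputSketch(boolean sourceBacklog, boolean blockingEdge, boolean hasPendingRecords) {
        this.sourceBacklog = sourceBacklog;
        this.blockingEdge = blockingEdge;
        this.hasPendingRecords = hasPendingRecords;
    }

    static InputSketch source(boolean isBacklog) {
        return new InputSketch(isBacklog, false, false);
    }

    static InputSketch blockingEdge(boolean hasPendingRecords) {
        return new InputSketch(false, true, hasPendingRecords);
    }

    /** A blocking edge with pending records counts exactly like isBacklog=true. */
    boolean countsAsBacklog() {
        return sourceBacklog || (blockingEdge && hasPendingRecords);
    }
}

final class BacklogSketch {
    /** Assumption: the value sent downstream is true if any input counts as backlog. */
    static boolean isBacklogForDownstream(List<InputSketch> inputs) {
        for (InputSketch input : inputs) {
            if (input.countsAsBacklog()) {
                return true;
            }
        }
        return false;
    }
}
```

Once the blocking edge has been fully consumed, the same input no longer counts as backlog in this sketch.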

...

This operator will use the following optimization to achieve much higher throughput than the existing DataStream#coGroup in both stream and batch mode:

...