...

```java
package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.PublicEvolving;

import javax.annotation.Nullable;

/** The builder class for {@link OperatorAttributes}. */
@PublicEvolving
public class OperatorAttributesBuilder {
    @Nullable private Boolean outputOnEOF = null;
    @Nullable private Boolean outputOnCheckpoint = null;
    @Nullable private Boolean internalSorterSupported = null;

    public OperatorAttributesBuilder() {...}

    public OperatorAttributesBuilder setOutputOnEOF(boolean outputOnEOF) {...}

    public OperatorAttributesBuilder setOutputOnCheckpoint(boolean outputOnCheckpoint) {...}

    public OperatorAttributesBuilder setInternalSorterSupported(boolean internalSorterSupported) {...}

    /**
     * If any operator attribute is null, we will log it at DEBUG level and use the following
     * default values.
     *
     * <ul>
     *   <li>outputOnEOF defaults to false.
     *   <li>outputOnCheckpoint defaults to false.
     *   <li>internalSorterSupported defaults to false.
     * </ul>
     */
    public OperatorAttributes build() {...}
}
```
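The default-value rule in `build()` can be illustrated with a standalone sketch. It mirrors the proposed builder only in spirit: the class bodies, the private `orDefault` helper and the constructor of `OperatorAttributes` are assumptions made for illustration, and `java.util.logging` at `Level.FINE` stands in for Flink's SLF4J logger at DEBUG level.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Standalone stand-in for the proposed attributes object (not Flink's class). */
final class OperatorAttributes {
    private final boolean outputOnEOF;
    private final boolean outputOnCheckpoint;
    private final boolean internalSorterSupported;

    OperatorAttributes(boolean outputOnEOF, boolean outputOnCheckpoint, boolean internalSorterSupported) {
        this.outputOnEOF = outputOnEOF;
        this.outputOnCheckpoint = outputOnCheckpoint;
        this.internalSorterSupported = internalSorterSupported;
    }

    boolean isOutputOnEOF() { return outputOnEOF; }
    boolean isOutputOnCheckpoint() { return outputOnCheckpoint; }
    boolean isInternalSorterSupported() { return internalSorterSupported; }
}

/** Sketch of the builder: attributes left unset (null) are logged and default to false. */
final class OperatorAttributesBuilder {
    // Assumption: java.util.logging replaces SLF4J; Level.FINE plays the role of DEBUG.
    private static final Logger LOG = Logger.getLogger(OperatorAttributesBuilder.class.getName());

    private Boolean outputOnEOF = null;
    private Boolean outputOnCheckpoint = null;
    private Boolean internalSorterSupported = null;

    OperatorAttributesBuilder setOutputOnEOF(boolean outputOnEOF) {
        this.outputOnEOF = outputOnEOF;
        return this;
    }

    OperatorAttributesBuilder setOutputOnCheckpoint(boolean outputOnCheckpoint) {
        this.outputOnCheckpoint = outputOnCheckpoint;
        return this;
    }

    OperatorAttributesBuilder setInternalSorterSupported(boolean internalSorterSupported) {
        this.internalSorterSupported = internalSorterSupported;
        return this;
    }

    OperatorAttributes build() {
        return new OperatorAttributes(
                orDefault("outputOnEOF", outputOnEOF),
                orDefault("outputOnCheckpoint", outputOnCheckpoint),
                orDefault("internalSorterSupported", internalSorterSupported));
    }

    // Hypothetical helper: replaces an unset attribute with false and logs that it did so.
    private static boolean orDefault(String name, Boolean value) {
        if (value == null) {
            LOG.log(Level.FINE, "{0} is not set, defaulting to false", name);
            return false;
        }
        return value;
    }
}
```

For example, `new OperatorAttributesBuilder().setOutputOnEOF(true).build()` yields an object whose `isOutputOnEOF()` is true while the two attributes that were never set fall back to false. Keeping the fields as nullable `Boolean` is what lets the builder tell "explicitly false" apart from "not specified" so that only the latter is logged.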

...

```java
package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.PublicEvolving;

/**
 * {@code OperatorAttributes} provides the JobManager with information that can be used to
 * optimize job performance.
 */
@PublicEvolving
public class OperatorAttributes {
    /**
     * Returns true iff the operator can only emit records after its inputs have reached EOF.
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li>Its output edges are blocking.
     *   <li>The PipelinedRegion containing this operator has checkpointing disabled.
     * </ul>
     */
    public boolean isOutputOnEOF() {...}

    /**
     * Returns true iff the operator can only emit records when a checkpoint is triggered.
     *
     * <p>If true, the job should trigger checkpoints in order to flush data to sinks.
     */
    public boolean isOutputOnCheckpoint() {...}

    /**
     * Returns true iff the operator uses an internal sorter when checkpointing is disabled.
     *
     * <p>Checkpointing is disabled in any of the following cases:
     *
     * <ul>
     *   <li>execution.runtime-mode = BATCH.
     *   <li>execution.checkpointing.interval-during-backlog = 0.
     *   <li>The operator has isOutputOnEOF = true.
     * </ul>
     *
     * <p>Here are the implications when it is true AND checkpointing is disabled:
     *
     * <ul>
     *   <li>Its input records do not need to be sorted externally.
     *   <li>Its managed memory should be set according to execution.sorted-inputs.memory.
     * </ul>
     */
    public boolean isInternalSorterSupported() {...}
}
```
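The implications listed in the Javadoc above can be read as a small decision table. The following sketch encodes only those rules; `PlannerAttributes`, `JobSettings`, `OperatorPlan` and `AttributePlanner` are hypothetical names, and the sketch makes no claim about how the JobManager actually applies the attributes.

```java
/** Hypothetical, simplified view of the three attributes (not Flink's OperatorAttributes). */
final class PlannerAttributes {
    final boolean outputOnEOF;
    final boolean outputOnCheckpoint;
    final boolean internalSorterSupported;

    PlannerAttributes(boolean outputOnEOF, boolean outputOnCheckpoint, boolean internalSorterSupported) {
        this.outputOnEOF = outputOnEOF;
        this.outputOnCheckpoint = outputOnCheckpoint;
        this.internalSorterSupported = internalSorterSupported;
    }
}

/** Hypothetical job settings for the two configuration options named in the Javadoc. */
final class JobSettings {
    final boolean batchRuntimeMode;               // execution.runtime-mode = BATCH
    final long checkpointIntervalDuringBacklogMs; // execution.checkpointing.interval-during-backlog

    JobSettings(boolean batchRuntimeMode, long checkpointIntervalDuringBacklogMs) {
        this.batchRuntimeMode = batchRuntimeMode;
        this.checkpointIntervalDuringBacklogMs = checkpointIntervalDuringBacklogMs;
    }
}

/** Decisions derived from the attributes. */
final class OperatorPlan {
    final boolean blockingOutputEdges;
    final boolean regionCheckpointingDisabled;
    final boolean useInternalSorter;
    final boolean triggerCheckpointsToFlush;

    OperatorPlan(boolean blockingOutputEdges, boolean regionCheckpointingDisabled,
                 boolean useInternalSorter, boolean triggerCheckpointsToFlush) {
        this.blockingOutputEdges = blockingOutputEdges;
        this.regionCheckpointingDisabled = regionCheckpointingDisabled;
        this.useInternalSorter = useInternalSorter;
        this.triggerCheckpointsToFlush = triggerCheckpointsToFlush;
    }
}

final class AttributePlanner {
    /** Checkpointing counts as disabled in any of the three cases listed in the Javadoc. */
    static boolean isCheckpointingDisabled(PlannerAttributes attrs, JobSettings settings) {
        return settings.batchRuntimeMode
                || settings.checkpointIntervalDuringBacklogMs == 0
                || attrs.outputOnEOF;
    }

    static OperatorPlan plan(PlannerAttributes attrs, JobSettings settings) {
        boolean checkpointingDisabled = isCheckpointingDisabled(attrs, settings);
        return new OperatorPlan(
                attrs.outputOnEOF, // output edges are blocking
                attrs.outputOnEOF, // the operator's pipelined region has checkpointing disabled
                // no external sort; managed memory follows execution.sorted-inputs.memory
                attrs.internalSorterSupported && checkpointingDisabled,
                attrs.outputOnCheckpoint); // checkpoints are needed to flush data to sinks
    }
}
```

Note that an operator with `outputOnEOF = true` disables checkpointing by itself, so if it also supports an internal sorter, that sorter is used even in a streaming job.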


Note that an operator that uses an internal sorter does not necessarily emit data only at the end of input. For example, an operator might sort data while it is still reading from an input with isBacklog=true. Once all the inputs it is still reading from have isBacklog=false, the operator can stop sorting and start emitting records continuously in a streaming fashion, and it can support checkpointing during this period.
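The behaviour described in the note can be sketched for a single input as follows. `BacklogAwareSortingOperator`, `processElement` and `setBacklog` are hypothetical names, and an in-memory `List` stands in for a real (spillable) sorter; the sketch only shows the switch from "buffer and sort" to "emit immediately".

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical single-input operator: sorts while backlogged, streams afterwards. */
final class BacklogAwareSortingOperator<T> {
    private final Comparator<T> comparator;
    private final Consumer<T> output;
    private final List<T> buffer = new ArrayList<>(); // stand-in for an internal sorter
    private boolean backlog = true;                   // assume the input starts with isBacklog=true

    BacklogAwareSortingOperator(Comparator<T> comparator, Consumer<T> output) {
        this.comparator = comparator;
        this.output = output;
    }

    void processElement(T record) {
        if (backlog) {
            buffer.add(record);     // sorting phase: hold records back
        } else {
            output.accept(record);  // streaming phase: emit immediately
        }
    }

    /** Called when the isBacklog status of the input changes. */
    void setBacklog(boolean isBacklog) {
        if (backlog && !isBacklog) {
            // Backlog ended: flush everything buffered so far in sorted order.
            buffer.sort(comparator);
            buffer.forEach(output);
            buffer.clear();
        }
        backlog = isBacklog;
    }
}
```

Feeding `3, 1, 2` while backlogged and then `0` after `setBacklog(false)` emits `1, 2, 3, 0`: sorted output for the backlog, arrival order afterwards. In the streaming phase the buffer stays empty, which is consistent with the note that the operator can support checkpointing during that period.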

...

```java
import org.apache.flink.annotation.Internal;

@Internal
public abstract class Transformation<T> {
    // Defaults for all transformations; subclasses that wrap an operator override these
    // with the attributes reported by that operator.
    public boolean isOutputOnEOF() {
        return false;
    }

    public boolean isOutputOnCheckpoint() {
        return false;
    }

    public boolean isInternalSorterSupported() {
        return false;
    }
}
```


2) Update Transformation subclasses (e.g. OneInputTransformation and TwoInputTransformation) to override the newly added methods, using the OperatorAttributes obtained from the underlying operator.
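Step 2 can be sketched with simplified stand-ins. The names ending in `Sketch`, `ReportedAttributes` and `AttributedOperator` are hypothetical, and the sketch assumes the wrapped operator exposes its attributes through a single getter (called `getOperatorAttributes()` here); it is not Flink's actual class hierarchy.

```java
/** Hypothetical stand-in for the attributes an operator reports. */
final class ReportedAttributes {
    private final boolean outputOnEOF;
    private final boolean outputOnCheckpoint;
    private final boolean internalSorterSupported;

    ReportedAttributes(boolean outputOnEOF, boolean outputOnCheckpoint, boolean internalSorterSupported) {
        this.outputOnEOF = outputOnEOF;
        this.outputOnCheckpoint = outputOnCheckpoint;
        this.internalSorterSupported = internalSorterSupported;
    }

    boolean isOutputOnEOF() { return outputOnEOF; }
    boolean isOutputOnCheckpoint() { return outputOnCheckpoint; }
    boolean isInternalSorterSupported() { return internalSorterSupported; }
}

/** Assumption: the operator exposes its attributes through one getter. */
interface AttributedOperator {
    ReportedAttributes getOperatorAttributes();
}

/** Mirrors the proposed base-class defaults: every attribute is false. */
abstract class TransformationSketch<T> {
    public boolean isOutputOnEOF() { return false; }
    public boolean isOutputOnCheckpoint() { return false; }
    public boolean isInternalSorterSupported() { return false; }
}

/** A one-input transformation that forwards its operator's attributes. */
final class OneInputTransformationSketch<IN, OUT> extends TransformationSketch<OUT> {
    private final AttributedOperator operator;

    OneInputTransformationSketch(AttributedOperator operator) {
        this.operator = operator;
    }

    @Override
    public boolean isOutputOnEOF() {
        return operator.getOperatorAttributes().isOutputOnEOF();
    }

    @Override
    public boolean isOutputOnCheckpoint() {
        return operator.getOperatorAttributes().isOutputOnCheckpoint();
    }

    @Override
    public boolean isInternalSorterSupported() {
        return operator.getOperatorAttributes().isInternalSorterSupported();
    }
}
```

Delegating on every call, rather than copying the values at construction time, keeps the transformation consistent with whatever the operator reports when the graph is generated; a two-input variant would follow the same pattern.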

...