...

package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.PublicEvolving;

import javax.annotation.Nullable;

/** The builder class for {@link OperatorAttributes}. */
@PublicEvolving
public class OperatorAttributesBuilder {
    @Nullable private Boolean isOutputOnEOF = null;
    @Nullable private Boolean isOutputOnCheckpoint = null;
    @Nullable private Boolean isInternalSorterSupported = null;

    public OperatorAttributesBuilder() {...}

    public OperatorAttributesBuilder setIsOutputOnEOF(boolean isOutputOnEOF) {...}

    public OperatorAttributesBuilder setIsOutputOnCheckpoint(boolean isOutputOnCheckpoint) {...}

    public OperatorAttributesBuilder setIsInternalSorterSupported(boolean isInternalSorterSupported) {...}

    /**
     * If any operator attribute is null, we will log it at DEBUG level and use the following
     * default values.
     *
     * <ul>
     *   <li>isOutputOnEOF defaults to false.
     *   <li>isOutputOnCheckpoint defaults to false.
     *   <li>isInternalSorterSupported defaults to false.
     * </ul>
     */
    public OperatorAttributes build() {...}
}
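The defaulting rule documented on build() can be illustrated in isolation. This is a minimal, hypothetical model that assumes only what the Javadoc states (an unset attribute falls back to false and is logged at DEBUG level); the class and method names are illustrative and not part of the proposal, and java.util.logging's FINE level stands in for DEBUG.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Hypothetical model of the null-defaulting rule in OperatorAttributesBuilder#build().
 * Not Flink's implementation; names are illustrative only.
 */
class AttributeDefaultsSketch {
    private static final Logger LOG = Logger.getLogger(AttributeDefaultsSketch.class.getName());

    /** Returns the explicitly set value, or false (logged at FINE, the JUL analogue of DEBUG) if unset. */
    static boolean resolve(String name, Boolean value) {
        if (value == null) {
            LOG.log(Level.FINE, "{0} is not set, using default value false", name);
            return false;
        }
        return value;
    }

    public static void main(String[] args) {
        // Simulates an operator that only called setIsOutputOnEOF(true) on the builder.
        Boolean isOutputOnEOF = true;
        Boolean isOutputOnCheckpoint = null;
        Boolean isInternalSorterSupported = null;

        System.out.println(resolve("isOutputOnEOF", isOutputOnEOF));                         // true
        System.out.println(resolve("isOutputOnCheckpoint", isOutputOnCheckpoint));           // false
        System.out.println(resolve("isInternalSorterSupported", isInternalSorterSupported)); // false
    }
}
```

Using boxed Boolean fields (rather than primitives) is what lets the builder distinguish "explicitly set to false" from "never set", which is why the proposal declares them @Nullable.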

...

package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.PublicEvolving;

/**
 * OperatorAttributes provides the JobManager with information that can be used to optimize
 * job performance.
 */
@PublicEvolving
public class OperatorAttributes {
    /**
     * Returns true iff the operator can only emit records after its inputs have reached EOF.
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li>Its output edges are blocking.
     *   <li>It does not support checkpointing.
     * </ul>
     */
    public boolean getIsOutputOnEOF() {...}

    /**
     * Returns true iff the operator can only emit records when a checkpoint is triggered.
     *
     * <p>If true, the job should trigger checkpoints in order to flush data to sinks.
     */
    public boolean getIsOutputOnCheckpoint() {...}

    /**
     * Returns true iff the operator sorts data internally when checkpointing can be disabled.
     *
     * <p>Checkpointing can be disabled in any of the following cases:
     *
     * <ul>
     *   <li>execution.runtime-mode == BATCH.
     *   <li>execution.checkpointing.interval-during-backlog == 0.
     *   <li>isOutputOnEOF == true.
     * </ul>
     *
     * <p>Here are the implications when it is true AND checkpointing is disabled:
     *
     * <ul>
     *   <li>Its input records do not need to be sorted externally.
     *   <li>Its managed memory should be set according to execution.sorted-inputs.memory.
     *   <li>It does not support checkpointing when any input has isBacklog=true.
     * </ul>
     */
    public boolean getIsInternalSorterSupported() {...}
}


Note that an operator with an internal sorter does not necessarily emit data only at the end of input. For example, an operator might sort data while it is still reading from an input with isBacklog=true. Once all the inputs it is still reading from have isBacklog=false, the operator can stop sorting and start emitting records continuously in a streaming fashion, and it can support checkpointing during this period.

...

package org.apache.flink.api.dag;

import org.apache.flink.annotation.Internal;

@Internal
public abstract class Transformation<T> {
    // Existing fields and methods are omitted here.

    public boolean getIsOutputOnEOF() {
        return false;
    }

    public boolean getIsOutputOnCheckpoint() {
        return false;
    }

    public boolean getIsInternalSorterSupported() {
        return false;
    }
}


2) Update Transformation subclasses (e.g. OneInputTransformation and TwoInputTransformation) to override the newly added methods using the OperatorAttributes obtained from the underlying Operator.
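Step 2 can be pictured with a small, self-contained model. This is a hypothetical sketch: Attributes, Operator, BaseTransformation and OneInputTransformationSketch are simplified stand-ins for OperatorAttributes, a stream operator exposing getOperatorAttributes(), Transformation and OneInputTransformation, not Flink's real classes.

```java
/**
 * Hypothetical, self-contained model of a Transformation subclass answering the new
 * getters from its operator's attributes. All types are simplified stand-ins.
 */
class TransformationDelegationSketch {

    /** Stand-in for OperatorAttributes. */
    static final class Attributes {
        final boolean isOutputOnEOF;
        final boolean isOutputOnCheckpoint;
        final boolean isInternalSorterSupported;

        Attributes(boolean isOutputOnEOF, boolean isOutputOnCheckpoint, boolean isInternalSorterSupported) {
            this.isOutputOnEOF = isOutputOnEOF;
            this.isOutputOnCheckpoint = isOutputOnCheckpoint;
            this.isInternalSorterSupported = isInternalSorterSupported;
        }
    }

    /** Stand-in for a stream operator exposing getOperatorAttributes(). */
    interface Operator {
        Attributes getOperatorAttributes();
    }

    /** Stand-in for Transformation: every attribute defaults to false. */
    abstract static class BaseTransformation {
        boolean getIsOutputOnEOF() { return false; }
        boolean getIsOutputOnCheckpoint() { return false; }
        boolean getIsInternalSorterSupported() { return false; }
    }

    /** Stand-in for OneInputTransformation: forwards each getter to its operator. */
    static final class OneInputTransformationSketch extends BaseTransformation {
        private final Operator operator;

        OneInputTransformationSketch(Operator operator) {
            this.operator = operator;
        }

        @Override
        boolean getIsOutputOnEOF() {
            return operator.getOperatorAttributes().isOutputOnEOF;
        }

        @Override
        boolean getIsOutputOnCheckpoint() {
            return operator.getOperatorAttributes().isOutputOnCheckpoint;
        }

        @Override
        boolean getIsInternalSorterSupported() {
            return operator.getOperatorAttributes().isInternalSorterSupported;
        }
    }

    public static void main(String[] args) {
        // An operator declaring isOutputOnEOF=true and isInternalSorterSupported=true.
        Operator op = () -> new Attributes(true, false, true);
        BaseTransformation t = new OneInputTransformationSketch(op);
        System.out.println(t.getIsOutputOnEOF() + " " + t.getIsOutputOnCheckpoint() + " "
                + t.getIsInternalSorterSupported()); // true false true
    }
}
```

Reading the attributes from the operator on each call keeps the transformation consistent with whatever the operator reports; caching them once at construction time would be an equally reasonable choice.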

...

  • If a Transformation has isOutputOnEOF == true:
    • Its output edge is blocking.
    • The PipelinedRegion containing this Transformation should never trigger checkpoints.
  • If all Transformations have isOutputOnCheckpoint == false:
    • In FLIP-325, the JM will not trigger an extra flush() before triggering a checkpoint.
  • If a Transformation has isInternalSorterSupported == true AND
    (execution.runtime-mode == BATCH OR execution.checkpointing.interval-during-backlog == 0 OR isOutputOnEOF == true):
    • The Flink runtime will not add an external sorter for its inputs (including in batch mode with keyed inputs).
    • Its managed memory should be set according to execution.sorted-inputs.memory.
    • The PipelinedRegion containing this Transformation should not trigger checkpoints until all inputs have isBacklog=false.
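The flush and internal-sorter rules listed above reduce to simple boolean predicates. The following is a hypothetical sketch: RuntimeRulesSketch, RuntimeMode and the parameter names are illustrative and do not mirror Flink's scheduler or stream-graph code; intervalDuringBacklogMs is assumed to carry the value of execution.checkpointing.interval-during-backlog.

```java
/**
 * Hypothetical sketch of the runtime rules for a single Transformation.
 * Names are illustrative and do not mirror Flink's internal classes.
 */
class RuntimeRulesSketch {

    enum RuntimeMode { STREAMING, BATCH }

    /** The JM can skip the extra flush() before a checkpoint only if no Transformation outputs on checkpoint. */
    static boolean canSkipExtraFlush(boolean... isOutputOnCheckpoint) {
        for (boolean b : isOutputOnCheckpoint) {
            if (b) {
                return false;
            }
        }
        return true;
    }

    /**
     * True when the runtime should rely on the operator's internal sorter instead of adding an
     * external one: isInternalSorterSupported AND at least one checkpoint-disabling condition.
     */
    static boolean useInternalSorter(
            boolean isInternalSorterSupported,
            boolean isOutputOnEOF,
            RuntimeMode runtimeMode,
            long intervalDuringBacklogMs) { // execution.checkpointing.interval-during-backlog
        boolean checkpointCanBeDisabled =
                runtimeMode == RuntimeMode.BATCH || intervalDuringBacklogMs == 0 || isOutputOnEOF;
        return isInternalSorterSupported && checkpointCanBeDisabled;
    }

    public static void main(String[] args) {
        // Streaming job with a backlog interval of 0: the internal sorter is used.
        System.out.println(useInternalSorter(true, false, RuntimeMode.STREAMING, 0));      // true
        // Streaming job, non-zero backlog interval, continuous output: the rule does not apply.
        System.out.println(useInternalSorter(true, false, RuntimeMode.STREAMING, 60_000)); // false
        System.out.println(canSkipExtraFlush(false, false));                               // true
    }
}
```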

...

  • Add a subclass of TwoInputStreamOperator that uses an internal sorter to do co-group, with essentially the same implementation as DataSet#coGroup.
  • This operator should override getOperatorAttributes() to return OperatorAttributes(isOutputOnEOF=true, isInternalSorterSupported=true).
  • Update CoGroupedStreams#WithWindow#apply to use this operator to do co-group when the user-specified window assigner is EndOfStreamWindows.
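The sort-based co-group idea can be sketched as follows. This is a simplified, in-memory illustration, not the DataSet#coGroup implementation: it assumes both inputs fit in memory and that keys are Comparable, and every name (SortCoGroupSketch, GroupFunction, processElement1/2, endInput) is hypothetical. In the proposal, such an operator would declare its attributes through the builder, for example new OperatorAttributesBuilder().setIsOutputOnEOF(true).setIsInternalSorterSupported(true).build().

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

/**
 * Hypothetical in-memory sketch of a sort-based co-group: buffer both inputs until EOF,
 * sort each buffer by key, then merge the two sorted buffers one key group at a time.
 * A real operator would use managed memory and spill to disk.
 */
class SortCoGroupSketch<K extends Comparable<K>, A, B> {

    /** User function applied to each key group; either side may be empty. */
    interface GroupFunction<G, L, R, O> {
        O apply(G key, List<L> left, List<R> right);
    }

    private final Function<A, K> keyA;
    private final Function<B, K> keyB;
    private final List<A> bufferA = new ArrayList<>();
    private final List<B> bufferB = new ArrayList<>();

    SortCoGroupSketch(Function<A, K> keyA, Function<B, K> keyB) {
        this.keyA = keyA;
        this.keyB = keyB;
    }

    void processElement1(A a) { bufferA.add(a); } // nothing is emitted before EOF
    void processElement2(B b) { bufferB.add(b); }

    /** Called once both inputs have reached EOF. */
    <O> List<O> endInput(GroupFunction<K, A, B, O> fn) {
        bufferA.sort(Comparator.comparing(keyA));
        bufferB.sort(Comparator.comparing(keyB));
        List<O> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < bufferA.size() || j < bufferB.size()) {
            // The next group's key is the smaller of the two head keys.
            K key;
            if (j >= bufferB.size()) {
                key = keyA.apply(bufferA.get(i));
            } else if (i >= bufferA.size()) {
                key = keyB.apply(bufferB.get(j));
            } else {
                K ka = keyA.apply(bufferA.get(i));
                K kb = keyB.apply(bufferB.get(j));
                key = ka.compareTo(kb) <= 0 ? ka : kb;
            }
            List<A> left = new ArrayList<>();
            while (i < bufferA.size() && keyA.apply(bufferA.get(i)).compareTo(key) == 0) {
                left.add(bufferA.get(i++));
            }
            List<B> right = new ArrayList<>();
            while (j < bufferB.size() && keyB.apply(bufferB.get(j)).compareTo(key) == 0) {
                right.add(bufferB.get(j++));
            }
            out.add(fn.apply(key, left, right));
        }
        return out;
    }

    public static void main(String[] args) {
        // Key = first character of each record.
        SortCoGroupSketch<String, String, String> op =
                new SortCoGroupSketch<>(s -> s.substring(0, 1), s -> s.substring(0, 1));
        op.processElement1("a1");
        op.processElement1("b1");
        op.processElement1("a2");
        op.processElement2("c9");
        op.processElement2("b9");
        System.out.println(op.endInput((k, l, r) -> k + "=" + l + "|" + r));
        // [a=[a1, a2]|[], b=[b1]|[b9], c=[]|[c9]]
    }
}
```

Because all output is produced in endInput(), the operator never emits before EOF, which matches declaring isOutputOnEOF=true.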

...

  • Override getOperatorAttributes() to return isInternalSorterSupported = true if the operator can sort data internally to improve throughput.
  • Override the corresponding processRecordAttributes() API (see FLIP-325) to adjust its behavior based on the inputs' isBacklog status. For example, the operator can buffer and sort input records when any input has isBacklog=true AND execution.checkpointing.interval-during-backlog=0. Once all inputs' status has switched to isBacklog=false, it processes the buffered records, emits the results, and starts to work in stream execution mode.
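The buffering behavior described in the last bullet can be sketched as below. This is a hypothetical, simplified model: records are plain integers sorted by natural order (a real operator would sort by key using managed memory), every input is assumed to start with isBacklog=false, and onBacklogStatus() merely stands in for processRecordAttributes(); none of these names are Flink APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of a multi-input operator that buffers and sorts records while any
 * input has isBacklog=true AND execution.checkpointing.interval-during-backlog == 0, then
 * flushes the sorted buffer and switches to streaming once all inputs report isBacklog=false.
 */
class BacklogAwareSortSketch {
    private final boolean[] isBacklog;          // latest isBacklog status per input
    private final long intervalDuringBacklogMs; // execution.checkpointing.interval-during-backlog
    private final Consumer<Integer> output;
    private final List<Integer> buffer = new ArrayList<>();

    BacklogAwareSortSketch(int numInputs, long intervalDuringBacklogMs, Consumer<Integer> output) {
        this.isBacklog = new boolean[numInputs];
        this.intervalDuringBacklogMs = intervalDuringBacklogMs;
        this.output = output;
    }

    /** Sorting mode: some input is in backlog and checkpoints may be skipped meanwhile. */
    private boolean isSorting() {
        if (intervalDuringBacklogMs != 0) {
            return false;
        }
        for (boolean b : isBacklog) {
            if (b) {
                return true;
            }
        }
        return false;
    }

    /** Records a new isBacklog status for one input; flushes when sorting mode ends. */
    void onBacklogStatus(int input, boolean backlog) {
        boolean wasSorting = isSorting();
        isBacklog[input] = backlog;
        if (wasSorting && !isSorting()) {
            Collections.sort(buffer);
            buffer.forEach(output);
            buffer.clear();
        }
    }

    void processElement(int value) {
        if (isSorting()) {
            buffer.add(value); // buffered; emitted later in sorted order
        } else {
            output.accept(value); // streaming: emit immediately
        }
    }

    public static void main(String[] args) {
        List<Integer> emitted = new ArrayList<>();
        BacklogAwareSortSketch op = new BacklogAwareSortSketch(2, 0, emitted::add);
        op.onBacklogStatus(0, true);
        op.onBacklogStatus(1, true);
        op.processElement(3);
        op.processElement(1);
        op.onBacklogStatus(0, false); // input 1 is still in backlog, keep buffering
        op.processElement(2);
        op.onBacklogStatus(1, false); // all inputs caught up: flush the sorted buffer
        op.processElement(0);
        System.out.println(emitted); // [1, 2, 3, 0]
    }
}
```

With a non-zero interval-during-backlog the sketch never enters sorting mode and simply forwards records, since checkpoints must keep running during backlog in that case.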

...



Example Usages

1) A user wants to co-group records from two bounded streams and emit output after both inputs have ended.

...