...

Code Block (language: java)
package org.apache.flink.streaming.api.operators;

/**
 * OperatorAttributes element provides Job Manager with information that can be
 * used to optimize job performance.
 */
@PublicEvolving
public class OperatorAttributes {
    /**
     * Returns true iff the operator can only emit records after inputs have reached EOF.
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li>Its output edges are blocking.
     * </ul>
     */
    public boolean getIsOutputOnEOF() {...}

    /**
     * Returns true iff the operator can only emit records when a checkpoint is triggered.
     *
     * <p>If true, the job should trigger checkpoints in order to flush data to sinks.
     */
    public boolean getIsOutputOnCheckpoint() {...}

    /**
     * Returns true iff the operator sorts data internally.
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li>Its checkpoint is disabled.
     *   <li>Its input records do not need to be sorted externally.
     *   <li>Its managed memory should be set according to execution.sorted-inputs.memory.
     * </ul>
     */
    public boolean getHasInternalSorter() {...}
}


Note that having an internal sorter does not necessarily mean that an operator only emits data at the end of input. For example, an operator might sort data while it is still reading from an input with isBacklog=true. Once all the inputs it is still reading from have isBacklog=false, the operator can stop sorting and start emitting records continuously in a streaming fashion.
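The behavior described in the note above can be illustrated with a toy operator. This is only a minimal sketch under stated assumptions: BacklogAwareSorter, setBacklog and processElement are hypothetical names that are not part of Flink or of this proposal, and records are plain integers collected into a list instead of being sent downstream.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in (not a Flink class): sorts records internally while its
// input reports isBacklog=true, then switches to emitting records immediately.
class BacklogAwareSorter {
    private final List<Integer> buffer = new ArrayList<>();
    private final List<Integer> output = new ArrayList<>();
    private boolean backlog = true;

    // Called when the input's backlog status changes.
    void setBacklog(boolean isBacklog) {
        if (backlog && !isBacklog) {
            // Backlog is over: emit everything buffered so far in sorted order.
            Collections.sort(buffer);
            output.addAll(buffer);
            buffer.clear();
        }
        backlog = isBacklog;
    }

    // Buffers the record during backlog, emits it right away otherwise.
    void processElement(int record) {
        if (backlog) {
            buffer.add(record);
        } else {
            output.add(record);
        }
    }

    List<Integer> getOutput() {
        return output;
    }
}

class BacklogAwareSorterDemo {
    public static void main(String[] args) {
        BacklogAwareSorter op = new BacklogAwareSorter();
        op.processElement(3);
        op.processElement(1);
        op.processElement(2);
        op.setBacklog(false); // flushes 1, 2, 3 in sorted order
        op.processElement(9); // emitted immediately, unsorted
        op.processElement(4);
        System.out.println(op.getOutput()); // prints [1, 2, 3, 9, 4]
    }
}
```

Such an operator would report getHasInternalSorter() == true without reporting getIsOutputOnEOF() == true, since it keeps emitting records after the backlog phase ends.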


3) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.
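One possible shape for this API is sketched below. It is not the proposed implementation: SketchOperatorAttributes and SketchStreamOperator are simplified, hypothetical stand-ins for OperatorAttributes and StreamOperator, and the all-false default is an assumption made so that existing operators would not need to change.

```java
// Simplified stand-in for the OperatorAttributes class shown earlier.
final class SketchOperatorAttributes {
    private final boolean outputOnEOF;
    private final boolean outputOnCheckpoint;
    private final boolean internalSorter;

    SketchOperatorAttributes(
            boolean outputOnEOF, boolean outputOnCheckpoint, boolean internalSorter) {
        this.outputOnEOF = outputOnEOF;
        this.outputOnCheckpoint = outputOnCheckpoint;
        this.internalSorter = internalSorter;
    }

    boolean getIsOutputOnEOF() { return outputOnEOF; }
    boolean getIsOutputOnCheckpoint() { return outputOnCheckpoint; }
    boolean getHasInternalSorter() { return internalSorter; }
}

// Hypothetical stand-in for StreamOperator with the newly added method.
interface SketchStreamOperator {
    // Assumed default: no special attributes, so existing operators keep working.
    default SketchOperatorAttributes getOperatorAttributes() {
        return new SketchOperatorAttributes(false, false, false);
    }
}

// An ordinary operator that relies on the default attributes.
class SketchMapOperator implements SketchStreamOperator {}

// An operator that sorts internally and only emits at the end of input.
class SketchSortingAggregator implements SketchStreamOperator {
    @Override
    public SketchOperatorAttributes getOperatorAttributes() {
        return new SketchOperatorAttributes(true, false, true);
    }
}

class OperatorAttributesDemo {
    public static void main(String[] args) {
        SketchStreamOperator op = new SketchSortingAggregator();
        System.out.println(op.getOperatorAttributes().getHasInternalSorter()); // prints true
    }
}
```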

...

2) Update Transformation subclasses (e.g. OneInputTransformation and TwoInputTransformation) to override the newly added methods using the OperatorAttributes obtained from the underlying Operator.


3) Update the algorithm used by the JM to determine whether operators in a PipelinedRegion enable checkpointing.

...

the CheckpointCoordinator

If any ExecutionVertex is reading from or writing to a blocking edge, then the checkpoint is disabled during this period.

If any operator reports getHasInternalSorter == true, then the checkpoint is disabled

...

when this operator is running from a source with isBacklog=true.
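The two conditions above can be read as a single predicate over the state of the job. The sketch below is a rough illustration under stated assumptions: VertexStatus and CheckpointGate are hypothetical types, not Flink classes, and the three flags are assumed to be already known to the JM; how they are collected is not covered here.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical snapshot of what the JM knows about one ExecutionVertex.
class VertexStatus {
    final boolean touchesActiveBlockingEdge; // reading from or writing to a blocking edge
    final boolean hasInternalSorter;         // an operator reports getHasInternalSorter() == true
    final boolean readingBacklog;            // currently fed by a source with isBacklog=true

    VertexStatus(
            boolean touchesActiveBlockingEdge,
            boolean hasInternalSorter,
            boolean readingBacklog) {
        this.touchesActiveBlockingEdge = touchesActiveBlockingEdge;
        this.hasInternalSorter = hasInternalSorter;
        this.readingBacklog = readingBacklog;
    }
}

class CheckpointGate {
    // Checkpointing stays disabled while any vertex matches one of the two conditions.
    static boolean isCheckpointEnabled(List<VertexStatus> vertices) {
        for (VertexStatus v : vertices) {
            if (v.touchesActiveBlockingEdge) {
                return false;
            }
            if (v.hasInternalSorter && v.readingBacklog) {
                return false;
            }
        }
        return true;
    }
}

class CheckpointGateDemo {
    public static void main(String[] args) {
        List<VertexStatus> vertices = Arrays.asList(
                new VertexStatus(false, false, false),
                new VertexStatus(false, true, true)); // sorter still reading backlog
        System.out.println(CheckpointGate.isCheckpointEnabled(vertices)); // prints false
    }
}
```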


4) The CheckpointCoordinator will trigger checkpoints only after all ExecutionVertexes have finished reading their blocking input edges.

  • The JM should keep track of the number of ExecutionVertexes which have blocking input edges that have not been fully read.
  • Each ExecutionVertex should notify JM after it finishes reading all its blocking input edges.
  • The JM should enable checkpointing only after all ExecutionVertexes have finished reading their blocking input edges.
  • Records buffered in an upstream node due to blocking edges can be removed after its downstream nodes have completed their first checkpoint.
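The bookkeeping in the first three bullets could look roughly like the sketch below. BlockingInputTracker and its method names are hypothetical, and vertices are identified by plain strings for brevity. A set of pending vertex IDs is used here instead of a bare counter, which is a deliberate deviation: it keeps duplicate notifications from the same vertex harmless. Cleanup of buffered records (the last bullet) is not modeled.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical JM-side tracker (not a Flink class) for vertices that still have
// blocking input edges that are not fully read.
class BlockingInputTracker {
    private final Set<String> pending = new HashSet<>();

    BlockingInputTracker(Collection<String> verticesWithBlockingInputs) {
        pending.addAll(verticesWithBlockingInputs);
    }

    // Called when an ExecutionVertex reports that all its blocking inputs are consumed.
    void notifyBlockingInputsFinished(String vertexId) {
        pending.remove(vertexId); // no effect if the vertex already reported
    }

    // Number of vertices that still have unread blocking input edges.
    int getPendingCount() {
        return pending.size();
    }

    // Checkpointing may start once no vertex is still reading a blocking edge.
    boolean isCheckpointEnabled() {
        return pending.isEmpty();
    }
}

class BlockingInputTrackerDemo {
    public static void main(String[] args) {
        BlockingInputTracker tracker = new BlockingInputTracker(Arrays.asList("v1", "v2"));
        tracker.notifyBlockingInputsFinished("v1");
        System.out.println(tracker.isCheckpointEnabled()); // prints false
        tracker.notifyBlockingInputsFinished("v2");
        System.out.println(tracker.isCheckpointEnabled()); // prints true
    }
}
```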

5) When an operator determines its RecordAttributes for downstream nodes, a blocking input edge with pending records is treated the same as a source with isBacklog=true.
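As a rough illustration of this equivalence, the sketch below maps each input to an effective backlog flag. InputState and BacklogResolver are hypothetical helpers rather than Flink classes, and combining the inputs with "any input is backlog" is an assumption; the text above only states that the two kinds of input are treated alike.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical description of one operator input (not a Flink class).
class InputState {
    private final boolean isBlockingEdge;
    private final boolean sourceIsBacklog; // only meaningful for non-blocking inputs
    private final long pendingRecords;     // only meaningful for blocking edges

    private InputState(boolean isBlockingEdge, boolean sourceIsBacklog, long pendingRecords) {
        this.isBlockingEdge = isBlockingEdge;
        this.sourceIsBacklog = sourceIsBacklog;
        this.pendingRecords = pendingRecords;
    }

    static InputState ofSource(boolean isBacklog) {
        return new InputState(false, isBacklog, 0L);
    }

    static InputState ofBlockingEdge(long pendingRecords) {
        return new InputState(true, false, pendingRecords);
    }

    // A blocking edge with pending records counts like a source with isBacklog=true.
    boolean isEffectivelyBacklog() {
        return isBlockingEdge ? pendingRecords > 0 : sourceIsBacklog;
    }
}

class BacklogResolver {
    // Assumption: the operator reports backlog downstream if any input is in backlog.
    static boolean isBacklogForDownstream(List<InputState> inputs) {
        for (InputState input : inputs) {
            if (input.isEffectivelyBacklog()) {
                return true;
            }
        }
        return false;
    }
}

class BacklogResolverDemo {
    public static void main(String[] args) {
        List<InputState> inputs = Arrays.asList(
                InputState.ofSource(false),
                InputState.ofBlockingEdge(42L)); // blocking edge still has pending records
        System.out.println(BacklogResolver.isBacklogForDownstream(inputs)); // prints true
    }
}
```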

...