...

Code Block (Java)
package org.apache.flink.streaming.api.operators;

/**
 * OperatorAttributes element provides Job Manager with information that can be
 * used to optimize the job performance.
 */
@PublicEvolving
public class OperatorAttributes {
    /**
     * Returns true iff the operator can only emit records after inputs have reached EOF.
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li>Its output edges are blocking.
     *   <li>The PipelinedRegion containing this operator has checkpoint disabled.
     * </ul>
     */
    public boolean getIsOutputOnEOF() {...}

    /**
     * Returns true iff the operator can only emit records when checkpoint is triggered.
     *
     * <p>If true, the job should trigger checkpoint in order to flush data to sinks.
     */
    public boolean getIsOutputOnCheckpoint() {...}

    /**
     * Returns true iff the operator uses an internal sorter when checkpoint is disabled.
     *
     * <p>Checkpoint is disabled in any of the following cases:
     *
     * <ul>
     *   <li>execution.runtime-mode == BATCH.
     *   <li>execution.checkpointing.interval-during-backlog == 0.
     *   <li>It has isOutputOnEOF == true.
     * </ul>
     *
     * <p>Here are the implications when it is true AND checkpoint is disabled:
     *
     * <ul>
     *   <li>Its input records do not need to be sorted externally.
     *   <li>Its managed memory should be set according to execution.sorted-inputs.memory.
     *   <li>It does not support checkpoint when any input has isBacklog=true.
     * </ul>
     */
    public boolean getIsInternalSorterSupported() {...}
}
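
The conditions listed in the Javadoc of getIsInternalSorterSupported() can be restated as a small decision helper. This is only a sketch of that logic in plain Java: the class InternalSorterDecisionSketch and both method names are hypothetical and are not part of the proposed API, and the three parameters stand in for execution.runtime-mode, execution.checkpointing.interval-during-backlog and isOutputOnEOF.

```java
/** Hypothetical helper restating the Javadoc conditions; not a Flink class. */
final class InternalSorterDecisionSketch {

    /**
     * Checkpoint is treated as disabled if the job runs in BATCH mode, if the
     * checkpointing interval during backlog is 0, or if the operator only
     * emits records after its inputs have reached EOF.
     */
    public static boolean isCheckpointDisabled(
            boolean batchMode, long intervalDuringBacklogMs, boolean outputOnEOF) {
        return batchMode || intervalDuringBacklogMs == 0 || outputOnEOF;
    }

    /**
     * The implications listed in the Javadoc (no external sorting of inputs,
     * managed memory taken from execution.sorted-inputs.memory) apply only when
     * the operator supports an internal sorter AND checkpoint is disabled.
     */
    public static boolean internalSorterImplicationsApply(
            boolean internalSorterSupported, boolean checkpointDisabled) {
        return internalSorterSupported && checkpointDisabled;
    }

    public static void main(String[] args) {
        boolean disabled = isCheckpointDisabled(false, 0L, false);
        System.out.println(internalSorterImplicationsApply(true, disabled));
    }
}
```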

...

6) For each stream-only operator whose throughput is observably slower than the corresponding batch-only operator, update it to be stream-batch unified so that it can use batch-like optimizations when its inputs have isBacklog=true.

Typically, operators that have keyed inputs and involve aggregation (e.g. join, cogroup, aggregate) can benefit from this optimization.

Though the concrete implementation may vary across operators, typically an operator needs the following changes to be stream-batch unified:

  • Override getOperatorAttributes to return IsInternalSorterSupported = true if the operator needs to sort data internally to improve throughput.
  • Override the corresponding processRecordAttributes() API (see FLIP-325) to adjust its behavior based on the input's isBacklog status.
    For example, the operator can buffer and sort input records when any input has isBacklog=true AND it is sure that checkpoint will not be triggered (e.g. execution.checkpointing.interval-during-backlog=0). Once all inputs have switched to isBacklog=false, it processes the buffered records, emits results, and starts to work in the stream execution mode.
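
The buffer-sort-then-switch behavior described in the bullets above might look roughly like the following. This is a minimal sketch in plain Java, not Flink code: BacklogAwareSumSketch, KeyedValue, onBacklogStatus() and processElement() are hypothetical stand-ins (onBacklogStatus() plays the role of processRecordAttributes()), the operator has a single input, and it computes a per-key running sum purely for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical single-input operator that sums values per key; not a Flink class. */
final class BacklogAwareSumSketch {

    /** Hypothetical keyed input record. */
    public static final class KeyedValue {
        final String key;
        final long value;

        public KeyedValue(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    private final boolean checkpointDisabledDuringBacklog;
    private final List<KeyedValue> buffer = new ArrayList<>();
    private final Map<String, Long> sums = new HashMap<>();
    private final List<String> output = new ArrayList<>();
    private boolean backlog = false;

    public BacklogAwareSumSketch(boolean checkpointDisabledDuringBacklog) {
        this.checkpointDisabledDuringBacklog = checkpointDisabledDuringBacklog;
    }

    /** Buffer only while the input is a backlog AND no checkpoint can be triggered. */
    private boolean buffering() {
        return backlog && checkpointDisabledDuringBacklog;
    }

    /** Stand-in for processRecordAttributes(): invoked when isBacklog changes. */
    public void onBacklogStatus(boolean isBacklog) {
        boolean wasBuffering = buffering();
        backlog = isBacklog;
        if (wasBuffering && !buffering()) {
            flushBuffer();
        }
    }

    public void processElement(KeyedValue record) {
        if (buffering()) {
            buffer.add(record); // batch-like path: defer all work
            return;
        }
        // Stream path: update state and emit one result per record.
        long sum = sums.merge(record.key, record.value, Long::sum);
        output.add(record.key + "=" + sum);
    }

    /** Sorts buffered records by key, then emits a single result per key. */
    private void flushBuffer() {
        buffer.sort(Comparator.comparing((KeyedValue r) -> r.key));
        int i = 0;
        while (i < buffer.size()) {
            String key = buffer.get(i).key;
            long delta = 0;
            while (i < buffer.size() && buffer.get(i).key.equals(key)) {
                delta += buffer.get(i).value;
                i++;
            }
            long sum = sums.merge(key, delta, Long::sum);
            output.add(key + "=" + sum);
        }
        buffer.clear();
    }

    public List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        BacklogAwareSumSketch op = new BacklogAwareSumSketch(true);
        op.onBacklogStatus(true);
        op.processElement(new KeyedValue("b", 1));
        op.processElement(new KeyedValue("a", 2));
        op.processElement(new KeyedValue("b", 3));
        op.onBacklogStatus(false); // flushes the sorted buffer
        op.processElement(new KeyedValue("a", 5));
        System.out.println(op.output());
    }
}
```

In this sketch the backlog phase emits one result per key instead of one per record, which is the kind of batch-like saving the optimization is after. A real operator would additionally need to handle multiple inputs, managed memory limits and spilling.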

...