...

Currently, supporting the above use-cases requires all operators to run in stream mode for the entire lifetime of the job. This approach leads to inferior performance, because some operators (e.g. co-group) can be more than 10X slower in stream mode than in batch mode, since stream mode lacks the ability to buffer and sort records.

In this FLIP, we propose to optimize performance for the above use-cases by allowing an operator to effectively switch its execution mode from batch to stream based on the "backlog" status of the records emitted by the source operators.

NOTE: this FLIP depends on the APIs (e.g. RecordAttributes with isBacklog information) proposed in FLIP-325. While FLIP-325 allows an operator to buffer records for an arbitrarily long time while its input records have isBacklog=true, this FLIP additionally allows an operator to use optimizations that are currently only applicable in batch execution mode (e.g. using ExternalSorter, which does not support checkpointing).

Behavior changes when switching from batch mode to stream mode

In this section, we describe how batch mode and stream mode differ in a variety of aspects. We also describe the new behavior needed for an operator to switch from batch mode to stream mode during the execution of the same job.

For simplicity, we assume that the Flink job uses default values for all configuration options (e.g. scheduler-mode) except execution.runtime-mode, and that the operator reads records from a source which switches from isBacklog=true to isBacklog=false before it reaches EOF.

...

  • Managed memory is allocated to operators with keyed inputs that claim isInternalSorterSupported == true.
  • A general-purpose keyed state backend that does not assume inputs sorted by key (e.g. EmbeddedRocksDBStateBackend) is instantiated by the Flink runtime.
  • Before the source operator emits isBacklog=false, a downstream operator sorts input records by key internally without accessing the state backend. We will provide utility classes (derived from the existing ExternalSorter) to sort inputs.
  • At the point when isBacklog switches to false:
    - The source operator emits RecordAttributes(isBacklog=false).
    - The downstream operator processes/aggregates all the records received before RecordAttributes(isBacklog=false) in the order of the sorted keys, using a BatchExecutionKeyedStateBackend that it has explicitly instantiated internally.
    - The downstream operator moves each (key, state) pair from the BatchExecutionKeyedStateBackend to the keyed state backend instantiated by the Flink runtime as soon as it sees a record with a different key.
  • The operator continues to process records using the keyed state backend instantiated by the Flink runtime, which now contains the full state obtained from the records received while isBacklog=true.
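The sort-then-migrate sequence above can be illustrated with a small standalone model. This is a sketch under stated assumptions, not Flink code: the class BacklogSortingSum is hypothetical, a keyed sum stands in for the operator's aggregation, an in-memory list sort stands in for ExternalSorter, a single running accumulator stands in for BatchExecutionKeyedStateBackend (which only holds the current key's state), and a HashMap stands in for the keyed state backend instantiated by the Flink runtime.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of a keyed-sum operator that sorts records while isBacklog=true and
 * migrates per-key state to a general-purpose store when isBacklog switches to false.
 * Plain collections stand in for ExternalSorter and both state backends.
 */
class BacklogSortingSum {
    /** Stand-in for the keyed state backend instantiated by the Flink runtime. */
    final Map<String, Long> runtimeState = new HashMap<>();
    /** Records buffered (and later sorted by key) while isBacklog=true. */
    private final List<Map.Entry<String, Long>> buffer = new ArrayList<>();
    private boolean backlog = true;

    void processRecord(String key, long value) {
        if (backlog) {
            buffer.add(Map.entry(key, value)); // no state access while isBacklog=true
        } else {
            runtimeState.merge(key, value, Long::sum); // regular streaming path
        }
    }

    /** Called when RecordAttributes(isBacklog=false) arrives. */
    void onBacklogEnded() {
        buffer.sort(Map.Entry.comparingByKey()); // sort buffered records by key
        String currentKey = null;
        long currentSum = 0; // stand-in for BatchExecutionKeyedStateBackend: one key at a time
        for (Map.Entry<String, Long> record : buffer) {
            if (currentKey != null && !currentKey.equals(record.getKey())) {
                runtimeState.put(currentKey, currentSum); // key changed: migrate finished key
                currentSum = 0;
            }
            currentKey = record.getKey();
            currentSum += record.getValue();
        }
        if (currentKey != null) {
            runtimeState.put(currentKey, currentSum); // migrate the last key
        }
        buffer.clear();
        backlog = false;
    }
}
```

Because the records are sorted, each key's state is complete when the key changes, so it only needs to be written to the runtime store once.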

Public Interfaces

1) Add RecordAttributesBuilder and RecordAttributes, which extends RuntimeEvent, to provide operators with essential information about the records they receive, such as whether the records are already stale due to backlog.

In the future, we can further enhance this class to provide additional details that aid in optimizing the operator's performance, such as identifying if the records are insert-only.

NOTE: This FLIP focuses on providing a mechanism for sources to propagate the IsProcessingBacklog information (introduced in FLIP-309) to downstream operators and on letting operators take advantage of this information, without specifying how the source should determine the value of IsProcessingBacklog. In the future, we expect that IsProcessingBacklog will very likely be determined using the following strategies:

  • Based on the source operator's state. For example, when MySQL CDC source is reading snapshot, it can claim isBacklog=true.
  • Based on the watermarkLag in the source. For example, when system_time - watermark > user_specified_threshold, then isBacklog=true.
  • Based on metrics. For example, when busyTimeMsPerSecond (or backPressuredTimeMsPerSecond) > user_specified_threshold, then isBacklog=true.
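As an illustration of the watermark-lag strategy, the check can be expressed as a single comparison. The class WatermarkLagBacklogDetector is hypothetical and not an existing Flink API; the sketch assumes both the system time and the watermark are epoch milliseconds and that the user-specified threshold is given as a Duration.

```java
import java.time.Duration;

/** Hypothetical helper illustrating the watermark-lag strategy; not a Flink API. */
final class WatermarkLagBacklogDetector {
    private final long thresholdMs; // user_specified_threshold in milliseconds

    WatermarkLagBacklogDetector(Duration threshold) {
        this.thresholdMs = threshold.toMillis();
    }

    /** Returns true (isBacklog) when system_time - watermark > user_specified_threshold. */
    boolean isBacklog(long systemTimeMs, long watermarkMs) {
        return systemTimeMs - watermarkMs > thresholdMs;
    }
}
```

A source would evaluate this periodically and emit RecordAttributes only when the result changes.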


```java
/** The builder class for {@link RecordAttributes}. */
@PublicEvolving
public class RecordAttributesBuilder {
    @Nullable private Boolean backlog;

    public RecordAttributesBuilder() {
        backlog = null;
    }

    public RecordAttributesBuilder setBacklog(boolean backlog) {...}

    /**
     * If any record attribute is null, we will log it at DEBUG level and use the following
     * default values.
     * - backlog defaults to false
     */
    public RecordAttributes build() {...}
}
```


```java
/**
 * A RecordAttributes element provides the stream task with information that can be used to
 * optimize the stream task's performance.
 */
@PublicEvolving
public class RecordAttributes extends RuntimeEvent {
    /**
     * If it returns true, then the records received after this element are stale
     * and an operator can optionally buffer records until isBacklog=false. This
     * allows an operator to optimize throughput at the cost of processing latency.
     */
    public boolean isBacklog() {...}
}
```


2) Make RuntimeEvent PublicEvolving and add a method in Output to broadcast RuntimeEvent to downstream operators.

```java
/** Subclasses of this event are recognized as events exchanged by the core runtime. */
@PublicEvolving
public abstract class RuntimeEvent extends AbstractEvent {}
```


```java
@PublicEvolving
public interface Output<T> extends Collector<T> {
    ...

    /**
     * Emits a {@link RuntimeEvent} from an operator. This element is broadcast to all
     * downstream operators.
     */
    default void emitRuntimeEvent(RuntimeEvent runtimeEvent) {
        throw new UnsupportedOperationException();
    }
}
```


NOTE: It is up to the operator implementation to decide when (and how often) to emit RuntimeEvent. The overhead of emitting RuntimeEvent is similar to the overhead of emitting Watermark.


3) Add methods in Input and TwoInputStreamOperator to process RecordAttributes.

```java
@PublicEvolving
public interface Input<IN> {
    ...

    /**
     * Processes a {@link RecordAttributes} that arrived on this input.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     *
     * If this method returns true, Flink runtime will automatically broadcast the given recordAttributes
     * to downstream operators. Otherwise, the operator should explicitly broadcast RecordAttributes
     * via `Output#emitRuntimeEvent`.
     */
    default boolean processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
        return true;
    }
}
```


```java
@PublicEvolving
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {
    ...

    /**
     * Processes a {@link RecordAttributes} that arrived on the first input of this operator.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     *
     * If this method returns true, Flink runtime will determine whether to broadcast a RecordAttributes
     * to downstream operators as well as its value. Otherwise, the operator should explicitly broadcast
     * RecordAttributes via `Output#emitRuntimeEvent`.
     */
    default boolean processRecordAttributes1(RecordAttributes recordAttributes) throws Exception {
        return true;
    }

    /**
     * Processes a {@link RecordAttributes} that arrived on the second input of this operator.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     *
     * If this method returns true, Flink runtime will determine whether to broadcast a RecordAttributes
     * to downstream operators as well as its value. Otherwise, the operator should explicitly broadcast
     * RecordAttributes via `Output#emitRuntimeEvent`.
     */
    default boolean processRecordAttributes2(RecordAttributes recordAttributes) throws Exception {
        return true;
    }
}
```


NOTE: An operator should take care to enforce the processing latency requirement for the records it receives while isBacklog=false. In particular, the operator should flush/emit buffered records (inside `#processRecordAttributes`) when the backlog status of its input switches from true to false, so that records which should be processed with low latency are not trapped in the buffer.
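The buffering pattern described above can be sketched as a standalone model. The class BufferingOperator is hypothetical: a plain Consumer stands in for Output#collect, and the isBacklog value is passed as a boolean instead of a RecordAttributes object, so the sketch runs without Flink on the classpath. It buffers records while the input is backlogged and flushes them when the backlog status switches from true to false.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical, simplified model of an operator implementing processRecordAttributes.
 * A Consumer stands in for Flink's Output so the sketch runs standalone.
 */
class BufferingOperator {
    private final Consumer<String> output; // stand-in for Output#collect
    private final List<String> buffer = new ArrayList<>();
    private boolean backlog = false;

    BufferingOperator(Consumer<String> output) {
        this.output = output;
    }

    void processElement(String record) {
        if (backlog) {
            buffer.add(record); // trade latency for throughput while records are stale
        } else {
            output.accept(record); // low-latency path
        }
    }

    /** Mirrors Input#processRecordAttributes: returning true lets the runtime forward the event. */
    boolean processRecordAttributes(boolean isBacklog) {
        if (backlog && !isBacklog) {
            // Backlog switched from true to false: flush so no record is trapped in the buffer.
            buffer.forEach(output);
            buffer.clear();
        }
        backlog = isBacklog;
        return true;
    }
}
```

Flushing before updating the flag ensures that buffered records are emitted ahead of any record that arrives after the switch, which preserves their relative order.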


4) Add OperatorAttributesBuilder and OperatorAttributes for operator developers to specify operator attributes that the Flink JM can use to properly compile the graph (e.g. whether to add an external sorter to sort input records by key).


```java
package org.apache.flink.streaming.api.operators;

/** The builder class for {@link OperatorAttributes}. */
@PublicEvolving
public class OperatorAttributesBuilder {
    @Nullable private Boolean internalSorterSupported = null;

    public OperatorAttributesBuilder() {...}

    public OperatorAttributesBuilder setInternalSorterSupported(boolean internalSorterSupported) {...}

    /**
     * If any operator attribute is null, we will log it at DEBUG level and use the following
     * default values.
     * - internalSorterSupported defaults to false
     */
    public OperatorAttributes build() {...}
}
```

...

Note that an operator with an internal sorter does not necessarily emit data only at the end of input. For example, an operator might sort data while it is still reading from an input with isBacklog=true. When all the inputs it is still reading from have isBacklog=false, the operator can stop sorting and start to emit records continuously in a streaming fashion. The operator can also support checkpointing during this period.


5) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.

```java
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
```

```java
@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
```

Proposed Changes

1) Update Flink runtime to broadcast RecordAttributes to downstream operators if #processRecordAttributes returns true.

The following strategy is used to decide the isBacklog value of the broadcast RecordAttributes:

  • If all inputs have isBacklog=false, emit RecordAttributes(isBacklog=false).
  • Otherwise, if all inputs have isBacklog=true, emit RecordAttributes(isBacklog=true).
  • Otherwise (the inputs have different backlog statuses), log it at DEBUG level and emit RecordAttributes(isBacklog=true).

This effectively allows an operator to buffer records if any of its upstream source operators claims isProcessingBacklog=true. 

We will add DEBUG-level logging to help users locate the two-input operators that do not explicitly override their processRecordAttributes methods.
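The combination rule above can be written as a small pure function. This is a hypothetical sketch: the class BacklogCombiner is not part of Flink, each input's latest isBacklog status is passed as a list of booleans, and a println stands in for the DEBUG-level log.

```java
import java.util.List;

/** Hypothetical sketch of the runtime rule for choosing the isBacklog value to broadcast. */
final class BacklogCombiner {
    /** Returns the isBacklog value to broadcast, given the latest isBacklog of every input. */
    static boolean combine(List<Boolean> inputBacklogs) {
        boolean allFalse = inputBacklogs.stream().noneMatch(b -> b);
        if (allFalse) {
            return false; // all inputs have isBacklog=false
        }
        boolean allTrue = inputBacklogs.stream().allMatch(b -> b);
        if (!allTrue) {
            // Mixed statuses: the runtime would log this at DEBUG level (println is a stand-in).
            System.out.println("DEBUG: inputs disagree on isBacklog; emitting isBacklog=true");
        }
        return true;
    }
}
```

Returning true for mixed statuses is what lets an operator keep buffering as long as any upstream source is still processing backlog.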


2) Add APIs to the Transformation class to get the corresponding operator attributes.

```java
@Internal
public abstract class Transformation<T> {

    public boolean isInternalSorterSupported() {
        return false;
    }
}
```


3) Update Transformation subclasses (e.g. OneInputTransformation and TwoInputTransformation) to override the newly added methods using the OperatorAttributes obtained from the underlying Operator.
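The delegation from a Transformation to its operator's attributes can be modeled without Flink. Everything below is a simplified, hypothetical stand-in: OperatorAttributes and OperatorAttributesBuilder keep only the internalSorterSupported attribute (the DEBUG log on null values is omitted), SketchOperator plays the role of StreamOperator#getOperatorAttributes, and SketchOneInputTransformation plays the role of OneInputTransformation.

```java
/** Simplified model of the proposed OperatorAttributes (only internalSorterSupported). */
final class OperatorAttributes {
    private final boolean internalSorterSupported;

    OperatorAttributes(boolean internalSorterSupported) {
        this.internalSorterSupported = internalSorterSupported;
    }

    boolean isInternalSorterSupported() {
        return internalSorterSupported;
    }
}

/** Simplified model of the proposed builder: an unset attribute falls back to its default. */
final class OperatorAttributesBuilder {
    private Boolean internalSorterSupported = null;

    OperatorAttributesBuilder setInternalSorterSupported(boolean value) {
        internalSorterSupported = value;
        return this;
    }

    OperatorAttributes build() {
        // Default documented in the builder Javadoc: internalSorterSupported defaults to false.
        return new OperatorAttributes(internalSorterSupported != null && internalSorterSupported);
    }
}

/** Hypothetical stand-in for StreamOperator#getOperatorAttributes. */
interface SketchOperator {
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

/** Hypothetical stand-in for OneInputTransformation delegating to its operator. */
final class SketchOneInputTransformation {
    private final SketchOperator operator;

    SketchOneInputTransformation(SketchOperator operator) {
        this.operator = operator;
    }

    boolean isInternalSorterSupported() {
        return operator.getOperatorAttributes().isInternalSorterSupported();
    }
}
```

An operator that does not override getOperatorAttributes() therefore keeps today's behavior, because the Transformation reports false.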


4) Update the JM to make use of the following operator attributes when compiling the Transformation graph into the JobGraph.

  • If a Transformation has isInternalSorterSupported == true AND
    (execution.runtime-mode == BATCH OR execution.checkpointing.interval-during-backlog == 0):
    • Flink runtime will not add an external sorter for its inputs (including during batch mode with keyed inputs).
    • Its managed memory should be set according to execution.sorted-inputs.memory.
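The compile-time condition above reduces to a boolean predicate. In this hypothetical sketch, the class SorterPlanning and its RuntimeMode enum are illustrative only, and the value of execution.checkpointing.interval-during-backlog is assumed to have already been parsed into milliseconds.

```java
/** Hypothetical sketch of the JM rule; only the config keys mentioned in comments come from the text. */
final class SorterPlanning {
    enum RuntimeMode { STREAMING, BATCH } // stand-in for execution.runtime-mode

    /**
     * Returns true if the operator's internal sorter is relied on, i.e. no external sorter is
     * added for its inputs and its managed memory follows execution.sorted-inputs.memory.
     */
    static boolean useInternalSorter(
            boolean internalSorterSupported,
            RuntimeMode runtimeMode,
            long checkpointIntervalDuringBacklogMs) { // execution.checkpointing.interval-during-backlog
        return internalSorterSupported
                && (runtimeMode == RuntimeMode.BATCH || checkpointIntervalDuringBacklogMs == 0);
    }
}
```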


5) For operators whose throughput can be considerably improved by buffering records, update them to override processRecordAttributes() as appropriate.

We anticipate that operators heavily relying on state backend operations, such as join, co-group, and aggregation operators, can benefit from buffering records. For example, by buffering, sorting and aggregating records in memory before accessing the state backend, we can significantly reduce the average number of state backend accesses required to process a record.
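The effect of buffering on state backend accesses can be shown with a count-per-key aggregation. This is a hypothetical sketch: CountingState is a stand-in for a keyed state backend that counts one read-modify-write per call, and a TreeMap plays the role of the sorted in-memory buffer. Per-record processing touches the state once per record, while buffered pre-aggregation touches it once per distinct key.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch comparing state-backend accesses with and without buffering. */
final class StateAccessCounter {
    /** Stand-in for a keyed state backend that counts its read-modify-write operations. */
    static final class CountingState {
        final Map<String, Long> values = new HashMap<>();
        int accesses = 0;

        void add(String key, long delta) {
            accesses++; // one state access per call
            values.merge(key, delta, Long::sum);
        }
    }

    /** Stream mode: every record accesses the state backend. */
    static CountingState perRecord(List<String> keys) {
        CountingState state = new CountingState();
        for (String key : keys) {
            state.add(key, 1);
        }
        return state;
    }

    /** Backlog mode: buffer, sort and aggregate in memory, then access state once per key. */
    static CountingState buffered(List<String> keys) {
        Map<String, Long> sortedPartials = new TreeMap<>(); // sorted in-memory pre-aggregation
        for (String key : keys) {
            sortedPartials.merge(key, 1L, Long::sum);
        }
        CountingState state = new CountingState();
        sortedPartials.forEach(state::add);
        return state;
    }
}
```

For the five records a, b, a, a, b, both variants produce the same counts, but the buffered variant accesses the state twice instead of five times.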


6) For operators whose throughput can be considerably improved with an internal sorter, update them to take advantage of the internal sorter when their input has isBacklog=true.

...