
JIRA: FLINK-33202

Release: -

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

[This FLIP proposal is joint work between Xuannan Su and Dong Lin]

Table of Contents

Motivation

Currently, Flink allows the same operator to be used in both stream mode and batch mode. The operator can process a bounded stream of records with high throughput (e.g. via sorting/buffering) in a batch-mode job, and process an unbounded stream of records with low processing latency (e.g. via checkpointing with a stateful state backend) in a stream-mode job. However, the operator is not able to use different modes (i.e. batch and stream modes) at different stages of the same job, which makes it hard to meet the performance requirements of jobs that need to process a bounded stream of backlog data followed by an unbounded stream of fresh data.

The bounded/unbounded streams of records can either come from the same source or come from different sources, as shown in the following examples:

1) We might have a job that aggregates records from a HybridSource (composed of a FileSource and a KafkaSource) and emits the results to a Hudi sink. When the job is processing records from the FileSource, the user needs the job to maximize throughput without having to emit intermediate results with low processing latency. When the job is processing records from the KafkaSource, the user needs the job to emit results with low processing latency.
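For illustration, the job in example 1 might be assembled as follows. This is a minimal sketch: the file path, Kafka address/topic, key selector, and aggregation are placeholders, and print() stands in for the Hudi sink.

Code Block
languagejava
// Sketch of example 1: a bounded file backlog followed by fresh Kafka data,
// read through a single HybridSource.
FileSource<String> fileSource =
        FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("/backlog")).build();
KafkaSource<String> kafkaSource =
        KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("events")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
HybridSource<String> source =
        HybridSource.builder(fileSource).addSource(kafkaSource).build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "hybrid-source")
        .keyBy(line -> line)     // placeholder key selector
        .reduce((a, b) -> b)     // placeholder aggregation
        .print();                // stand-in for the Hudi sink
env.execute("backlog-then-fresh");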

2) As shown in the figure below, we might have a job that bootstraps its state (operatorA and operatorB) using records from a bounded source (i.e. inputA). There is no need to emit any intermediate result when the job is processing records from the bounded source. After all records from the bounded source have been processed by operatorA and operatorB, the job needs to process records from the unbounded source and emit results with low processing latency in real time.

Currently, supporting the above use-cases requires all operators to run in stream mode for the entire job lifetime. This approach leads to inferior performance because some operators (e.g. co-group) might be more than 10X slower in stream mode than in batch mode, due to the lack of the capability to buffer and sort inputs.

In this FLIP, we propose to optimize performance for the above use-cases by allowing an operator to effectively switch its execution mode from batch to stream based on the "backlog" status of the records emitted by the source operators.


NOTE: This FLIP focuses only on the capability to switch from batch to stream mode. If any extra API is needed to support switching from stream to batch mode, we will discuss it in a follow-up FLIP.


[Figure: job topology with a bounded source (inputA) bootstrapping the state of operatorA and operatorB, plus an unbounded source providing fresh records]


Behavior changes when switching from batch mode to stream mode

In this section, we describe how batch mode and stream mode differ in a variety of aspects. We also describe the new behavior needed from the Flink runtime to support switching an operator from batch mode to stream mode during execution of the same job.


For ease of understanding, we assume a simplified use-case with the following properties when describing the behavior changes:

  • The job uses default configuration values for all configurations (e.g. scheduler-mode) except execution.runtime-mode (see below).
  • The job has one source that switches from isBacklog=true to isBacklog=false before it reaches EOF.


Here are the definitions of batch mode, stream mode, and mixed mode used below:

  • Batch mode refers to the behavior of Flink runtime prior to this FLIP with execution.runtime-mode = batch.
  • Stream mode refers to the behavior of Flink runtime prior to this FLIP with execution.runtime-mode = streaming.
  • Mixed mode refers to the behavior of Flink runtime after this FLIP with execution.runtime-mode = streaming AND execution.checkpointing.interval-during-backlog = 0.


And we make the following extra notes for the behavior changes:

  • After this FLIP, the behavior of the Flink runtime with execution.runtime-mode = streaming AND execution.checkpointing.interval-during-backlog > 0 will be the same as the stream mode prior to this FLIP.
  • The rationale for switching the Flink runtime behavior based on whether execution.checkpointing.interval-during-backlog == 0 is that most (if not all) performance optimizations which allow batch mode to achieve higher throughput than stream mode involve operations (e.g. sorting inputs) that can only be used when checkpointing is not required (see the configuration sketch after this list).
  • It is possible for mixed mode to be slower than stream mode, particularly when there is only a small number of input records and the overhead of buffering/sorting inputs outweighs its benefit. This is similar to how a merge join might be slower than a hash join. This FLIP focuses on optimizing the Flink throughput when there is a large number of input records. In the future, we might introduce more strategies to turn on mixed mode in a smart way to avoid performance regression.
  • For an operator with 2+ inputs, where some inputs have isBacklog=true and some other inputs have isBacklog=false, the Flink runtime will handle this operator as if all its inputs have isBacklog=false. The rationale is that we don't have a reliable way (yet) to decide whether it is OK to delay the processing/emission of this operator's output records. For example, if this operator simply forwards the records from the input with isBacklog=false to its output, then we probably should not delay its output records.
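For example, assuming the configuration keys behave as described above, mixed mode could be enabled with settings along these lines (a sketch; the interval values are illustrative):

Code Block
languagejava
// Run in streaming mode, but set the checkpointing interval during backlog to 0
// so that the runtime may apply batch-style optimizations while isBacklog=true.
Configuration conf = new Configuration();
conf.setString("execution.runtime-mode", "STREAMING");
conf.setString("execution.checkpointing.interval", "30s");
conf.setString("execution.checkpointing.interval-during-backlog", "0");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);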


1) Scheduling strategy

Batch mode:

  • A task (which contains a list of co-located chained operators) is deployed after all its upstream tasks have finished.

Stream mode:

  • All tasks are deployed at the beginning of the job without waiting for upstream tasks to finish.

Mixed mode:

  • Same as stream mode.

The capability to update/optimize scheduling (e.g. speculative execution, AQE) in mixed mode will be left to future work.


2) Shuffle strategy

Batch mode:

  • All shuffle types will be set to BLOCKING.
  • All keyed inputs will be sorted before they are fed into operators. Managed memory is allocated to operators with keyed inputs to support sorting inputs.

Stream mode:

  • All shuffle types will be set to PIPELINED.

Mixed mode:

  • The same shuffle strategy as stream mode will be used to set the shuffle type between tasks.
  • Before the source operator emits isBacklog=false, the keyed input of a one-input operator will be automatically sorted while isBacklog=true, unless the operator sorts the input internally.
    • Keyed inputs of a multi-input operator are not automatically sorted; such an operator can sort its inputs internally.
  • After isBacklog switches to false, keyed inputs are no longer sorted.


3) Watermark strategy

Batch mode:

  • Source operator does not emit watermark.
  • Once an operator reaches EOF, the operator triggers all the timers one key at a time.

Stream mode:

  • Source operator emits watermark based on the user-specified WatermarkStrategy and pipeline.auto-watermark-interval while it has not reached EOF.
  • Once the source operator reaches EOF, it emits watermark of Long.MAX_VALUE after it has emitted all records. That means the downstream operator will not see any further input records after having received watermark=Long.MAX_VALUE.

Mixed mode:

  • Source operator does not emit watermark while isBacklog=true.
  • At the point when isBacklog switches to false, the source operator emits RecordAttributes(isBacklog=false) and the greatest watermark observed during backlog processing.
  • Afterwards, the source operator emits watermark based on the user-specified WatermarkStrategy (see the example after this list) and pipeline.auto-watermark-interval while it has not reached EOF.
  • Once the source operator reaches EOF, it emits watermark of Long.MAX_VALUE after it has emitted all records.
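For reference, the user-specified WatermarkStrategy mentioned above might look as follows (a sketch; the Event type and its timestamp field are hypothetical):

Code Block
languagejava
// Bounded out-of-orderness watermarks with timestamps taken from the
// (hypothetical) Event type.
WatermarkStrategy<Event> strategy =
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, recordTimestamp) -> event.timestamp);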


4) Checkpoint and failover strategy

Batch mode:

  • No operator needs to support checkpoint. Checkpoint triggering is disabled.
  • If any task fails, this task is restarted to re-process its input from the beginning.

Stream mode:

  • Every operator needs to support checkpoint.
  • Checkpoint is triggered periodically according to execution.checkpointing.interval.
  • If any task fails, its pipelined region is restarted to re-process its input since the last successful checkpoint.

Mixed mode:

  • Every operator needs to support checkpoint.
  • Before the source operator emits isBacklog=false, checkpoint triggering is disabled.
  • If any task fails while isBacklog=true, this task is restarted to re-process its input from the beginning.
  • At the point when isBacklog switches to false, the source operator emits RecordAttributes(isBacklog=false) and triggers an immediate checkpoint.
  • Afterwards, checkpoint is triggered periodically according to execution.checkpointing.interval.
  • If any task fails while isBacklog=false, its pipelined region is restarted to re-process its input since the last successful checkpoint.


Extra notes: For jobs with multiple sources, checkpoint triggering is enabled if and only if all sources have isBacklog=false (more details on the checkpointing behavior can be found in the doc of execution.checkpointing.interval-during-backlog). As a result, if one source has isBacklog=true while another source switches from isBacklog=true to isBacklog=false, the job's checkpointing remains disabled.



5) Keyed state backend

Batch mode:

  • BatchExecutionKeyedStateBackend is used. This keyed state backend leverages the fact that the input records are all sorted by key and discards the state of the current key once it sees a record coming in with a different key.

Stream mode:

  • Managed memory is allocated to operators if the keyed state backend is a subclass of AbstractManagedMemoryStateBackend.
  • A general purpose keyed state backend which does not assume inputs sorted by key (e.g. EmbeddedRocksDBStateBackend) is used.

Mixed mode:

  • Managed memory is allocated to operators with keyed inputs.
  • A general purpose keyed state backend that does not assume inputs sorted by key (e.g. EmbeddedRocksDBStateBackend) is instantiated by the Flink runtime.
  • Before the source operator emits isBacklog=false (i.e. during backlog processing), the keyed input of a one-input operator is sorted by key.
    • Operators with multiple inputs can buffer and sort the inputs internally. We will provide utility classes (derived from the existing ExternalSorter) to sort inputs.
    • Flink runtime will buffer/aggregate a state's key/value in memory before persisting it to the underlying state backend (e.g. RocksDB), such that for input records that are already sorted on the given key, there will be at most one read/write access to the underlying state backend for each key.
  • At the point when isBacklog switches to false:
    • Source operator emits RecordAttributes(isBacklog=false).
  • The operator continues to process records using the keyed state backend instantiated by the Flink runtime, which now contains the full state obtained from records received while isBacklog=true.


6) Timer Service

Batch mode:

  • The timer service keeps timers only for the current key, and it will fire timers only at the end of each key.
  • The value of InternalTimeService#getCurrentWatermark will be Watermark.MIN_VALUE when processing a key and will be set to Watermark.MAX_VALUE when firing timers. This means that InternalTimeService#getCurrentWatermark returns Watermark.MIN_VALUE in Input#processElement and Watermark.MAX_VALUE in Triggerable#onEventTime.

Stream mode:

  • The timer service keeps timers for all the keys, and it will fire timers based on the watermark.

Mixed mode:

  • Before the source operator emits isBacklog=false, the timer service fires processing-time timers based on the system time.
  • At the point when isBacklog switches to false, the timer service fires event-time timers up to the latest watermark at this point.
    • The value of InternalTimeService#getCurrentWatermark will be Watermark.MIN_VALUE when processing a key and will be set to the last watermark during backlog when firing timers.
  • After isBacklog switches to false, the timer service continues to fire processing-time and event-time timers in the same way as stream mode.

Public Interfaces

1) Add RecordAttributesBuilder and RecordAttributes, which extends StreamElement, to provide operators with essential information about the records they receive, such as whether the records are already stale due to backlog.

In the future, we can further enhance this class to provide additional details that aid in optimizing the operator's performance, such as identifying if the records are insert-only.

NOTE: This FLIP focuses on providing a mechanism for sources to propagate the IsProcessingBacklog information (introduced in FLIP-309) from sources to downstream operators and let operators take advantage of this information, without specifying how the source should determine the value of IsProcessingBacklog. In the future, we expect IsProcessingBacklog can very likely be determined using the following strategies:

  • Based on the source operator's state. For example, when MySQL CDC source is reading snapshot, it can claim isBacklog=true.
  • Based on the watermarkLag in the source. For example, when system_time - watermark > user_specified_threshold, then isBacklog=true (see the sketch after this list).
  • Based on metrics. For example, when busyTimeMsPerSecond (or backPressuredTimeMsPerSecond) > user_specified_threshold, then isBacklog=true.
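As an illustration of the watermark-lag strategy, the check could be as simple as the following hypothetical helper (the names are stand-ins, not a proposed API):

Code Block
languagejava
// Claim isBacklog=true while the watermark lags behind the system time by more
// than a user-specified threshold.
static boolean isBacklog(long currentWatermarkMs, long userSpecifiedThresholdMs) {
    return System.currentTimeMillis() - currentWatermarkMs > userSpecifiedThresholdMs;
}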


Code Block
languagejava
/** The builder class for {@link RecordAttributes}. */
@Experimental
public class RecordAttributesBuilder {
    @Nullable private Boolean backlog = null;
 
    /**
     * This constructor takes a list of the last recordAttributes received from each
     * of the operator's inputs. When this list is not empty, it will be used to determine
     * the default values for those attributes that have not been explicitly set by caller.
     */
    public RecordAttributesBuilder(List<RecordAttributes> lastRecordAttributesOfInputs) {...}
 
    public RecordAttributesBuilder setBacklog(boolean backlog) {...} 

    /**
     * If the backlog attribute is null, we will log it at DEBUG level and determine a non-null
     * default value as described below.
     *
     * Default value for backlog:
     * - if any element in lastRecordAttributesOfInputs has backlog=true, use true.
     * - Otherwise, use false.
     */ 
    public RecordAttributes build() {...}
}


Code Block
languagejava
/**
 * A RecordAttributes element provides a stream task with information that can be used to optimize
 * the stream task's performance.
 */
@Experimental
public class RecordAttributes extends StreamElement {
    /**
     * If it returns true, then the records received after this element are stale
     * and an operator can optionally buffer records until isBacklog=false. This
     * allows an operator to optimize throughput at the cost of processing latency.
     */
     @Nullable
     public Boolean isBacklog() {...}
}


2) Add a method in Output to broadcast RecordAttributes to downstream operators.

Code Block
languagejava
@PublicEvolving
public interface Output<T> extends Collector<T> {
    ...

    /**
     * Emits a {@link RecordAttributes} from an operator. This element is broadcast to all
     * downstream operators.
     */
    @Experimental
    default void emitRecordAttributes(RecordAttributes recordAttributes) {
        throw new UnsupportedOperationException();
    }
}


NOTE: It is up to the operator implementation to decide when (and how often) to emit RecordAttributes. The overhead of emitting RecordAttributes is similar to the overhead of emitting Watermark.
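For illustration, a source operator might derive and broadcast its backlog status as follows (a sketch based on the interfaces above; isProcessingBacklog is a hypothetical flag, and the builder receives an empty list because a source has no inputs):

Code Block
languagejava
// Sketch: broadcast the current backlog status to all downstream operators.
RecordAttributes attributes =
        new RecordAttributesBuilder(Collections.emptyList())
                .setBacklog(isProcessingBacklog)   // hypothetical flag
                .build();
output.emitRecordAttributes(attributes);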


3) Add methods in Input and TwoInputStreamOperator to process RecordAttributes.

Code Block
languagejava
@PublicEvolving
public interface Input<IN> {
    ...

    /**
     * Processes a {@link RecordAttributes} that arrived on this input.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     * The recordAttributes do not need to be persisted in the checkpoint, as the source generates
     * RecordAttributes after recovery from checkpoint.
     */
    @Experimental
    default void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {}
}


Code Block
languagejava
@PublicEvolving
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {
    ...

    /**
     * Processes a {@link RecordAttributes} that arrived on the first input of this operator.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     * The recordAttributes do not need to be persisted in the checkpoint, as the source generates
     * RecordAttributes after recovery from checkpoint.
     */
    @Experimental
    default void processRecordAttributes1(RecordAttributes recordAttributes) throws Exception {}

    /**
     * Processes a {@link RecordAttributes} that arrived on the second input of this operator.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     * The recordAttributes do not need to be persisted in the checkpoint, as the source generates
     * RecordAttributes after recovery from checkpoint.
     */
    @Experimental
    default void processRecordAttributes2(RecordAttributes recordAttributes) throws Exception {}
}
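To make the contract concrete, here is a minimal sketch of a one-input operator that buffers records while its input has isBacklog=true and flushes them once the backlog ends. The buffering list and flush logic are illustrative, not part of the proposal; the sketch assumes the AbstractStreamOperator behavior described in the Proposed Changes section below.

Code Block
languagejava
// Sketch: buffer records during backlog processing (trading latency for
// throughput) and flush them when the input switches to isBacklog=false.
public class BufferingOperator extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {

    private final List<StreamRecord<String>> buffer = new ArrayList<>();
    private boolean backlog = false;

    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        if (backlog) {
            buffer.add(element);      // high-throughput path: buffer for later
        } else {
            output.collect(element);  // low-latency path: forward immediately
        }
    }

    @Override
    public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
        boolean newBacklog = Boolean.TRUE.equals(recordAttributes.isBacklog());
        if (backlog && !newBacklog) {
            // Backlog ended: flush everything buffered so far.
            for (StreamRecord<String> record : buffer) {
                output.collect(record);
            }
            buffer.clear();
        }
        backlog = newBacklog;
        // Let the base class derive and broadcast RecordAttributes downstream.
        super.processRecordAttributes(recordAttributes);
    }
}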


4) Add OperatorAttributesBuilder and OperatorAttributes for operator developers to specify operator attributes that Flink JM can use to properly compile the graph (e.g. whether to add an external sorter to sort input records by key).


Code Block
languagejava
package org.apache.flink.streaming.api.operators;

/** The builder class for {@link OperatorAttributes}. */
@Experimental
public class OperatorAttributesBuilder {
    @Nullable private Boolean internalSorterSupported = null; 

    public OperatorAttributesBuilder() {...}

    public OperatorAttributesBuilder setInternalSorterSupported(boolean internalSorterSupported) {...}

    /**
     * If any operator attribute is null, we will log it at DEBUG level and use the following
     * default values.
     * - internalSorterSupported defaults to false
     */
    public OperatorAttributes build() {...}
}


Code Block
languagejava
package org.apache.flink.streaming.api.operators;

/**
 * OperatorAttributes element provides Job Manager with information that can be
 * used to optimize the job performance.
 */
@Experimental
public class OperatorAttributes {
    /**
     * Returns true iff the operator uses an internal sorter to sort inputs by key when any of
     * the following conditions are met:
     *
     * <ul>
     *   <li>execution.runtime-mode = BATCH.
     *   <li>execution.checkpointing.interval-during-backlog = 0 AND any of its inputs has isBacklog=true.
     * </ul>
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li>Its input records will not be sorted externally before being fed into this operator.
     *   <li>Its managed memory will be set according to execution.sorted-inputs.memory.
     * </ul>
     */
    public boolean isInternalSorterSupported() {...}
}


Note that an operator with an internal sorter does not necessarily only emit data at the end of its input. For example, we might have an operator that sorts data while it is still reading from an input with isBacklog=true. When all the inputs (that it is still reading from) have isBacklog=false, the operator can stop sorting and start to emit records continuously in a streaming fashion. And the operator can support checkpointing during this period.


5) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.

Code Block
languagejava
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...

    @Experimental
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...

    @Experimental
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
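For example, an operator that sorts its inputs internally might declare the attribute as follows (a sketch based on the interfaces above):

Code Block
languagejava
// Sketch: declare that this operator sorts inputs internally, so the runtime
// will not add an external sorter and will assign managed memory accordingly.
@Override
public OperatorAttributes getOperatorAttributes() {
    return new OperatorAttributesBuilder().setInternalSorterSupported(true).build();
}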


Proposed Changes

1) Update AbstractStreamOperator and AbstractStreamOperatorV2 to override #processRecordAttributes, #processRecordAttributes1 and #processRecordAttributes2.

These methods should use logic along the following lines: derive the attributes to broadcast from the last RecordAttributes received on each input, then emit them.

Code Block
languagejava
public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
    // Combine the last RecordAttributes of all inputs and broadcast the result.
    output.emitRecordAttributes(
            new RecordAttributesBuilder(lastRecordAttributesOfInputs).build());
}


2) Add APIs on the Transformation interface to get the corresponding operator attributes.

Code Block
languagejava
@Internal
public abstract class Transformation<T> {

    public boolean isInternalSorterSupported() {
        return false;
    }
}


3) Update Transformation subclasses (e.g. OneInputTransformation and TwoInputTransformation) to override the newly added methods using the OperatorAttributes obtained from the underlying Operator.
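One plausible shape for such an override, assuming the transformation exposes its StreamOperatorFactory (a sketch, not the final implementation):

Code Block
languagejava
// Sketch: OneInputTransformation forwards the attribute declared by its operator.
@Override
public boolean isInternalSorterSupported() {
    return getOperatorFactory().getOperatorAttributes().isInternalSorterSupported();
}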


4) Update the Transformation translator to use the following operator attributes when compiling the Transformation graph into the JobGraph.

  • If a Transformation has isInternalSorterSupported == true AND (execution.runtime-mode == BATCH OR execution.checkpointing.interval-during-backlog == 0):
    • Flink runtime will not add an external sorter for its inputs (including during batch mode with keyed inputs).
    • Its managed memory should be set according to execution.sorted-inputs.memory.
  • If a OneInputTransformation has isInternalSorterSupported == false, keyed input, and execution.checkpointing.interval-during-backlog == 0:
    • Flink runtime will add an external sorter for the input, so that the keyed input is sorted during backlog processing.
    • Its managed memory should be set according to execution.sorted-inputs.memory.
    • Flink runtime will buffer/aggregate a state's key/value in memory before persisting it to the underlying state backend (e.g. RocksDB), such that for input records that are already sorted on the given key, there will be at most one read/write access to the underlying state backend for each key.

With the above change, we can significantly enhance the performance of keyed one-input operators during isBacklog=true without any code change in the operators. Moreover, the performance of such an operator during isBacklog=true is close to its performance in batch mode.
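The "at most one read/write access per key" property follows from the input being sorted by key; a simplified sketch of the idea is shown below (hypothetical types and helpers, not the actual runtime code):

Code Block
languagejava
// Simplified sketch: with key-sorted input, the running aggregate for each key
// stays in memory and is persisted only when the key changes, so each key
// incurs at most one write to the underlying state backend.
class SortedInputAggregator<K, V> {
    private final ReduceFunction<V> reduceFunction;
    private final Consumer<V> persistToStateBackend;  // stand-in for the state write
    private K currentKey;
    private V currentAggregate;

    SortedInputAggregator(ReduceFunction<V> reduceFunction, Consumer<V> persistToStateBackend) {
        this.reduceFunction = reduceFunction;
        this.persistToStateBackend = persistToStateBackend;
    }

    void process(K key, V value) throws Exception {
        if (!key.equals(currentKey)) {
            if (currentKey != null) {
                persistToStateBackend.accept(currentAggregate);  // one write per key
            }
            currentKey = key;
            currentAggregate = null;
        }
        currentAggregate =
                currentAggregate == null ? value : reduceFunction.reduce(currentAggregate, value);
    }
}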


5) For keyed multi-input operators that involve aggregation operations (e.g. join, cogroup, aggregate), update them to take advantage of the sorted input when isBacklog=true.

Currently, there are two keyed multi-input operators in the DataStream API, KeyedCoProcessOperator and IntervalJoinOperator. Both will be optimized in the following way:

  • Override getOperatorAttributes to return isInternalSorterSupported = true.
  • Override processRecordAttributes() API to adjust its behavior based on the input's isBacklog status.
    • The operator can buffer and sort input records when any input has isBacklog=true AND execution.checkpointing.interval-during-backlog=0.
    • Once all inputs' status has switched to isBacklog=false, it processes the buffered records, emits results, and starts to work in the stream execution mode.


Analysis of APIs affected by this FLIP


1) After the proposed changes, the following DataStream APIs should have performance similar to batch mode during backlog processing.


DataStream:

  • DataStream#coGroup


KeyedStream:

  • KeyedStream#process
  • KeyedStream#intervalJoin
  • KeyedStream#reduce
  • KeyedStream#sum
  • KeyedStream#min
  • KeyedStream#max
  • KeyedStream#minBy
  • KeyedStream#maxBy


WindowedStream:

  • WindowedStream#reduce
  • WindowedStream#aggregate
  • WindowedStream#apply
  • WindowedStream#process
  • WindowedStream#sum
  • WindowedStream#min
  • WindowedStream#max
  • WindowedStream#minBy
  • WindowedStream#maxBy


ConnectedStreams:

  • ConnectedStreams#process


2) After the proposed change, the following keyed one-input operations (not exhaustive) in the SQL API will have performance similar to batch mode during backlog processing:

  • Window Aggregation
  • Group Aggregation
  • Over Aggregation


Multi-input operations, such as Regular Join, Interval Join, and Temporal Join, can be updated gradually to optimize their performance during backlog processing.


Benchmark results

In this section, we provide benchmark results comparing the throughput of commonly-used operations in batch mode and in stream mode, with and without the backlog processing optimization. This demonstrates the performance gain the optimization provides during backlog processing.


1) Use KeyedStream#reduce to process backlog records.

The ReduceBacklogBenchmark benchmark is run on a MacBook with the latest Flink 1.19-snapshot and parallelism=1. RocksDB is used as the state backend in stream mode.

Here are the benchmark results:

  • Without the proposed change, in stream mode, with the source generating 1*10^7 backlog records, the average execution time is 28.8 sec.
  • Without the proposed change, in batch mode, with the source generating 1*10^7 backlog records, the average execution time is 11.1 sec.
  • With the proposed change, in stream mode, with the source generating 1*10^7 backlog records, the average execution time is 11.6 sec.

This shows that, with the proposed change, KeyedStream#reduce can be 2.5X faster than its current stream-mode throughput during backlog processing, and it comes very close to the batch-mode throughput (only 4.3% less).
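For context, the pattern exercised by such a benchmark is simply a keyed reduce (a sketch; backlogSource, the Tuple2<String, Long> record type, and the key selector are placeholders):

Code Block
languagejava
// The shape of the benchmarked pipeline: a keyed reduce over backlog records.
env.fromSource(backlogSource, WatermarkStrategy.noWatermarks(), "backlog-source")
        .keyBy(record -> record.f0)
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))
        .print();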



2) Use DataStream#coGroup to process records from two bounded streams and emit results after both inputs have ended.

...

Compatibility, Deprecation, and Migration Plan

...

  • All the keyed one-input operators can automatically benefit from the optimization during backlog processing.
  • For multi-input operators, their performance stays the same as before. In order to optimize performance during backlog processing, users have to update the operator to make use of the isBacklog attribute.

...
