...

1) We might have a job which aggregates records from a HybridSource (composed of a FileSource and a KafkaSource) and emits the results to a Hudi Sink. While the job is processing records from the FileSource, the user needs the job to maximize throughput without having to emit intermediate results with low processing latency. When the job is processing records from the KafkaSource, the user needs the job to emit results with low processing latency. (A sketch of such a pipeline is shown after these two examples.)

2) As shown in the figure below, we might have a job which bootstraps its state (operatorA and operatorB) using records from a bounded source (i.e. inputA). There is no need to emit any intermediate results while the job is processing records from the bounded source. After all records from the bounded source have been processed by operatorA and operatorB, the job needs to process records from the unbounded source and emit results with low processing latency in real-time.
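
The following is a minimal sketch of the pipeline in example 1, assuming illustrative file paths, Kafka broker/topic names, and a print sink in place of the actual Hudi sink (exact connector classes may vary slightly across Flink versions):

Code Block
languagejava
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HybridSourceAggregationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bounded part of the hybrid source: historical records from files.
        FileSource<String> fileSource =
                FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("/path/to/history"))
                        .build();

        // Unbounded part of the hybrid source: real-time records from Kafka.
        KafkaSource<String> kafkaSource =
                KafkaSource.<String>builder()
                        .setBootstrapServers("kafka:9092")
                        .setTopics("events")
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        // The HybridSource first reads the FileSource (backlog) and then switches to the KafkaSource.
        HybridSource<String> hybridSource =
                HybridSource.builder(fileSource).addSource(kafkaSource).build();

        env.fromSource(hybridSource, WatermarkStrategy.noWatermarks(), "hybrid-source")
                .map(line -> Tuple2.of(line, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(value -> value.f0)
                .sum(1)
                .print(); // placeholder for the Hudi sink

        env.execute("hybrid-source-aggregation");
    }
}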

Currently, supporting the above use-cases requires all operators to be deployed at the start of the job and run in stream mode across the entire job lifetime. This approach leads to inferior performance due to the following drawbacks: 1) some operators (e.g. co-group) might be more than 10X slower in stream mode than in batch mode; 2) it is a waste of slot resources to deploy an operator (e.g. operatorB) when its input (e.g. operatorA's results) is not available; and 3) an operator that is deployed earlier than necessary (e.g. operatorB) might end up spilling a lot of records from the unbounded source to its local disk until it has received all records from operatorA.

In this FLIP, we propose to optimize performance for the above use-cases by allowing an operator to effectively switch its execution mode from batch to stream based on the "backlog" status of the records emitted by the source operators. We expect this FLIP to further enhance Flink as a stream-batch unified processing engine.


NOTE: this FLIP depends on the APIs (e.g. RecordAttributes with isBacklog information) proposed in FLIP-325. While FLIP-325 allows an operator to buffer records for arbitrarily long while its input records have isBacklog=true, this FLIP additionally allows an operator to use optimizations that are currently only applicable in the batch execution mode (e.g. using an ExternalSorter, which does not support checkpointing).

...

In this section, we describe how batch mode and stream mode differ in a variety of aspects. We also describe the new behavior needed for an operator to switch from batch mode to stream mode during the execution of the same job.

...

The capability to update/optimize scheduling (e.g. speculative execution, AQE) for mixed-mode use-cases is left to future work.

...

  • BatchExecutionKeyedStateBackend is used. This keyed state backend leverages the fact that the input records are all sorted by key and discards the state of the current key once it sees a record coming in with a different key (see the sketch after this list).

Stream mode:

  • Managed memory is allocated to operators if the keyed state backend is a subclass of AbstractManagedMemoryStateBackend.
  • A general purpose keyed state backend which does not assume inputs sorted by key (e.g. EmbeddedRocksDBStateBackend) is used.
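
To make the key-sorted processing in batch mode concrete, here is a minimal conceptual sketch; it is not the actual BatchExecutionKeyedStateBackend implementation, only an illustration of why sorted input allows keeping the state of a single key at a time:

Code Block
languagejava
import java.util.Objects;

/**
 * Illustrative sketch only (NOT Flink's BatchExecutionKeyedStateBackend).
 * Because input records arrive sorted by key, only the state of the current
 * key needs to be kept; it can be dropped once the key changes.
 */
public class SingleKeyStateSketch<K, S> {
    private K currentKey;
    private S currentState;

    /** Called before processing a record, with the record's key. */
    public void setCurrentKey(K key) {
        if (currentKey != null && !Objects.equals(currentKey, key)) {
            // All records of the previous key have been seen: its state can be
            // finalized (e.g. the aggregation result emitted) and then discarded.
            currentState = null;
        }
        currentKey = key;
    }

    public S getState() {
        return currentState;
    }

    public void updateState(S newState) {
        currentState = newState;
    }
}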

...

  • Managed memory is allocated to operators with keyed inputs which claim isInternalSorterSupported == true.
  • A general purpose keyed state backend which does not assume inputs are sorted by key (e.g. EmbeddedRocksDBStateBackend) is instantiated by the Flink runtime.
  • Before the source operator emits isBacklog=false, a downstream operator sorts its input records by key internally without accessing the state backend. We will provide utility classes (derived from the existing ExternalSorter) to sort inputs.
  • At the point when isBacklog switches to false:
    - Source operator emits RecordAttributes(isBacklog=false)
    - The downstream operator processes/aggregates all the records received before RecordAttributes(isBacklog=false) in order of the sorted keys, using a BatchExecutionKeyedStateBackend which it has explicitly instantiated internally.
    - The downstream operator moves each (key, state) pair from the BatchExecutionKeyedStateBackend to the keyed state backend instantiated by the Flink runtime once it sees a record coming in with a different key.
  • The operator continues to process records using the keyed state backend instantiated by the Flink runtime, which now contains the full state obtained from records received while isBacklog=true. (A sketch of this switch is shown after this list.)
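
The following is a simplified, self-contained sketch of this switch, assuming hypothetical class and method names: a TreeMap stands in for the internal sorter, and a plain Map stands in for the keyed state backend instantiated by the Flink runtime.

Code Block
languagejava
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

/**
 * Illustrative sketch only; class and method names are hypothetical and do not
 * correspond to actual Flink runtime classes.
 */
public class BacklogSwitchSketch<K extends Comparable<K>, IN, STATE> {

    /** Records buffered (grouped and sorted by key) while isBacklog=true. */
    private final TreeMap<K, List<IN>> bufferedRecords = new TreeMap<>();

    /** Stand-in for the keyed state backend instantiated by the Flink runtime. */
    private final Map<K, STATE> runtimeStateBackend;

    /** The per-record aggregation function of the operator. */
    private final BiFunction<STATE, IN, STATE> aggregator;

    public BacklogSwitchSketch(Map<K, STATE> runtimeStateBackend, BiFunction<STATE, IN, STATE> aggregator) {
        this.runtimeStateBackend = runtimeStateBackend;
        this.aggregator = aggregator;
    }

    /** Called for every record received while isBacklog=true. */
    public void bufferDuringBacklog(K key, IN record) {
        bufferedRecords.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
    }

    /** Called once when RecordAttributes(isBacklog=false) is received. */
    public void onBacklogFinished() {
        for (Map.Entry<K, List<IN>> entry : bufferedRecords.entrySet()) {
            // Aggregate all buffered records of one key, mimicking the
            // BatchExecutionKeyedStateBackend that keeps only the current key's state.
            STATE state = null;
            for (IN record : entry.getValue()) {
                state = aggregator.apply(state, record);
            }
            // The key is complete: move its (key, state) pair into the runtime backend.
            runtimeStateBackend.put(entry.getKey(), state);
        }
        bufferedRecords.clear();
        // From here on, records are processed directly against runtimeStateBackend,
        // which now contains the full state built during the backlog phase.
    }
}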

...

Code Block
languagejava
package org.apache.flink.streaming.api.operators;

/**
 * OperatorAttributes element provides Job Manager with information that can be
 * used to optimize the job performance.
 */
@PublicEvolving
public class OperatorAttributes {     
    /**
     * Returns true iff the operator uses an internal sorter to sort inputs by key when any of
     * the following conditions are met:
     *
     * <ul>
     *   <li>execution.runtime-mode = BATCH.
     *   <li>execution.checkpointing.interval-during-backlog = 0 AND any of its input has isBacklog=true.
     * </ul>
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li>Its input records will not be sorted externally before being fed into this operator.
     *   <li>Its managed memory will be set according to execution.sorted-inputs.memory.
     * </ul>
     */
    public boolean isInternalSorterSupported() {...}
}

...

  • If a Transformation has isInternalSorterSupported == true AND
    (execution.runtime-mode == BATCH OR execution.checkpointing.interval-during-backlog == 0):
    • Flink runtime will not add an external sorter for its inputs (including during batch mode with keyed inputs)
    • Its managed memory should be set according to execution.sorted-inputs.memory (see the sketch below)
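
A minimal sketch of this decision, using a hypothetical helper method rather than the actual code that generates the job graph:

Code Block
languagejava
/** Illustrative helper only; not actual Flink runtime code. */
public final class InternalSorterDecisionSketch {

    /**
     * Returns true iff the runtime should rely on the operator's internal sorter,
     * i.e. skip adding an external sorter for its keyed inputs and allocate managed
     * memory according to execution.sorted-inputs.memory.
     */
    public static boolean relyOnInternalSorter(
            boolean isInternalSorterSupported,              // from OperatorAttributes
            boolean isBatchMode,                            // execution.runtime-mode == BATCH
            boolean zeroCheckpointIntervalDuringBacklog) {  // execution.checkpointing.interval-during-backlog == 0
        return isInternalSorterSupported && (isBatchMode || zeroCheckpointIntervalDuringBacklog);
    }
}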


4) A blocking input edge with pending records is the same as a source with isBacklog=true when an operator determines its RecordAttributes for downstream nodes.

5) For those operators whose throughput can be considerably improved with an internal sorter, update them to take advantage of the internal sorter when their input has isBacklog=true.

Typically, operators that perform aggregation operations (e.g. join, cogroup, aggregate) on keyed inputs can benefit from using an internal sorter.

...

  • Override getOperatorAttributes to return isInternalSorterSupported = true.
  • Override processRecordAttributes() API (see FLIP-325) to adjust its behavior based on the input's isBacklog status.
    For example, the operator can buffer and sort input records when any input has isBacklog=true AND execution.checkpointing.interval-during-backlog=0. Once all inputs' status has switched to isBacklog=false, it processes the buffered records, emits the results, and starts to work in the stream execution mode. (A sketch of such an operator is shown after this list.)
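
Below is a hedged sketch of such an operator. The getOperatorAttributes and processRecordAttributes methods follow the APIs proposed in this FLIP and in FLIP-325 respectively; the OperatorAttributesBuilder, the RecordAttributes accessor, and the buffering helpers used here are illustrative assumptions rather than finalized APIs (imports are omitted for brevity):

Code Block
languagejava
// Illustrative sketch; names marked below as assumptions are not finalized APIs.
public class InternalSorterAggregationOperator<IN, OUT> extends AbstractStreamOperator<OUT>
        implements OneInputStreamOperator<IN, OUT> {

    private boolean inputIsBacklog = true;

    @Override
    public OperatorAttributes getOperatorAttributes() {
        // Tell the runtime that this operator sorts backlog inputs internally, so no
        // external sorter is added and managed memory is allocated accordingly.
        // (OperatorAttributesBuilder is assumed here; it is not shown in this excerpt.)
        return new OperatorAttributesBuilder().setInternalSorterSupported(true).build();
    }

    @Override
    public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
        // Proposed in FLIP-325: the operator is notified when the backlog status changes.
        if (inputIsBacklog && !recordAttributes.isBacklog()) {
            inputIsBacklog = false;
            // Backlog phase ended: sort and process the buffered records, migrate the
            // per-key state into the runtime keyed state backend, and emit the results.
            flushBufferedRecords();
        }
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        if (inputIsBacklog) {
            // Batch-style path: buffer the record for internal sorting; do not touch
            // the keyed state backend and do not emit intermediate results.
            bufferRecord(element.getValue());
        } else {
            // Stream-style path: process against the keyed state backend and emit
            // results with low processing latency.
            processAndEmit(element.getValue());
        }
    }

    private void bufferRecord(IN record) { /* add to the internal sorter (assumption) */ }

    private void flushBufferedRecords() { /* process sorted records, emit results (assumption) */ }

    private void processAndEmit(IN record) { /* normal stream-mode processing (assumption) */ }
}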

...