...

The bounded/unbounded streams of records can come either from the same source or from different sources, as shown in the following examples:

1) We might have a job that aggregates records from a HybridSource (composed of FileSource and KafkaSource) and emits the results to a Hudi Sink. When the job is processing records from the FileSource, the user needs the job to maximize throughput without having to emit intermediate results with low processing latency. When the job is processing records from the KafkaSource, the user needs the job to emit results with low processing latency (see the sketch after these examples).

2) As shown in the figure below, we might have a job that bootstraps its state (operatorA and operatorB) using records from a bounded source (i.e. inputA). There is no need to emit any intermediate results while the job is processing records from the bounded source. After all records from the bounded source have been processed by operatorA and operatorB, the job needs to process records from the unbounded source and emit results with low processing latency in real time.
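For illustration, the HybridSource in example 1) could be put together as sketched below. This is a minimal, non-normative sketch based on the existing connector APIs; the file path, broker address and topic name are placeholders.

Code Block
languagejava
// Read the historical (bounded) data from files first, then continue with the
// (unbounded) Kafka topic. Records from the file part constitute the backlog.
FileSource<String> fileSource =
        FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("hdfs:///history"))
                .build();

KafkaSource<String> kafkaSource =
        KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("events")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

HybridSource<String> source =
        HybridSource.builder(fileSource).addSource(kafkaSource).build();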

...

In this FLIP, we propose to optimize performance for the above use-cases by allowing an operator to effectively switch its execution mode from batch to stream based on the "backlog" status of the records emitted by the source operators.

...

Behavior changes when switching from batch mode to stream mode

In this section, we describe how batch mode and stream mode differ in a variety of aspects. We also describe the new behavior needed for an operator to switch from batch mode to stream mode during execution of the same job.

...

  • All shuffle types will be set to PIPELINED.

Mixed mode with execution.checkpointing.interval-during-backlog > 0:

  • Same as stream mode.

Mixed mode with execution.checkpointing.interval-during-backlog = 0:

  • In mixed mode, the same shuffle strategy as in stream mode will be used to set the shuffle type between tasks.
  • Keyed inputs of a one-input operator will be automatically sorted during isBacklog=true if the operator does not sort the input internally. A multiple-input operator can sort the input internally.


3) Watermark strategy

Batch mode:

  • Source operator does not emit watermark.

  • Once an operator reaches EOF, the operator triggers all the timers one key at a time.

Stream mode:

  • Source operator emits watermark based on the user-specified WatermarkStrategy and pipeline.auto-watermark-interval while it has not reached EOF.
  • Once the source operator reaches EOF, it emits watermark of Long.MAX_VALUE after it has emitted all records. That means the downstream operator will not see any further input records after having received watermark=Long.MAX_VALUE.

Mixed mode with execution.checkpointing.interval-during-backlog > 0:

  • Same as stream mode.

Mixed mode with execution.checkpointing.interval-during-backlog = 0:

  • Source operator does not emit watermark while isBacklog=true.
  • At the point when isBacklog switches to false, source operator emits RecordAttributes(isBacklog=false).
  • Source operator emits watermark based on the user-specified WatermarkStrategy and pipeline.auto-watermark-interval while it has not reached EOF.
  • Once the source operator reaches EOF, it emits watermark of Long.MAX_VALUE after it has emitted all records (see the configuration sketch below).
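The following is a minimal configuration sketch for the mixed mode described above. It only uses existing configuration options (execution.checkpointing.interval-during-backlog, pipeline.auto-watermark-interval); "source" stands for any source, e.g. the HybridSource from the earlier example, and the concrete values are placeholders.

Code Block
languagejava
// Disable checkpointing while the source reports isBacklog=true, and checkpoint every
// 3 minutes afterwards. Watermarks follow the user-specified WatermarkStrategy and
// pipeline.auto-watermark-interval once isBacklog switches to false.
Configuration conf = new Configuration();
conf.setString("execution.checkpointing.interval", "3 min");
conf.setString("execution.checkpointing.interval-during-backlog", "0 ms");
conf.setString("pipeline.auto-watermark-interval", "200 ms");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

DataStream<String> stream =
        env.fromSource(
                source,
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)),
                "backlog-aware-source");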


4) Checkpoint and failover strategy

...

Mixed mode with execution.checkpointing.interval-during-backlog = 0:

  • Managed memory is allocated to operators with keyed inputs which claim isInternalSorterSupported == true.

  • A general purpose keyed state backend which does not assume inputs sorted by key (e.g. EmbeddedRocksDBStateBackend) is instantiated by Flink runtime.

  • Before source operator emits isBacklog=false, a downstream operator sorts input records by keys internally without accessing the state backend. We will provide utility classes (derived from the existing ExternalSorter) to sort inputs.

  • At the point when isBacklog switches to false:
      • Source operator emits RecordAttributes(isBacklog=false).
      • Downstream operator processes/aggregates all the records received before RecordAttributes(isBacklog=false) in order of the sorted keys, keeping the value of the current key in memory.
      • Downstream operator inserts/updates the (key, value) into the keyed state backend instantiated by Flink runtime once it sees a record with a different key, so that the keyed state backend contains the latest value for every key.
  • The operator continues to process records using the keyed state backend instantiated by Flink runtime, which now contains the full state obtained from records received while isBacklog=true (see the sketch below).
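The sketch below illustrates the switch-over described above. It models the keyed state backend as a plain map and the sorted backlog buffer as a TreeMap; all class and method names are hypothetical and only show the order of the steps, not the proposed implementation.

Code Block
languagejava
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class BacklogSwitchSketch {
    // Records aggregated and kept sorted by key while isBacklog=true
    // (stands in for the utility classes derived from ExternalSorter).
    private final TreeMap<String, Long> sortedBacklogBuffer = new TreeMap<>();
    // Stands in for the general purpose keyed state backend (e.g. EmbeddedRocksDBStateBackend).
    private final Map<String, Long> keyedStateBackend = new HashMap<>();
    private boolean isBacklog = true;

    public void processRecord(String key, long value) {
        if (isBacklog) {
            // While isBacklog=true: aggregate in memory, no state backend access.
            sortedBacklogBuffer.merge(key, value, Long::sum);
        } else {
            // After the switch: regular per-record access to the keyed state backend.
            keyedStateBackend.merge(key, value, Long::sum);
        }
    }

    // Invoked when RecordAttributes(isBacklog=false) is received from the upstream operator.
    public void onBacklogFinished() {
        // Flush the aggregated (key, value) pairs in sorted key order so that the state
        // backend contains the latest value for every key seen while isBacklog=true.
        sortedBacklogBuffer.forEach(keyedStateBackend::put);
        sortedBacklogBuffer.clear();
        isBacklog = false;
    }
}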

    ...

    Code Block
    languagejava
    /**
     * A RecordAttributes element provides stream task with information that can be used to optimize
     * the stream task's performance.
     */
    @Experimental
    public class RecordAttributes extends RuntimeEvent {
        /**
         * If it returns true, then the records received after this element are stale
         * and an operator can optionally buffer records until isBacklog=false. This
         * allows an operator to optimize throughput at the cost of processing latency.
         */
         @Nullable
         public Boolean isBacklog() {...}
    }

    2) Add a method in Output to broadcast RecordAttributes to downstream operators.

    Code Block
    languagejava
    @PublicEvolving
    public interface Output<T> extends Collector<T> {
        ...      
    
        /**
         * Emits a {@link RecordAttributes} from an operator. This element is broadcast to all
         * downstream operators.
         */
        @Experimental
        default void emitRecordAttributes(RecordAttributes recordAttributes) {
            throw new UnsupportedOperationException();
        }
    }


    NOTE: It is up to the operator implementation to decide when (and how often) to emit RecordAttributes. The overhead of emitting RecordAttributes is similar to the overhead of emitting Watermark.
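    A hypothetical usage sketch of the proposed method is shown below. The RecordAttributes constructor used here is an assumption made purely for illustration; this FLIP only specifies the isBacklog() accessor.

    Code Block
    languagejava
    // Hypothetical operator fragment that broadcasts the backlog switch exactly once.
    public class BacklogAwareOperator<T> {
        private final Output<T> output;   // the Output proposed above
        private boolean backlog = true;

        public BacklogAwareOperator(Output<T> output) {
            this.output = output;
        }

        // Called when this operator learns that the backlog has been fully processed.
        public void markBacklogFinished() {
            if (backlog) {
                backlog = false;
                // Emitting RecordAttributes has roughly the same overhead as emitting a
                // Watermark, so it should be emitted on state changes rather than per record.
                output.emitRecordAttributes(new RecordAttributes(false));
            }
        }
    }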

    ...

    • If a Transformation has isInternalSorterSupported == true AND
      (execution.runtime-mode == BATCH OR execution.checkpointing.interval-during-backlog == 0):
      • Flink runtime will not add an external sorter for its inputs (including during batch mode with keyed inputs)
      • Its managed memory should be set according to execution.sorted-inputs.memory

    5) For those operators whose throughput can be considerably improved by buffering records, update them to override processRecordAttributes() as appropriate.

    We anticipate that operators heavily relying on state backend operations, such as join, co-group, and aggregation operators, can benefit from buffering records. For example, by buffering, sorting and aggregating records in memory before accessing the state backend, we can significantly reduce the average number of state backend accesses required to process a record.
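    As a sketch of this (hypothetical class and helper names; only the processRecordAttributes() hook itself is proposed by this FLIP), such an operator could toggle between a buffering path and a direct state-access path:

    Code Block
    languagejava
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical aggregation-style operator: buffer and pre-aggregate while isBacklog=true,
    // so the state backend is accessed once per distinct key instead of once per record.
    public class BufferingAggregationOperator {
        private final Map<String, Long> buffer = new HashMap<>();
        private boolean backlog = false;

        // Proposed hook, called when a RecordAttributes element arrives on the input.
        public void processRecordAttributes(RecordAttributes attributes) {
            boolean nowBacklog = Boolean.TRUE.equals(attributes.isBacklog());
            if (backlog && !nowBacklog) {
                // Backlog ended: apply the pre-aggregated values to the keyed state backend.
                buffer.forEach(this::updateState);
                buffer.clear();
            }
            backlog = nowBacklog;
        }

        public void processElement(String key, long value) {
            if (backlog) {
                buffer.merge(key, value, Long::sum);  // records are stale, stay in memory
            } else {
                updateState(key, value);              // low-latency path
            }
        }

        private void updateState(String key, long value) {
            // Placeholder for a read-modify-write against the keyed state backend.
        }
    }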



    6) For those operators whose throughput can be considerably improved with an internal sorter, update them to take advantage of the internal sorter when isBacklog=true.

    Typically, multiple-input operators that involve aggregation operations (e.g. join, cogroup, aggregate) on keyed inputs can benefit from using an internal sorter.

    ...