...

Code Block
languagejava
@PublicEvolving
public interface Input<IN> {
    ...

    /**
     * Processes a {@link RecordAttributes} that arrived on this input.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     * The recordAttributes do not need to be persisted in the checkpoint, as the source generates
     * RecordAttributes after recovery from checkpoint.
     */
    @Experimental
    default void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {}
}
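Because `processRecordAttributes` has an empty default body, existing `Input` implementations keep compiling and behaving as before. Only operators that care about backlog status need to override it. The plain-Java sketch below illustrates that pattern. `SketchRecordAttributes`, `SketchInput`, `LegacyInput`, and `BacklogAwareInput` are hypothetical stand-ins written for this example and are not Flink classes. The sketch assumes the attributes carry only an `isBacklog` flag, and it simplifies `processElement` to take the element directly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's RecordAttributes, reduced to an isBacklog flag.
final class SketchRecordAttributes {
    private final boolean backlog;

    SketchRecordAttributes(boolean backlog) {
        this.backlog = backlog;
    }

    boolean isBacklog() {
        return backlog;
    }
}

// Hypothetical stand-in for Input<IN>; processElement takes the element directly.
interface SketchInput<IN> {
    void processElement(IN element) throws Exception;

    // Empty default body: implementations written before this method existed still compile.
    default void processRecordAttributes(SketchRecordAttributes recordAttributes) throws Exception {}
}

// An existing input that does not override processRecordAttributes.
final class LegacyInput implements SketchInput<String> {
    final List<String> seen = new ArrayList<>();

    @Override
    public void processElement(String element) {
        seen.add(element);
    }
}

// A backlog-aware input that remembers the most recent isBacklog status.
final class BacklogAwareInput implements SketchInput<String> {
    final List<String> seen = new ArrayList<>();
    boolean backlog = false;

    @Override
    public void processElement(String element) {
        seen.add(element);
    }

    @Override
    public void processRecordAttributes(SketchRecordAttributes recordAttributes) {
        backlog = recordAttributes.isBacklog();
    }
}
```

A caller can invoke `processRecordAttributes` on either implementation. On `LegacyInput` the call does nothing.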

...

Code Block
languagejava
@PublicEvolving
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {
    ...

    /**
     * Processes a {@link RecordAttributes} that arrived on the first input of this operator.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     * The recordAttributes do not need to be persisted in the checkpoint, as the source generates
     * RecordAttributes after recovery from checkpoint.
     */
    @Experimental
    default void processRecordAttributes1(RecordAttributes recordAttributes) throws Exception {}

    /**
     * Processes a {@link RecordAttributes} that arrived on the second input of this operator.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     * The recordAttributes do not need to be persisted in the checkpoint, as the source generates
     * RecordAttributes after recovery from checkpoint.
     */
    @Experimental
    default void processRecordAttributes2(RecordAttributes recordAttributes) throws Exception {}
}
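Each input of a two-input operator reports its backlog status separately, so an operator that changes behavior during backlog has to combine the two. The minimal sketch below assumes that the operator is in backlog while any input reports isBacklog=true. `TwoInputBacklogTracker` is a hypothetical helper and is not part of Flink. It also assumes both inputs start with isBacklog=false.

```java
// Hypothetical helper, not a Flink class: combines the isBacklog flags that a
// two-input operator receives via processRecordAttributes1 and processRecordAttributes2.
final class TwoInputBacklogTracker {
    // Assumption: both inputs start out not in backlog.
    private final boolean[] inputBacklog = new boolean[2];

    // Combined status: the operator is in backlog while any input is in backlog.
    boolean isBacklog() {
        return inputBacklog[0] || inputBacklog[1];
    }

    /**
     * Records the isBacklog flag of input 1 or 2. Returns true exactly when the
     * combined status switches from backlog to non-backlog, which is when an
     * operator that buffered records during backlog would flush them.
     */
    boolean update(int inputId, boolean backlog) {
        if (inputId != 1 && inputId != 2) {
            throw new IllegalArgumentException("inputId must be 1 or 2, got " + inputId);
        }
        boolean wasBacklog = isBacklog();
        inputBacklog[inputId - 1] = backlog;
        return wasBacklog && !isBacklog();
    }
}
```

An operator would call `update(1, ...)` from `processRecordAttributes1` and `update(2, ...)` from `processRecordAttributes2`. It would then flush any buffered state when `update` returns true.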


4) Add OperatorAttributesBuilder and OperatorAttributes so that operator developers can specify operator attributes that the Flink JobManager (JM) can use to properly compile the graph (e.g. whether to add an external sorter to sort input records by key).
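A builder-based design lets operators declare their attributes in one place, where the JobManager can read them while compiling the graph. The sketch below models only the IsInternalSorterSupported attribute mentioned in this section. All class and method names (`SketchOperatorAttributes`, `SketchOperatorAttributesBuilder`, `setInternalSorterSupported`, `SketchGraphCompiler.needsExternalSorter`) are hypothetical and may differ from the actual Flink API. The sketch assumes attributes default to false. The decision rule in `needsExternalSorter` is an assumption made for illustration and does not describe the JobManager's actual logic.

```java
// Hypothetical, immutable operator attributes; only isInternalSorterSupported is modelled.
final class SketchOperatorAttributes {
    private final boolean internalSorterSupported;

    SketchOperatorAttributes(boolean internalSorterSupported) {
        this.internalSorterSupported = internalSorterSupported;
    }

    boolean isInternalSorterSupported() {
        return internalSorterSupported;
    }
}

// Hypothetical builder: attributes default to false, so operators that set nothing
// keep their current behavior.
final class SketchOperatorAttributesBuilder {
    private boolean internalSorterSupported = false;

    SketchOperatorAttributesBuilder setInternalSorterSupported(boolean value) {
        this.internalSorterSupported = value;
        return this;
    }

    SketchOperatorAttributes build() {
        return new SketchOperatorAttributes(internalSorterSupported);
    }
}

// Hypothetical compile-time decision (an assumption, not Flink's logic): add an
// external sorter in front of a keyed input only when the operator cannot sort
// its buffered input itself.
final class SketchGraphCompiler {
    static boolean needsExternalSorter(boolean keyedInput, SketchOperatorAttributes attributes) {
        return keyedInput && !attributes.isInternalSorterSupported();
    }
}
```

An operator that sorts internally would return `new SketchOperatorAttributesBuilder().setInternalSorterSupported(true).build()` from its attributes getter. Operators that do not opt in get the defaults.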

...

  • Override getOperatorAttributes to return IsInternalSorterSupported = true.
  • Override the processRecordAttributes() API to adjust its behavior based on the input's isBacklog status.
    • The operator can buffer and sort input records when any input has isBacklog=true AND execution.checkpointing.interval-during-backlog=0.
    • Once all inputs have switched to isBacklog=false, it processes the buffered records, emits the results, and switches to stream execution mode.
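The buffering behavior in the bullets above can be sketched as follows. `SortingBacklogOperator` is a hypothetical single-input operator written for this example and is not a Flink class. For brevity, it makes several simplifications:

  • It takes a plain boolean instead of `RecordAttributes`.
  • It models `execution.checkpointing.interval-during-backlog` as a constructor argument in milliseconds.
  • It "emits" records by appending them to an in-memory list instead of sending them downstream.

A multi-input operator would buffer while any input is in backlog and flush once all inputs have left it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

// Hypothetical single-input keyed operator, not a Flink class.
final class SortingBacklogOperator<T, K extends Comparable<? super K>> {
    private final Function<T, K> keySelector;
    // Stand-in for execution.checkpointing.interval-during-backlog (0 = no checkpoints during backlog).
    private final long checkpointIntervalDuringBacklogMs;
    private final List<T> buffer = new ArrayList<>();
    // Stand-in for the downstream output.
    private final List<T> emitted = new ArrayList<>();
    private boolean backlog = false;

    SortingBacklogOperator(Function<T, K> keySelector, long checkpointIntervalDuringBacklogMs) {
        this.keySelector = keySelector;
        this.checkpointIntervalDuringBacklogMs = checkpointIntervalDuringBacklogMs;
    }

    // Buffer only while in backlog AND checkpointing is disabled during backlog.
    // Assumed rationale: with no checkpoints during backlog, the buffer never has to be snapshotted.
    private boolean isBuffering() {
        return backlog && checkpointIntervalDuringBacklogMs == 0;
    }

    void processElement(T record) {
        if (isBuffering()) {
            buffer.add(record);
        } else {
            emitted.add(record); // stream mode: process and emit immediately
        }
    }

    void processRecordAttributes(boolean isBacklog) {
        boolean wasBuffering = isBuffering();
        backlog = isBacklog;
        if (wasBuffering && !isBuffering()) {
            // Leaving backlog: sort buffered records by key, emit them, then continue in stream mode.
            // List.sort is stable, so records with equal keys keep their arrival order.
            buffer.sort(Comparator.comparing(keySelector));
            emitted.addAll(buffer);
            buffer.clear();
        }
    }

    List<T> emitted() {
        return Collections.unmodifiableList(emitted);
    }
}
```

Sorting by key before processing lets a real operator handle all records of one key together. This is what allows batch execution to keep state for only one key at a time. The sketch only reorders records, but an aggregating operator would update its state at that point.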


Evaluation


1) After the proposed changes, the following DataStream APIs should achieve performance similar to batch mode during backlog processing.

...

ConnectedStreams:

  • ConnectedStreams#process


2) After the proposed changes, the following keyed one-input operations in the SQL API (not an exhaustive list) will have performance similar to batch mode during backlog processing:

  • Window Aggregation
  • Group Aggregation
  • Over Aggregation


Multi-input operations, such as Regular Join, Interval Join, and Temporal Join, can be updated gradually to optimize their performance during backlog processing.


Benchmark results

This section compares the throughput of commonly used operations during backlog processing in batch mode and in stream mode, with and without the backlog processing optimization. The comparison shows how much performance the optimization gains during backlog processing.

...