...
Code Block | ||
---|---|---|
| ||
@PublicEvolving
public interface Input<IN> {
    ...
    /**
     * Processes a {@link RecordAttributes} that arrived on this input.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     * The recordAttributes do not need to be persisted in the checkpoint, as the source generates
     * RecordAttributes after recovery from checkpoint.
     */
    @Experimental
    default void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {}
} |
...
Code Block | ||
---|---|---|
| ||
@PublicEvolving
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {
    ...
    /**
     * Processes a {@link RecordAttributes} that arrived on the first input of this operator.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     * The recordAttributes do not need to be persisted in the checkpoint, as the source generates
     * RecordAttributes after recovery from checkpoint.
     */
    @Experimental
    default void processRecordAttributes1(RecordAttributes recordAttributes) throws Exception {}

    /**
     * Processes a {@link RecordAttributes} that arrived on the second input of this operator.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     * The recordAttributes do not need to be persisted in the checkpoint, as the source generates
     * RecordAttributes after recovery from checkpoint.
     */
    @Experimental
    default void processRecordAttributes2(RecordAttributes recordAttributes) throws Exception {}
} |
4) Add OperatorAttributesBuilder and OperatorAttributes for operator developers to specify operator attributes that Flink JM can use to properly compile the graph (e.g. whether to add an external sorter to sort input records by key).
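As a rough illustration of how such attributes might be declared and consumed, the sketch below uses simplified stand-in classes. Only the class names OperatorAttributes and OperatorAttributesBuilder and the IsInternalSorterSupported attribute come from this proposal; the method names (setInternalSorterSupported, isInternalSorterSupported, build) and the GraphCompilerSketch helper are assumptions made for illustration and may differ from the actual definitions.

```java
// Hypothetical stand-in: only the class name and the "internal sorter supported"
// attribute are taken from the proposal; the members are assumed for illustration.
final class OperatorAttributes {
    private final boolean internalSorterSupported;

    OperatorAttributes(boolean internalSorterSupported) {
        this.internalSorterSupported = internalSorterSupported;
    }

    boolean isInternalSorterSupported() {
        return internalSorterSupported;
    }
}

// Hypothetical builder; the setter name is an assumption.
final class OperatorAttributesBuilder {
    // Assumed default: the operator does not sort its input internally.
    private boolean internalSorterSupported = false;

    OperatorAttributesBuilder setInternalSorterSupported(boolean supported) {
        this.internalSorterSupported = supported;
        return this;
    }

    OperatorAttributes build() {
        return new OperatorAttributes(internalSorterSupported);
    }
}

// Hypothetical helper standing in for the graph-compilation decision on the JM side.
final class GraphCompilerSketch {
    // An external sorter is only needed when sorted input is required
    // and the operator cannot sort the records itself.
    static boolean needsExternalSorter(OperatorAttributes attrs, boolean requiresSortedInput) {
        return requiresSortedInput && !attrs.isInternalSorterSupported();
    }
}

final class OperatorAttributesDemo {
    public static void main(String[] args) {
        OperatorAttributes attrs =
                new OperatorAttributesBuilder().setInternalSorterSupported(true).build();
        System.out.println(GraphCompilerSketch.needsExternalSorter(attrs, true));
    }
}
```

In this sketch, an operator that declares internal sorter support causes the compiler helper to skip the external sorter, which mirrors the example given in the paragraph above.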
...
- Override getOperatorAttributes to return IsInternalSorterSupported = true.
- Override the processRecordAttributes() API to adjust the operator's behavior based on the input's isBacklog status.
- The operator can buffer and sort input records when any input has isBacklog=true AND execution.checkpointing.interval-during-backlog=0.
- Once all inputs' status has switched to isBacklog=false, it processes the buffered records, emits results, and starts to work in the stream execution mode.
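The steps above can be sketched for a single-input operator as follows. This is a minimal, self-contained model rather than Flink code: RecordAttributesSketch and BacklogAwareSortOperator are hypothetical stand-ins, records are plain integers sorted by value, and the constructor flag stands in for execution.checkpointing.interval-during-backlog=0.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for RecordAttributes; only the isBacklog flag is modeled.
final class RecordAttributesSketch {
    private final boolean backlog;

    RecordAttributesSketch(boolean backlog) {
        this.backlog = backlog;
    }

    boolean isBacklog() {
        return backlog;
    }
}

// Hypothetical single-input operator that sorts internally during backlog.
final class BacklogAwareSortOperator {
    private final List<Integer> buffer = new ArrayList<>();
    private final List<Integer> output = new ArrayList<>();
    // Assumed to reflect execution.checkpointing.interval-during-backlog=0.
    private final boolean checkpointingDisabledDuringBacklog;
    private boolean backlog = false;

    BacklogAwareSortOperator(boolean checkpointingDisabledDuringBacklog) {
        this.checkpointingDisabledDuringBacklog = checkpointingDisabledDuringBacklog;
    }

    // Called when the input's backlog status changes.
    void processRecordAttributes(RecordAttributesSketch attrs) {
        boolean wasBacklog = backlog;
        backlog = attrs.isBacklog();
        if (wasBacklog && !backlog) {
            // Backlog ended: sort the buffered records, emit them, then continue in stream mode.
            buffer.sort(Comparator.naturalOrder());
            output.addAll(buffer);
            buffer.clear();
        }
    }

    void processElement(int record) {
        if (backlog && checkpointingDisabledDuringBacklog) {
            buffer.add(record); // batch-style: defer processing until the backlog ends
        } else {
            output.add(record); // stream-style: emit immediately
        }
    }

    List<Integer> getOutput() {
        return output;
    }
}

final class BacklogSortDemo {
    public static void main(String[] args) {
        BacklogAwareSortOperator op = new BacklogAwareSortOperator(true);
        op.processRecordAttributes(new RecordAttributesSketch(true));
        op.processElement(3);
        op.processElement(1);
        op.processElement(2);
        op.processRecordAttributes(new RecordAttributesSketch(false));
        op.processElement(0);
        System.out.println(op.getOutput()); // prints [1, 2, 3, 0]
    }
}
```

A multi-input operator would additionally track the backlog status of each input and flush only once every input has reported isBacklog=false.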
Evaluation
1) After the proposed changes, the following DataStream APIs should have performance similar to batch mode during backlog processing:
...
ConnectedStreams:
- ConnectedStreams#process
2) After the proposed changes, the following keyed one-input operations (not exhaustive) in the SQL API will have performance similar to batch mode during backlog processing:
- Window Aggregation
- Group Aggregation
- Over Aggregation
Multi-input operations, such as Regular Join, Interval Join, and Temporal Join, can be updated gradually to optimize their performance during backlog processing.
Benchmark results
In this section, we provide benchmark results that compare the throughput of commonly used operations during backlog processing in batch mode, in stream mode with the backlog processing optimization, and in stream mode without it. The results show the performance gained from the optimization.
...