...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

Let's consider a use-case where a user wants to bootstrap job state (operatorA and operatorB in the figure below) using records from a bounded source. There is no need to emit any records while the job is still processing records from the bounded source. After this bootstrap phase is done (i.e. all records from the bounded source have been processed by operatorA and operatorB), the user wants to process records from the unbounded source and emit results continuously in real-time. Additionally, we assume operatorA involves aggregation computation (e.g. co-group) and its throughput in batch mode can be 10X higher than in stream mode (see Benchmark Results below).

...

In this FLIP, we propose to optimize performance for the above use-case by 1) allowing an operator to declare its attributes (e.g. isOutputOnEOF) so that JM will deploy its downstream operators only after this operator finishes, i.e. an operator is deployed only after all its inputs are "un-blocked"; and 2) allowing a given operator to effectively switch from batch mode to stream mode during execution. We hope these capabilities can further enhance Flink as a stream-batch unified processing engine.



Background

An operator is a batch-only operator when it meets the following properties:

  • The operator should emit records only after all its inputs have ended. There is no requirement on processing latency.
  • The operator does not need to support checkpointing. Thus the operator can use arbitrary optimizations (e.g. sorting).

An operator is a stream-only operator when it meets the following properties:

  • The operator should emit records continuously while it is still processing input records. Processing latency needs to be low.
  • The operator should support checkpointing. Thus the operator should only use optimizations compatible with checkpointing.

Based on the definitions specified above, we can see that if a use-case does not need low processing latency, then it should use batch-only operators to maximize throughput. Otherwise, it should use stream-only operators to achieve low processing latency.


However, for use-cases that need low processing latency only after some backlog data has been processed, such as the one described in the motivation section, neither the stream-only operator nor the batch-only operator can deliver optimal performance. Therefore, we define the stream-batch unified operator as described below:


An operator is a stream-batch unified operator when it meets the following properties:

  • The operator can extract and handle isBacklog (a boolean value) from its inputs.
  • While any of its inputs have isBacklog=true:
    • The operator should not emit records. There is no requirement on processing latency during this stage.
    • The operator does not need to support checkpointing. Thus the operator can use arbitrary optimizations (e.g. sorting) in this stage.
  • While all of its inputs have isBacklog=false:
    • The operator should emit records continuously while it is still processing input records. Processing latency needs to be low.
    • The operator should support checkpointing. Thus the operator should only use optimizations compatible with checkpointing.
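
The two phases listed above can be illustrated with a minimal, Flink-free sketch. Everything in it is hypothetical and exists only for illustration: the class BacklogAwareSum, the callback onBacklogStatus, and the choice to flush one aggregated result per key at the moment the backlog ends are assumptions, not part of the interfaces proposed in this FLIP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical per-key sum that behaves differently depending on isBacklog. */
class BacklogAwareSum {
    // Sorted map so that the flush order is deterministic.
    private final Map<String, Long> sums = new TreeMap<>();
    private final List<String> output = new ArrayList<>();
    private boolean isBacklog = false;

    /** Assumed callback: invoked whenever the backlog status of the input changes. */
    void onBacklogStatus(boolean backlog) {
        boolean wasBacklog = isBacklog;
        isBacklog = backlog;
        if (wasBacklog && !backlog) {
            // Backlog phase is over: emit one aggregated result per key (an assumption).
            for (Map.Entry<String, Long> e : sums.entrySet()) {
                output.add(e.getKey() + "=" + e.getValue());
            }
        }
    }

    void processElement(String key, long value) {
        long updated = sums.merge(key, value, Long::sum);
        if (!isBacklog) {
            // Stream phase: emit the updated result immediately for low latency.
            output.add(key + "=" + updated);
        }
        // Backlog phase: only update state, emit nothing.
    }

    List<String> getOutput() {
        return output;
    }
}
```

While isBacklog=true the sketch only accumulates state, which is where a real operator would be free to apply batch-style optimizations such as sorting. Once isBacklog=false it emits an updated result for every input record.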


Public Interfaces

1) Add EndOfStreamWindows which is a subclass of WindowAssigner.

...

Code Block
languagejava
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}


Proposed Changes

1) Add getIsOutputOnEOF() and getHasInternalSorter() to the Transformation interface.

...

4) A blocking input edge with pending records is treated the same as a source with isBacklog=true when an operator determines its RecordAttributes for downstream nodes.
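
As a rough illustration of item 4, the sketch below derives a single isBacklog value from the status of an operator's inputs. The class and field names are hypothetical, and the rule that the result is true as soon as any input is in backlog is an assumption made for this example; the exact propagation rule is defined by the full list of proposed changes.

```java
import java.util.List;

/** Hypothetical helper, not part of the proposed API. */
class BacklogStatusSketch {

    /** Assumed description of one input of an operator. */
    static final class InputStatus {
        final boolean isBacklog;
        final boolean blockingEdgeWithPendingRecords;

        InputStatus(boolean isBacklog, boolean blockingEdgeWithPendingRecords) {
            this.isBacklog = isBacklog;
            this.blockingEdgeWithPendingRecords = blockingEdgeWithPendingRecords;
        }
    }

    /** Returns the isBacklog value the operator would report to downstream nodes. */
    static boolean isBacklogForDownstream(List<InputStatus> inputs) {
        for (InputStatus input : inputs) {
            // A blocking edge with pending records counts like a source with isBacklog=true.
            if (input.isBacklog || input.blockingEdgeWithPendingRecords) {
                return true;
            }
        }
        return false;
    }
}
```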



Benchmark Results

1) DataStream#CoGroup

We run the CoGroupDataStream benchmark on a MacBook with the latest Flink 1.17-snapshot and parallelism=1. RocksDB is used in the streaming mode.

...

This shows that the same DataStream program in stream mode can be more than 20X faster with the proposed change.

Example Usages

Code Block
languagejava
data1.coGroup(data2)
     .where(tuple -> tuple.f0)
     .equalTo(tuple -> tuple.f0)
     .window(EndOfStreamWindows.get())
     .apply(new CustomCoGroupFunction())
     .addSink(...);
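
Conceptually, a single window that closes only at the end of the inputs means the co-group function sees all records of a key from both inputs at once and emits only after both inputs have ended. The following Flink-free sketch illustrates that semantics on two in-memory lists. The class and method names are hypothetical, and it makes no claim about how EndOfStreamWindows is implemented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical illustration of co-grouping two bounded inputs at end of stream. */
class EndOfStreamCoGroupSketch {

    /** Each record is a {key, value} pair; one output line is produced per key. */
    static List<String> coGroupAtEndOfStream(List<String[]> left, List<String[]> right) {
        Map<String, List<String>> leftByKey = new TreeMap<>();
        Map<String, List<String>> rightByKey = new TreeMap<>();
        // Buffer everything first: nothing is emitted while inputs are still being read.
        for (String[] record : left) {
            leftByKey.computeIfAbsent(record[0], k -> new ArrayList<>()).add(record[1]);
        }
        for (String[] record : right) {
            rightByKey.computeIfAbsent(record[0], k -> new ArrayList<>()).add(record[1]);
        }
        TreeSet<String> keys = new TreeSet<>(leftByKey.keySet());
        keys.addAll(rightByKey.keySet());
        // Both inputs have ended: invoke the "co-group function" once per key.
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            List<String> l = leftByKey.getOrDefault(key, Collections.emptyList());
            List<String> r = rightByKey.getOrDefault(key, Collections.emptyList());
            out.add(key + ": " + l + " | " + r);
        }
        return out;
    }
}
```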


Compatibility, Deprecation, and Migration Plan

The changes made in this FLIP are backward compatible. No deprecation or migration plan is needed.

...