...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Let's consider a use case where a user wants to bootstrap job state (operatorA and operatorB in the figure below) using records from a bounded source. There is no need to emit any records while the job is still processing records from the bounded source. After this bootstrap phase is done (i.e. all records from the bounded source have been processed by operatorA and operatorB), the user wants to process records from the unbounded source and emit results continuously in real-time. Additionally, we assume operatorA involves aggregation computation (e.g. co-group) and its throughput in batch mode can be 10X faster than in stream mode (see Benchmark Results below).
...
In this FLIP, we propose to optimize performance for the above use case by 1) allowing an operator to declare its attributes so that the JM will deploy the operator only after all of its inputs are "un-blocked"; and 2) allowing a given operator to effectively switch from batch mode to stream mode during execution. We hope these capabilities can further enhance Flink as a stream-batch unified processing engine.
Public Interfaces
1) Add EndOfStreamWindows, which is a subclass of WindowAssigner.
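To illustrate the intended semantics, here is a self-contained sketch (a mock, not the actual Flink classes): an assigner in the spirit of EndOfStreamWindows puts every record into one window that spans the whole input, so the window can only fire once the input ends. The class and method names below are ours, for illustration only.

```java
import java.util.Collections;
import java.util.List;

// Illustrative mock of the EndOfStreamWindows semantics: every record is
// assigned to a single window covering the whole input, which only closes
// at end of input. Not the actual Flink API.
public class EndOfStreamWindowsSketch {

    // Stand-in for Flink's TimeWindow: the half-open interval [start, end).
    record TimeWindow(long start, long end) {
        boolean coversEndOfStream() {
            return end == Long.MAX_VALUE;
        }
    }

    // Stand-in for WindowAssigner#assignWindows: ignore the element and its
    // timestamp, and always return the same all-encompassing window.
    static List<TimeWindow> assignWindows(Object element, long timestamp) {
        return Collections.singletonList(new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE));
    }

    public static void main(String[] args) {
        TimeWindow w1 = assignWindows("recordA", 100L).get(0);
        TimeWindow w2 = assignWindows("recordB", 999L).get(0);
        // All records land in the same window, which only fires at end of input.
        System.out.println(w1.equals(w2) && w1.coversEndOfStream()); // prints "true"
    }
}
```

Because every record maps to this single end-of-stream window, a co-group or aggregation over it emits its results exactly once, after all inputs have ended.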
...
```java
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
```
Proposed Changes
1) Add the getIsOutputOnEOF() and getHasInternalSorter() methods to the Transformation interface.
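As a self-contained sketch (not Flink's actual classes), the two attribute flags named above could be carried by an immutable OperatorAttributes object built via the OperatorAttributesBuilder referenced in the Public Interfaces section. The field names and setter names below are our assumptions for illustration.

```java
// Sketch of an immutable attributes holder plus its builder, mirroring the
// OperatorAttributesBuilder pattern shown in the Public Interfaces section.
// Not Flink's actual implementation.
public class OperatorAttributesSketch {

    static final class OperatorAttributes {
        private final boolean isOutputOnEOF;
        private final boolean hasInternalSorter;

        private OperatorAttributes(boolean isOutputOnEOF, boolean hasInternalSorter) {
            this.isOutputOnEOF = isOutputOnEOF;
            this.hasInternalSorter = hasInternalSorter;
        }

        // True iff the operator only emits records after all its inputs have ended.
        public boolean getIsOutputOnEOF() { return isOutputOnEOF; }

        // True iff the operator buffers and sorts records internally
        // (e.g. for batch-style aggregation).
        public boolean getHasInternalSorter() { return hasInternalSorter; }
    }

    static final class OperatorAttributesBuilder {
        private boolean isOutputOnEOF = false;      // assumed default: emit continuously
        private boolean hasInternalSorter = false;  // assumed default: no internal sorter

        public OperatorAttributesBuilder setIsOutputOnEOF(boolean v) {
            this.isOutputOnEOF = v;
            return this;
        }

        public OperatorAttributesBuilder setHasInternalSorter(boolean v) {
            this.hasInternalSorter = v;
            return this;
        }

        public OperatorAttributes build() {
            return new OperatorAttributes(isOutputOnEOF, hasInternalSorter);
        }
    }

    public static void main(String[] args) {
        OperatorAttributes attrs = new OperatorAttributesBuilder()
                .setIsOutputOnEOF(true)
                .setHasInternalSorter(true)
                .build();
        System.out.println(attrs.getIsOutputOnEOF() && attrs.getHasInternalSorter()); // prints "true"
    }
}
```

With defaults of false for both flags, existing operators that do not override getOperatorAttributes() keep their current behavior.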
...
4) A blocking input edge with pending records is the same as a source with isBacklog=true when an operator determines its RecordAttributes for downstream nodes.
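Rule 4 can be sketched as follows. This is a hypothetical helper (the names are ours, not Flink's): when an operator derives the RecordAttributes it sends downstream, a blocking input edge that still has pending records contributes isBacklog=true, exactly as a source reporting isBacklog=true would, and the operator emits isBacklog=false only once no input is still in backlog.

```java
import java.util.List;

// Hypothetical illustration of rule 4: a blocked input edge with pending
// records is treated like a source with isBacklog=true when computing the
// RecordAttributes an operator sends downstream. Not Flink's actual API.
public class BacklogStatusSketch {

    // One input edge as seen by the operator.
    record InputEdge(boolean isBlocking, boolean hasPendingRecords, boolean sourceIsBacklog) {
        boolean effectiveBacklog() {
            // A blocking edge with pending records behaves like isBacklog=true.
            return (isBlocking && hasPendingRecords) || sourceIsBacklog;
        }
    }

    // The operator propagates isBacklog=false downstream only when no input
    // is still in backlog.
    static boolean outputIsBacklog(List<InputEdge> inputs) {
        return inputs.stream().anyMatch(InputEdge::effectiveBacklog);
    }

    public static void main(String[] args) {
        InputEdge bounded = new InputEdge(true, true, false);   // still draining bootstrap data
        InputEdge unbounded = new InputEdge(false, false, false);
        System.out.println(outputIsBacklog(List.of(bounded, unbounded))); // prints "true"

        InputEdge drained = new InputEdge(true, false, false);  // bootstrap fully processed
        System.out.println(outputIsBacklog(List.of(drained, unbounded))); // prints "false"
    }
}
```

In the bootstrap use case above, this is what keeps downstream operators in backlog (batch-style) mode until the bounded source has been fully drained.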
Benchmark Results
1) DataStream#CoGroup
We ran the CoGroupDataStream benchmark on a MacBook with the latest Flink 1.17-snapshot and parallelism=1. RocksDB is used in streaming mode.
...
This shows that, with the proposed change, the same DataStream program can run more than 20X faster than in stream mode.
Example Usages
```java
data1.coGroup(data2)
     .where(tuple -> tuple.f0)
     .equalTo(tuple -> tuple.f0)
     .window(EndOfStreamWindows.get())
     .apply(new CustomCoGroupFunction())
     .addSink(...);
```
Compatibility, Deprecation, and Migration Plan
The changes made in this FLIP are backward compatible.
...