Discussion thread-
Vote thread-
JIRA

-

Release-

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

Let's consider a use case in which OperatorA aggregates records from a bounded stream in a "batch manner" (i.e., it emits the aggregation result only at the end of its input), and a two-input OperatorB then processes both the aggregation result and the records from an unbounded stream in a "stream manner" (i.e., it emits results continuously without waiting for its inputs to end). Currently, such use cases require the STREAMING execution mode, in which all operators must be deployed when the job starts and no operator can apply typical batch-mode optimizations such as sort join.

...

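```java
import org.apache.flink.annotation.Internal;

@Internal
public abstract class Transformation<T> {
    // `default` is only valid on interface methods, so an abstract class
    // declares an ordinary overridable method instead.
    // Returns true if this transformation must run in batch mode; false by default.
    public boolean isBatchForced() {
        return false;
    }
}
```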
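A minimal sketch of how a transformation might opt in, assuming it simply overrides the method with a fixed value. `SketchTransformation`, `EndOfInputAggregation`, and `PassThrough` are hypothetical stand-ins (not Flink classes) so the example compiles without Flink; modeling the aggregation on OperatorA from the motivation is also an assumption. How the runtime consumes the flag is not shown, since this section elides it.

```java
// Hypothetical stand-in for Flink's Transformation: only the proposed hook is modeled.
abstract class SketchTransformation<T> {
    // Default: the transformation has no batch-mode requirement.
    public boolean isBatchForced() {
        return false;
    }
}

// Hypothetical transformation for an operator that emits only at the end of its
// (bounded) input, similar in spirit to OperatorA in the motivation.
final class EndOfInputAggregation<T> extends SketchTransformation<T> {
    @Override
    public boolean isBatchForced() {
        return true;
    }
}

// Hypothetical transformation that keeps the default behavior.
final class PassThrough<T> extends SketchTransformation<T> {}

public final class BatchForcedDemo {
    public static void main(String[] args) {
        SketchTransformation<Long> aggregation = new EndOfInputAggregation<>();
        SketchTransformation<Long> passThrough = new PassThrough<>();
        System.out.println(aggregation.isBatchForced()); // true
        System.out.println(passThrough.isBatchForced()); // false
    }
}
```

Keeping the base method non-abstract with a `false` default means existing transformations need no changes; only those that require batch-mode execution override it.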


Benchmark Results

1) DataStream#CoGroup

We ran the CoGroupDataStream benchmark on a MacBook with the latest Flink 1.17-SNAPSHOT and parallelism=1. RocksDB was used as the state backend in streaming mode.

Here are the benchmark results:

  • Without the change proposed in this FLIP, in stream mode, with each source generating 2*10^6 records, the average execution time is 56 sec.
  • Without the change proposed in this FLIP, in batch mode, with each source generating 5*10^7 records, the average execution time is 118 sec.
  • With the change proposed in this FLIP, in both stream and batch mode, with each source generating 5*10^7 records, the average execution time is 100 sec.

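This shows that, with the proposed change, the same DataStream program in stream mode can be more than 10X faster: per-source throughput rises from about 36K records/sec (2*10^6 records in 56 sec) to 500K records/sec (5*10^7 records in 100 sec), roughly 14X.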
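The speedup claim compares runs with different input sizes, so it is easiest to check as throughput. The small program below recomputes per-source throughput from the three benchmark results listed above; it assumes each reported execution time covers processing all records of each source. `CoGroupThroughput` is a hypothetical helper, not part of the benchmark suite.

```java
public final class CoGroupThroughput {

    // Records processed per second by each source.
    public static double throughput(long recordsPerSource, double seconds) {
        return recordsPerSource / seconds;
    }

    public static void main(String[] args) {
        double streamBefore = throughput(2_000_000L, 56.0);   // stream mode, without the change
        double batchBefore  = throughput(50_000_000L, 118.0); // batch mode, without the change
        double withChange   = throughput(50_000_000L, 100.0); // either mode, with the change

        System.out.printf("stream mode, before: %.0f records/s%n", streamBefore);
        System.out.printf("batch mode, before:  %.0f records/s%n", batchBefore);
        System.out.printf("with the change:     %.0f records/s%n", withChange);
        System.out.printf("stream-mode speedup: %.1fX%n", withChange / streamBefore);
    }
}
```

Running it prints 35714, 423729, and 500000 records/s, and a stream-mode speedup of 14.0X.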


Example Usages



Compatibility, Deprecation, and Migration Plan

...