Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

In addition, after FLIP-327 is accepted, this operator suppose this API is called without any explicitly-assigned trigger or evictor, the program should also have isInternalSorterSupported = true to achieve higher throughput by using the optimizations currently done in batch mode.


The following optimization will be used to achieve This operator will use the following optimization to achieve much higher throughput than the existing DataStream#coGroup in both stream and batch mode:

...

In addition, after FLIP-327 is accepted, this operator suppose this API is called without any explicitly-assigned trigger or evictor, the program should also have isInternalSorterSupported = true to achieve higher throughput by using the optimizations currently done in batch mode.

More specifically, . This operator will sort the input before aggregateaggregation, and avoid invoking window actions, which is similar to '5)'.

Benchmark results

Generally, when operator(coGroup or aggregate) is invoked optimized with EndOfStreamWindows as the window assigner and trigger and evictor are not assigned, the optimized operator EOFCoGroupOperator / EOFAggregationOperator will be applied, in which case the input(s) will be sorted first and then output the results after the input end of input. This makes the performance with our work can be mutiple times faster than before and slightly faster than batch mode because of avoiding window related work.Using hybrid shuffle in the outputEOF part can make part of operator pipeline consume records from upstream to further improve the performance.

To demonstrate our optimization improvements,we run each benchmark in different execution modes and configurations 5 times. Here are benchmark results which include average throughput (records/ms), throughput range, the magnification factor for each streaming job before our work and duration time(ms).

...

To demonstrate our optimization improvements,we run each benchmark in different execution modes and configurations. Each result is measured by taking the average execution time across 5 runs with the given configuration.

We run benchmarks on a MacBook with the latest Flink 1.18-snapshot and parallelism=1. Default standalone cluster configuration is used except:

...

Code Block
languageyml
jobmanager.memory.process.size: 6400m
taskmanager.memory.process.size: 6912m

Benchmark: CoGroup


1) Execute DataStream#CoGroup

This benchmark uses Use DataStream#coGroup to process records from two bounded inputs both with . Each input has 8*10^7 records with unique key for each record.

Below is the DataStream program code snippetData Count number of records distributed in Data Count number of keys. CoGroup results will be emitted after input have ended.

Code Block
languagejava
data1.coGroup(data2)
    .where(tuple -> tuple.f0)
    .equalTo(tuple -> tuple.f0)
    .window(EndOfStreamWindows.get())
    .apply(new CustomCoGroupFunction())
    .addSink(...);

...

..);


The following result shows the throughput (records/sec) when the benchmark is executed in streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR

...

.

The result shows that DataStream#coGroup in optimized streaming mode can be 22X as fast as streaming mode and 3X as fast as batch mode. Hybrid shuffle can further improve throughput by 11%.

...

STREAMINGBATCHOptimized STREAMINGWith hybrid shuffle
8e766 ± 1 (100%, 1202426 ms)491 ± 5 (743%, 162731 ms)1506 ± 10 (2281%, 53098 ms)1677 ± 42 (2540%, 47687 ms)

This shows with the changes proposed above, DataStream#coGroup in Optimized STREAMING can be 20X faster than STREAMING throughput and 3X faster than BATCH throughput in batch mode. Using hybrid shuffle in the outputEOF part can further improve the performance about 10%.

Benchmark: Aggregate

, 53098 ms)1677 ± 42 (2540%, 47687 ms)


2) Execute DataStream#Aggregate

This benchmark uses DataStream#aggregate to process 8*10^7 records. These records are evenly distributed across 8*10^5 keys, i.e. each unique key has 100 records.

Below is the DataStream program code snippetUse DataStream#aggregate to process records from a bounded streams which has Data Count number of records, and every 100 records have the same key. Aggregate results will be emitted after input have ended.

Code Block
languagejava
data
      .keyBy(value -> value.f0)
      .window(EndOfStreamWindows.get())
      .aggregate(new Aggregator())
      .addSink(new CountingAndDiscardingSink());

...


The following result shows the throughput (records/sec) when the benchmark is executed in streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR

...

.

The result shows that DataStream#aggregate in optimized streaming mode can be 10X as fast as streaming mode and 11% faster than batch mode. Hybrid shuffle can further improve throughput by 15%.

...

8e7
Data CountSTREAMINGBATCHOptimized STREAMINGWith hybrid shuffle
163 ± 0 (100%, 490478 ms)1561 ± 16 (957%, 51237 ms)1733 ± 9 (1063%, 46143 ms)1992 ± 15 (1222%, 40148 ms)

This shows with the changes proposed above, DataStream#aggregate in Optimized STREAMING can be 10X faster than STREAMING throughput and slightly faster to the BATCH. Using hybrid shuffle in the outputEOF part can further improve the performance about 15%.

Benchmark: OperatorDelayedDeploy

...


3) Execute a program that needs to fully process data from a bounded source before processing data from another unbounded source.

The following program demonstrates the scenario described in the motivation section. The program needs to pre-processes records from a bounded source (Source1) using an operator (Process1) which only emits results after its input has ended. Then anther operator(Process2) needs to process records emitted by Process1 with records from an unbounded source, and emit results with low processing latency in real-time.

Code Block
languagejava
 source1.keyBy(value -> value.f0)
  .window(EndOfStreamWindows.get())
  .aggregate(new MyAggregator()).name("Process1")
  .connect(source2.keyBy(value -> value.f0))
  .transform("Process2", Types.INT, new MyProcessOperator())
  .addSink(new DiscardingSink<>()); 

Resource Utilization

Suppose we set an extra standalone cluster configuration:

Code Block
languageyml
taskmanager.numberOfTaskSlots: 2

...


We can use this program to demonstrate that the program requires less slot resources. More specifically, suppose we configure the standalone cluster with taskmanager.numberOfTaskSlots = 2, and set the Source1,Process1, Source2 and Process2 in 4 different SlotSharingGroups

...

, the program will fail to be deployed before this FLIP. And the program can be deployed successfully after this FLIP. This is because Source2 and Process2 can be deplyed after Source1Process1 finished and released their slots.

Additionally, we can use this program to demonstrate that it can achieve higher performance because Process2 will not need to keep buffer records emitted by Source2 in is memory while Process1 has not reached EOF. More specifically, the program can fail with OOM before this FLIP when the number of records in inputs is high. And the program can finish successfully without OOM after this FLIP

...

.

Compatibility, Deprecation, and Migration Plan

...