...

  • If a Transformation has isOutputOnEOF == true:
    • The result partitions of this operator, as well as those of its upstream operators, use the blocking (by default) or hybrid shuffle partition type, which is controlled by configuring ExecutionOptions.BATCH_SHUFFLE_MODE.
    • This operator, as well as its upstream operators, will be executed in batch mode (e.g. checkpointing is disabled while these operators are running).
  • If all Transformations have isOutputOnCheckpoint == false:
    • In FLIP-325, the JM will not trigger an extra flush() before triggering a checkpoint.

...

In addition, after FLIP-327 is accepted, this operator should also have isInternalSorterSupported = true. This operator will sort the input before aggregating and avoid invoking window actions, which is similar to '5)'.

7) Before all non-pipelined tasks have finished, the CheckpointCoordinator gets a null checkpoint plan; it works normally again after all non-pipelined tasks finish.

8) Hybrid shuffle can be applied by configuring ExecutionOptions.BATCH_SHUFFLE_MODE, which determines the result partition type of an isOutputOnEOF operator and its upstream operators in RuntimeExecutionMode.STREAMING.
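For illustration, here is a minimal sketch of selecting this shuffle mode programmatically. The class name and the Configuration-based setup are assumptions made for the example; they are not part of this proposal.

Code Block
languagejava
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ShuffleModeSketch {
    public static void main(String[] args) {
        // Select the result partition type used by the isOutputOnEOF operator and its
        // upstream operators: ALL_EXCHANGES_HYBRID_FULL enables hybrid shuffle, while the
        // default ALL_EXCHANGES_BLOCKING keeps blocking result partitions.
        Configuration conf = new Configuration();
        conf.set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build the job on env as in the benchmark examples below ...
    }
}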

...

Benchmark results

To demonstrate our optimization improvements, we run each benchmark in different execution modes and configurations 5 times. The benchmark results below include the average throughput (records/ms), the throughput range, and the magnification factor relative to each streaming job before our work.

Environment Configuration

We run benchmarks on a MacBook with the latest Flink 1.18-SNAPSHOT and parallelism=1. The default standalone cluster configuration is used except for:


Code Block
languageyml
jobmanager.memory.process.size: 6400m
taskmanager.memory.process.size: 6912m
taskmanager.numberOfTaskSlots: 2

Benchmark:

...

CoGroup

Use DataStream#coGroup to process records from two bounded streams, each with Data Count records, and emit results after both inputs have ended. With this PR, when DataStream#coGroup is invoked with EndOfStreamWindows as the window assigner, the EOFCoGroupOperator is applied: the two inputs are sorted first and the coGroup results are then emitted.

Code Block
languagejava
data1.coGroup(data2)
    .where(tuple -> tuple.f0)
    .equalTo(tuple -> tuple.f0)
    .window(EndOfStreamWindows.get())
    .apply(new CustomCoGroupFunction())
    .addSink(new DiscardingSink<>());

Resource Utilization

When we put Source1, Process1, Source2 and Process2 in 4 different SlotSharingGroups as above and then run this job, we can see that, before this PR, none of the tasks could be deployed; they stayed in the CREATED state because the TaskManager provides only 2 slots. With our work, Source2 and Process2 can be deployed after Source1 and Process1 finish and release their slots.
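For illustration, here is a minimal sketch of assigning operators to separate slot sharing groups. The group names ("stage1", "stage2") and the simple pipeline are assumptions made for the example; they are not the benchmark code itself.

Code Block
languagejava
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

public class SlotSharingGroupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // slotSharingGroup() pins the tasks of each stage to a named slot sharing group;
        // tasks in different groups cannot share task slots, so each group needs its own slots.
        SingleOutputStreamOperator<Long> stage1 = env
                .fromSequence(0, 1_000_000)
                .map(x -> x * 2).returns(Types.LONG)
                .slotSharingGroup("stage1");

        stage1
                .map(x -> x + 1).returns(Types.LONG)
                .slotSharingGroup("stage2")
                .addSink(new DiscardingSink<>())
                .slotSharingGroup("stage2");

        env.execute("slot-sharing-group-sketch");
    }
}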

OperatorDelayedDeployDataStream

Efficiency Improvement

Without setting slot sharing groups, we run each OperatorDelayedDeployDataStreamInOneSlot benchmark 5 times; the average throughput (records/ms), the throughput range, and the magnification relative to the minimum throughput are provided. Here are the benchmark results:

Data Count | Key Count | Current Throughput | Optimized Throughput | Hybrid Shuffle Throughput
5e6 | 5e4 | 205 ± 4 (149%) | 1201 ± 59 (876%) | 1381 ± 14 (1008%)
5e6 | 5e5 | 145 ± 21 (105%) | 1093 ± 61 (797%) | 1282 ± 41 (935%)
1e7 | 1e5 | 186 ± 19 (135%) | 1270 ± 64 (927%) | 1423 ± 41 (1038%)
1e7 | 1e6 | 137 ± 10 (100%) | 1176 ± 31 (858%) | 1313 ± 33 (958%)
5e7 | 5e5 | OOM | 1463 ± 7 (1067%) | 1666 ± 35 (1216%)
5e7 | 5e6 | OOM | 1099 ± 28 (802%) | 1222 ± 49 (891%)

This shows that, with the changes proposed above, throughput can be about 8~10X that of the current version in this scenario. As the number of records grows, the job in the current version hits an OOM exception while the optimized version still works well. With hybrid shuffle applied to the batch execution part, the blocking edge from Source1 to Process1 becomes a hybrid shuffle edge, which further improves performance.

Benchmark: CoGroup

...

...);


In this benchmark, we run each job in CoGroupDataStream 5 times under 4 configurations: streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR. Here are the throughput benchmark results:

Data Count | STREAMING | BATCH | Optimized STREAMING | With hybrid shuffle
2e6 | 61 ± 1 (100%) | 502 ± 75 (823%) | 868 ± 74 (1423%) | 899 ± 82 (1474%)
5e6 | 60 ± 1 (100%) | 528 ± 28 (880%) | 1176 ± 39 (1960%) | 1262 ± 50 (2103%)
2e7 | 56 ± 0 (100%) | 564 ± 34 (1007%) | 1492 ± 28 (2664%) | 1692 ± 14 (3021%)
5e7 | 53 ± 1 (100%) | 602 ± 16 (1135%) | 1508 ± 18 (2845%) | 1712 ± 77 (3230%)

This shows that, with the changes proposed above, DataStream#coGroup can be 14~30X faster than its current throughput in stream mode, and up to 2.5X faster than its current throughput in batch mode. Using hybrid shuffle in the outputEOF part can further improve the performance.

Benchmark: Aggregate

Use DataStream#aggregate to process records from a bounded stream with Data Count records, where every 100 records share the same key, and emit results after the input has ended. With this PR, when DataStream#aggregate is invoked with EndOfStreamWindows as the window assigner, the EOFAggregationOperator is applied: the input is sorted first, then aggregated per key, and the results are emitted.

Code Block
languagejava
data
    .keyBy(value -> value.f0)
    .window(EndOfStreamWindows.get())
    .aggregate(new Aggregator())
    .addSink(new CountingAndDiscardingSink());

...


In this benchmark, we run each job in AggregationBenchmark 5 times under 4 configurations: streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR. Here are the throughput benchmark results:

Data Count | STREAMING | BATCH | Optimized STREAMING | With hybrid shuffle
5e6 | 201 ± 3 (100%) | 1454 ± 153 (723%) | 1514 ± 43 (753%) | 1670 ± 29 (831%)
1e7 | 207 ± 2 (100%) | 1522 ± 71 (735%) | 1620 ± 44 (782%) | 1866 ± 187 (901%)
2e7 | 201 ± 3 (100%) | 1607 ± 9 (800%) | 1746 ± 31 (867%) | 2038 ± 155 (1014%)
5e7 | 176 ± 1 (100%) | 1559 ± 6 (885%) | 1723 ± 22 (978%) | 1956 ± 56 (1111%)

This shows that, with the changes proposed above, DataStream#aggregate can be 7~10X faster than its current throughput in stream mode, and slightly faster than the current throughput in batch mode. Using hybrid shuffle in the outputEOF part can further improve the performance.

Benchmark: OperatorDelayedDeploy

We have a job that pre-processes records from a bounded source (Source1) using an operator (Process1) which only emits results after its input has ended. Another operator (Process2) then needs to process records emitted by Process1 together with records from an unbounded source, and emit results with low processing latency in real-time.

Code Block
languagejava
source1.keyBy(value -> value.f0)
    .window(EndOfStreamWindows.get())
    .aggregate(new MyAggregator()).name("Process1")
    .connect(source2.keyBy(value -> value.f0))
    .transform("Process2", Types.INT, new MyProcessOperator())
    .addSink(new DiscardingSink<>());

We run the benchmark with 10 records per key and 100 records per key in AggregationBenchmark, each for 5 times. Here are the throughput benchmark results:

10 records per key

Data Count | Key Count | Current | BATCH | Optimized | Hybrid Shuffle
5e6 | 5e5 | 164 ± 1 (145%) | 1277 ± 129 (1130%) | 1365 ± 79 (1207%) | 1418 ± 154 (1254%)
1e7 | 1e6 | 151 ± 2 (133%) | 1403 ± 87 (1241%) | 1511 ± 43 (1337%) | 1697 ± 175 (1501%)
2e7 | 2e6 | 129 ± 4 (114%) | 1322 ± 21 (1169%) | 1411 ± 17 (1248%) | 1598 ± 13 (1414%)
5e7 | 5e6 | 113 ± 1 (100%) | 1282 ± 13 (1134%) | 1155 ± 13 (1022%) | 1256 ± 22 (1111%)

100 records per key

Data Count | Key Count | Current | BATCH | Optimized | Hybrid Shuffle
5e6 | 5e4 | 201 ± 3 (114%) | 1454 ± 153 (826%) | 1514 ± 43 (860%) | 1670 ± 29 (948%)
1e7 | 1e5 | 207 ± 2 (117%) | 1522 ± 71 (864%) | 1620 ± 44 (920%) | 1866 ± 187 (1060%)
2e7 | 2e5 | 201 ± 3 (114%) | 1607 ± 9 (913%) | 1746 ± 31 (992%) | 2038 ± 155 (1157%)
5e7 | 5e5 | 176 ± 1 (100%) | 1559 ± 6 (885%) | 1723 ± 22 (978%) | 1956 ± 56 (1111%)

...

 

Resource Utilization

Suppose we additionally set the following standalone cluster configuration:

Code Block
languageyml
taskmanager.numberOfTaskSlots: 2

When we put Source1, Process1, Source2 and Process2 in 4 different SlotSharingGroups as above and then run this job, we can see that, before this PR, none of the tasks could be deployed; they stayed in the CREATED state because the TaskManager provides only 2 slots. With our work, Source2 and Process2 can be deployed after Source1 and Process1 finish and release their slots.

OperatorDelayedDeployDataStream

Efficiency Improvement

Without setting slot sharing groups, we run each OperatorDelayedDeployDataStreamInOneSlot benchmark 5 times. Here are the throughput benchmark results:

Data Count | STREAMING | Optimized STREAMING | With hybrid shuffle
5e6 | 205 ± 4 (100%) | 1201 ± 59 (586%) | 1381 ± 14 (674%)
1e7 | 186 ± 19 (100%) | 1270 ± 64 (683%) | 1423 ± 41 (765%)
5e7 | OOM | 1463 ± 7 | 1666 ± 35

This shows that, as the number of records grows, the job in streaming mode hits an OOM exception while it works well with the changes proposed above.

Compatibility, Deprecation, and Migration Plan

...