...

  • If a Transformation has isOutputOnEOF == true:
    • Operator chaining can still be performed. After chaining, the results of this operator chain and its upstream operators will use the blocking (by default) or hybrid shuffle partition type, which can be controlled by configuring ExecutionOptions.BATCH_SHUFFLE_MODE.
    • This operator, as well as its upstream operators, will be executed in batch mode (e.g. checkpointing is disabled while these operators are running).
  • If all Transformations have isOutputOnCheckpoint == false:
    • As described in FLIP-325, JM will not trigger an extra flush() before triggering a checkpoint.
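For example, the shuffle type used for the output-on-EOF part could be switched from the default blocking mode to full hybrid shuffle via the configuration key backing ExecutionOptions.BATCH_SHUFFLE_MODE (ALL_EXCHANGES_BLOCKING is the default; ALL_EXCHANGES_HYBRID_FULL and ALL_EXCHANGES_HYBRID_SELECTIVE select hybrid shuffle):

```yml
execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL
```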

...

In this benchmark, we run each job in CoGroupDataStream 5 times under 4 configurations: streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR. Here are the throughput benchmark results:

| Data Count | STREAMING | BATCH | Optimized STREAMING | With hybrid shuffle |
| --- | --- | --- | --- | --- |
| 8e7 | 66 ± 1 (100%, 1202426 ms) | 491 ± 5 (743%, 162731 ms) | 1506 ± 10 (2281%, 53098 ms) | 1677 ± 42 (2540%, 47687 ms) |

This shows that, with the changes proposed above, DataStream#coGroup throughput in Optimized STREAMING can be roughly 20X that of STREAMING and 3X that of BATCH. Using hybrid shuffle in the output-on-EOF part further improves throughput by about 10%.
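For context, the semantics the benchmark exercises are those of DataStream#coGroup: for each key, the CoGroupFunction receives all elements with that key from both inputs. A minimal plain-Java sketch of those semantics (class and method names here are illustrative, not Flink API):

```java
import java.util.*;

public class CoGroupSketch {
    // Group-and-pair semantics of coGroup: for every key present in either
    // input, emit the pair (elements from left, elements from right).
    // This mirrors the two iterables a CoGroupFunction#coGroup call receives.
    public static Map<String, List<List<Integer>>> coGroup(
            Map<String, List<Integer>> left, Map<String, List<Integer>> right) {
        Set<String> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet());
        Map<String, List<List<Integer>>> out = new LinkedHashMap<>();
        for (String k : keys) {
            out.put(k, List.of(
                    left.getOrDefault(k, List.of()),
                    right.getOrDefault(k, List.of())));
        }
        return out;
    }
}
```

In the actual job this pairing can only complete once both inputs reach EOF, which is why the coGroup operator is a natural candidate for the isOutputOnEOF optimization.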

Benchmark: Aggregate

Use DataStream#aggregate to process records from a bounded stream that has Data Count records, where every 100 records share the same key. Aggregation results are emitted after the input has ended.
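The aggregation pattern above can be sketched in plain Java (the sum-per-key function is illustrative, not the benchmark's actual AggregateFunction): records are accumulated per key, and nothing is emitted until the input ends.

```java
import java.util.*;

public class AggregateSketch {
    // Accumulate a sum per key over the whole bounded input; the result map
    // is only produced once the input is exhausted, matching a job whose
    // aggregation results fire at end of input.
    public static Map<String, Long> sumPerKey(List<Map.Entry<String, Long>> records) {
        Map<String, Long> acc = new HashMap<>();
        for (Map.Entry<String, Long> rec : records) {
            acc.merge(rec.getKey(), rec.getValue(), Long::sum);
        }
        return acc; // emitted after the input has ended
    }
}
```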

...

In this benchmark, we run each job in AggregationBenchmark 5 times under 4 configurations: streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR. Here are the throughput benchmark results:

| Data Count | STREAMING | BATCH | Optimized STREAMING | With hybrid shuffle |
| --- | --- | --- | --- | --- |
| 8e7 | 163 ± 0 (100%, 490478 ms) | 1561 ± 16 (957%, 51237 ms) | 1733 ± 9 (1063%, 46143 ms) | 1992 ± 15 (1222%, 40148 ms) |

This shows that, with the changes proposed above, DataStream#aggregate in Optimized STREAMING can be roughly 10X faster than STREAMING and slightly faster than BATCH. Using hybrid shuffle in the output-on-EOF part further improves throughput by about 15%.

Benchmark: OperatorDelayedDeploy

...

```yml
taskmanager.numberOfTaskSlots: 2
```

When we put Source1, Process1, Source2, and Process2 in 4 different SlotSharingGroups as above and run this job, we can see that, before this PR, none of the tasks could be deployed; they were all blocked in the CREATED state because the TaskManager provides only 2 slots. With our work, Source2 and Process2 can be deployed after Source1 and Process1 finish and release their slots.

OperatorDelayedDeployDataStream

Without setting slot sharing groups, we run the OperatorDelayedDeployDataStreamInOneSlot job in STREAMING, Optimized STREAMING, and Optimized STREAMING with hybrid shuffle. As the number of records in the job grows, throughput in STREAMING decreases because of backpressure in Process2, eventually causing an OOM exception, while the job works well in Optimized STREAMING. Using hybrid shuffle additionally lets Process1 consume records from Source1 in a pipelined fashion, further improving performance.

Compatibility, Deprecation, and Migration Plan

...