Status

Current state: Released

Discussion thread: https://lists.apache.org/thread.html/r11750db945277d944f408eaebbbdc9d595d587fcfb67b015c716404e%40%3Cdev.flink.apache.org%3E

...

Several new config options will be added to control the behavior of the sort-merge based blocking shuffle. Because sort-merge based blocking shuffle is disabled by default, the default behavior of blocking shuffle stays unchanged.

Config Option | Description
taskmanager.network.sort-shuffle.min-buffers | Minimum number of network buffers required per sort-merge blocking result partition.
taskmanager.network.sort-shuffle.min-parallelism | Parallelism threshold to switch between sort-merge based blocking shuffle and the default hash-based blocking shuffle.
taskmanager.memory.framework.off-heap.batch-shuffle.size | Size of direct memory used by blocking shuffle for shuffle data read.
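
For illustration only, the options above could be set through Flink's Configuration class; the concrete values in this sketch are assumptions chosen for the example, not the defaults defined by this FLIP:

import org.apache.flink.configuration.Configuration;

public class SortShuffleConfigExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Minimum number of network buffers per sort-merge blocking result
        // partition (value chosen for illustration).
        conf.setInteger("taskmanager.network.sort-shuffle.min-buffers", 512);
        // Use sort-merge shuffle for all parallelisms >= 1, i.e. always
        // (value chosen for illustration).
        conf.setInteger("taskmanager.network.sort-shuffle.min-parallelism", 1);
        // Direct memory reserved for reading shuffle data
        // (value chosen for illustration).
        conf.setString("taskmanager.memory.framework.off-heap.batch-shuffle.size", "64m");
    }
}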

A fixed number of network buffers per result partition decouples memory consumption from parallelism, which is friendlier for large-scale batch jobs.
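
As a rough, illustrative comparison (the parallelism, buffer size and buffer count below are assumptions, not values fixed by this FLIP), the hash-based blocking shuffle needs at least one buffer per downstream subtask, while the sort-merge shuffle works with a fixed budget:

public class BufferDemandSketch {
    public static void main(String[] args) {
        int parallelism = 1000;       // assumed downstream parallelism
        int bufferSizeKb = 32;        // assumed network buffer size in KB
        int sortShuffleBuffers = 512; // assumed fixed buffer budget per result partition

        // Hash-based blocking shuffle: at least one buffer per subpartition,
        // so memory grows linearly with parallelism.
        long hashKb = (long) parallelism * bufferSizeKb;        // 32,000 KB, ~31 MB

        // Sort-merge blocking shuffle: a fixed number of buffers per result
        // partition, independent of parallelism.
        long sortKb = (long) sortShuffleBuffers * bufferSizeKb; // 16,384 KB, 16 MB

        System.out.println("hash-based per partition:  " + hashKb + " KB");
        System.out.println("sort-merge per partition: " + sortKb + " KB");
    }
}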

...

If there are multiple disks, load balancing is important for good performance. The simplest way to achieve load balance is round-robin (rebalance) disk selection.
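
A minimal sketch of such a rebalance (round-robin) disk selector follows; the class and method names here are illustrative assumptions, not part of Flink's actual code:

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RebalanceDiskSelector {
    private final List<String> dataDirs; // configured data directories, one per disk
    private final AtomicInteger nextIndex = new AtomicInteger();

    public RebalanceDiskSelector(List<String> dataDirs) {
        this.dataDirs = dataDirs;
    }

    // Returns the next data directory in round-robin order so that new
    // partition files are spread evenly over all disks.
    public String nextDataDir() {
        int index = Math.floorMod(nextIndex.getAndIncrement(), dataDirs.size());
        return dataDirs.get(index);
    }
}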

...

For large-scale batch jobs, a large number of network connections will be established, which may cause stability issues. We can restrict the number of concurrent partition requests to relieve the issue. Besides, restricting concurrent partition requests increases the number of network buffers that can be used per remote channel, that is, more credits per channel, which helps the shuffle reader read sequentially. (As mentioned above, the number of available credits influences sequential read because we cannot read more buffers than the consumer can process.)
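
The following is a minimal sketch of how concurrent partition requests could be capped; the class name and structure are assumptions for illustration, not the actual design:

import java.util.ArrayDeque;
import java.util.Queue;

public class PartitionRequestLimiter {
    private final int maxConcurrentRequests;
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private int inFlight;

    public PartitionRequestLimiter(int maxConcurrentRequests) {
        this.maxConcurrentRequests = maxConcurrentRequests;
    }

    // Starts the partition request immediately if the limit allows,
    // otherwise queues it until an in-flight request finishes.
    public synchronized void submit(Runnable requestPartition) {
        if (inFlight < maxConcurrentRequests) {
            inFlight++;
            requestPartition.run();
        } else {
            pending.add(requestPartition);
        }
    }

    // Called when an in-flight partition request completes; starts the next
    // queued request, if any, so the limit stays fully utilized.
    public synchronized void onRequestFinished() {
        Runnable next = pending.poll();
        if (next != null) {
            next.run();
        } else {
            inFlight--;
        }
    }
}

With fewer requests in flight at the same time, the available network buffers are shared among fewer remote channels, so each active channel gets more credits and the reader can fetch larger sequential chunks.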

...

Implementing a stand-alone shuffle service can further improve the shuffle IO performance because it is a centralized service and can collect more information, which enables more optimized actions, for example better node-level load balance, better disk-level load balance, further file merging, node-level IO scheduling, and shared read/write buffers and thread pools. It can be introduced in a separate FLIP.

Implementation and Test Plan

...

...

...

File merging and other optimizations can be implemented as a second step. The main components include MergePolicy, PartitionedFileMerge, IOScheduler and PartitionRequestManager. Tests will include unit tests, IT cases, and real job tests on a cluster.
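
Purely as an illustration of the kind of abstraction the plan refers to, a MergePolicy could look roughly like the following; the method name and parameters are assumptions, only the component name comes from the plan above:

import java.util.List;

public interface MergePolicy {
    // Selects which spilled partition files should be merged into a larger
    // file in the next merge round; an empty list means no merge is needed yet.
    List<String> selectFilesToMerge(List<String> spilledFiles, long targetFileSizeBytes);
}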

...