Status
Current state: Released
Discussion thread: https://lists.apache.org/thread.html/r11750db945277d944f408eaebbbdc9d595d587fcfb67b015c716404e%40%3Cdev.flink.apache.org%3E
...
Several new config options will be added to control the behavior of the sort-merge based blocking shuffle. Because sort-merge based blocking shuffle is disabled by default, the default behavior of blocking shuffle stays unchanged.
Config Option | Description |
---|---|
taskmanager.network.sort-shuffle.min-buffers | Minimum number of network buffers required per sort-merge blocking result partition. |
taskmanager.network.sort-shuffle.min-parallelism | Parallelism threshold to switch between sort-merge based blocking shuffle and the default hash-based blocking shuffle. |
taskmanager.memory.framework.off-heap.batch-shuffle.size | Size of direct memory used by blocking shuffle for shuffle data read. |
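As a sketch, a flink-conf.yaml fragment enabling these options might look like the following (the values are illustrative, not the defaults):

```yaml
# Use sort-merge blocking shuffle for all parallelisms (illustrative value)
taskmanager.network.sort-shuffle.min-parallelism: 1
# Require at least 64 network buffers per sort-merge result partition (illustrative value)
taskmanager.network.sort-shuffle.min-buffers: 64
# Direct memory reserved for reading shuffle data (illustrative value)
taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m
```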
Using a fixed number of network buffers per result partition decouples memory consumption from parallelism, which is friendlier to large-scale batch jobs.
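The decoupling can be illustrated with some back-of-envelope arithmetic (the helper names below are made up for illustration, not Flink APIs): the hash-based blocking shuffle needs at least one buffer per subpartition, so its per-partition buffer demand grows with consumer parallelism, while the sort-merge shuffle uses a fixed number of buffers for sorting.

```python
# Illustrative arithmetic, not Flink code: per-partition network buffers
# needed by the hash-based shuffle grow with parallelism, while the
# sort-merge shuffle caps them at a configured fixed number.

def hash_shuffle_buffers(parallelism: int) -> int:
    # Hash-based blocking shuffle keeps at least one buffer per subpartition.
    return parallelism

def sort_merge_shuffle_buffers(parallelism: int, min_buffers: int = 64) -> int:
    # Sort-merge shuffle only needs a fixed number of buffers for sorting,
    # independent of how many consumers read the result partition.
    return min_buffers

for p in (100, 1000, 10000):
    print(p, hash_shuffle_buffers(p), sort_merge_shuffle_buffers(p))
```

At a parallelism of 10000, the hash-based approach would need 10000 buffers per partition while the sort-merge approach still needs only the configured fixed amount.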
...
If there are multiple disks, load balancing is important for good performance. The simplest way to achieve load balancing is rebalancing disk selection, i.e., choosing data directories in round-robin order.
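A minimal sketch of rebalancing disk selection (a hypothetical structure, not Flink's implementation): each new result partition takes the next data directory in round-robin order, so shuffle files spread evenly across disks.

```python
from itertools import cycle

# Hypothetical sketch of round-robin (rebalancing) disk selection:
# each new result partition is placed on the next data directory,
# spreading shuffle files evenly across the available disks.

class RoundRobinDiskSelector:
    def __init__(self, data_dirs):
        self._dirs = cycle(data_dirs)

    def next_dir(self) -> str:
        return next(self._dirs)

selector = RoundRobinDiskSelector(["/disk1", "/disk2", "/disk3"])
placements = [selector.next_dir() for _ in range(6)]
# Six partitions land evenly: each disk receives exactly two.
```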
Restrict Concurrent Partition Requests (Not implemented in FLIP)
For large-scale batch jobs, a large number of network connections will be established, which may cause stability issues. We can restrict the number of concurrent partition requests to mitigate the issue. Besides, restricting concurrent partition requests increases the number of network buffers that can be used per remote channel, that is, more credits per channel, which helps the shuffle reader read sequentially. (As mentioned above, the number of available credits can influence sequential reads because we cannot read more buffers than the consumer can process.)
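The credit effect can be sketched with simple arithmetic (the helper and the numbers below are illustrative assumptions, not Flink APIs or defaults): with a fixed buffer pool on the reader side, capping the number of concurrently served partition requests leaves more credits per active channel.

```python
# Back-of-envelope sketch, not Flink code: with a fixed buffer pool,
# fewer concurrently active channels means more credits (buffers) per
# channel, allowing larger sequential reads on the producer side.

def credits_per_channel(total_buffers: int, active_channels: int) -> int:
    return total_buffers // active_channels

total = 2000  # network buffers available on the reader side (illustrative)
print(credits_per_channel(total, 1000))  # all 1000 channels active: 2 credits each
print(credits_per_channel(total, 100))   # capped at 100 requests: 20 credits each
```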
Implement External/Remote Shuffle Service (Not implemented in FLIP)
Implementing a stand-alone shuffle service can further improve shuffle IO performance because, as a centralized service, it can collect more information and take better-optimized actions, for example, better node-level load balancing, better disk-level load balancing, further file merging, node-level IO scheduling, and shared read/write buffers and thread pools. It can be introduced in a separate FLIP.
Implementation and Test Plan
Step #1: Implement Basic Shuffle Logic and Data Compression
...
Basic shuffle logic and data compression will be implemented first, which makes the sort-merge based blocking shuffle available for use. Main components include: 1) SortBuffer and a hash-based data clustering implementation; 2) PartitionedFile together with the corresponding writer (PartitionedFileWriter) and reader (PartitionedFileReader); 3) SortMergeResultPartition and the subpartition data reader SortMergePartitionReader. We will introduce these components separately. For data compression, only a very small change is needed because the facilities implemented for the existing BoundedBlockingResultPartition can be reused. Tests will include unit tests, IT cases and real job tests on a cluster. Besides, IO scheduling can improve shuffle performance a lot, so we also need to implement it.
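The data-clustering idea behind SortBuffer can be sketched as follows (a toy structure for illustration only, not the actual Flink class): records for all subpartitions are appended in arrival order, then emitted grouped by subpartition index, so each spill writes every subpartition's data contiguously.

```python
from collections import defaultdict

# Toy sketch of hash-based data clustering (hypothetical, not Flink's
# SortBuffer): records are appended in arrival order and then emitted
# grouped by subpartition, so spilled data is contiguous per subpartition.

class ToySortBuffer:
    def __init__(self):
        self._records = []  # (subpartition_index, record) in arrival order

    def append(self, subpartition: int, record: bytes):
        self._records.append((subpartition, record))

    def cluster(self):
        # Bucket records by subpartition; within each subpartition the
        # arrival order is preserved (a stable, hash/bucket-style grouping).
        grouped = defaultdict(list)
        for sub, rec in self._records:
            grouped[sub].append(rec)
        return {sub: grouped[sub] for sub in sorted(grouped)}

buf = ToySortBuffer()
for sub, rec in [(1, b"a"), (0, b"b"), (1, b"c"), (0, b"d")]:
    buf.append(sub, rec)
clustered = buf.cluster()
# clustered == {0: [b"b", b"d"], 1: [b"a", b"c"]}
```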
Step #2: Implement IO Scheduling and Other Optimizations
File merging and other optimizations can be implemented as the second step. Main components include MergePolicy, PartitionedFileMerge, IOScheduler and PartitionRequestManager. Tests will include unit tests, IT cases and real job tests on a cluster.
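The core idea of IO scheduling for shuffle reads can be sketched in a few lines (hypothetical code, not the actual IOScheduler component): pending subpartition read requests are served in file-offset order so the disk sees mostly sequential access instead of random seeks.

```python
# Hypothetical sketch of offset-ordered IO scheduling, not Flink code:
# serving pending read requests in file-offset order turns scattered
# subpartition reads into a mostly sequential disk access pattern.

def schedule_reads(requests):
    # Each request is (file_offset, subpartition_index).
    return sorted(requests, key=lambda r: r[0])

pending = [(4096, 2), (0, 0), (8192, 1), (2048, 3)]
ordered = schedule_reads(pending)
# ordered == [(0, 0), (2048, 3), (4096, 2), (8192, 1)]
```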
...