Status
Current state: Accepted
Discussion thread: https://lists.apache.org/thread/qwrh22do8scghz79vy852pqx2ny4jqv6
Vote thread: https://lists.apache.org/thread/gdymm7pr2slzy9gqkfo97vn73496w0cj
JIRA:
Jira | ||||||
---|---|---|---|---|---|---|
|
Released: 1.17
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Currently, users have to set some configuration of the adaptive batch scheduler, which is not very convenient. To use the adaptive batch scheduler as the default batch scheduler, we need to improve the user's out-of-the-box experience. Therefore, we also need to optimize the current adaptive batch scheduler configuration.
Note that DataSet API will not support use the AdaptiveBatchScheduler as default scheduler(See Limitations for more details).
Public Interfaces
Introduce a new adaptive batch scheduler configuration parameters:
...
- Modify the default value of jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task from 1024 Mb to 16 Mb.
- Use parallelism.default as the default value of jobmanager.adaptive-batch-scheduler.max-parallelism.
- Modify the default value of ExecutionConfig#ExecutionMode from ExecutionMode.PIPELINED to ExecutionMode.BATCH-FORCED.
Rename the configuration of adaptive batch scheduler:
...
- If the config option `jobmanager.scheduler` is configured, then Flink uses the configured one.
- If the config option `jobmanager.scheduler` is not configured, then
- If the job type is batch and DataStream, then choose `Adaptive Batch Scheduler`
- If the job type is stream, then choose `Default Scheduler`
At present, the default execution mode of DataSet jobs is ExecutionMode.PIPELINED. However, the adaptive batch scheduler currently only supports blocking edges, so we plan to modify the default value of the ExecutionConfig#ExecutionMode from ExecutionMode.PIPELIEND to ExecutionMode.BATCH-FORCED.
...
Key
...
Current Value
...
Target Value
...
ExecutionConfig#ExecutionMode
...
ExecutionMode.PIPELINED
...
ExecutionMode.BATCH-FORCED
Simplify the configuration of adaptive batch scheduler
...
Key | Current Default Value | New Default Value |
execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task | MemorySize.ofMebiBytes(1024) | MemorySize.ofMebiBytes(16) |
Limitations
Not support AdaptiveBatchScheduler as default scheduler for DataSet jobs
There are several reasons for this limitation:
- DataSet API will be deprecated in the future and DataStream API is now the recommended low level API to develop Flink batch jobs.
- AdaptiveBatchScheduler does not support pipeline edges, but the DataSet API has some hard code to use pipeline edge. So that it can not control by configuration like configuration "execution.batch-shuffle-mode" which is used in DataStream API.
- It needs a lot of effort to determine whether the parallelism of the DataSet operator is set by the user.
Compatibility, Deprecation, and Migration Plan
- The default batch scheduler will be changed to AdaptiveBatchScheduler. The adaptive batch scheduler does not support pipeline edges, so it only supports execution.batch-shuffle-mode as ALL_EXCHANGES_BLOCKING (default), ALL_EXCHANGES_HYBRID_ FULL and ALL_EXCHANGES_HYBRID_SELECTIVE. If users want to use pipeline edges, just add this config option: jobmanager.scheduler: default, so that DefaultScheduler will be used.The adaptive batch scheduler does not support pipeline edges, the default executionMode of the DataSet is changed from PIPELINED to BATCH_ FORCED. This will change pipelined edges to blocking edges, which may cause performance degradation. However, currently pipelined shuffle can hardly be used in production for batch jobs, unless there are plenty of resources. Therefore, we believe that it is better to use the more powerful adaptive batch scheduler by default than to let DataSet jobs continue to use the DefaultScheduler. If users want to go back to the state before introducing the feature, just add this code in DataSet job: env.getConfig().setExecutionMode(ExecutionMode.PIPELINED) and add this config option: jobmanager.scheduler: default, so that DefaultScheduler will be used.
Test Plan
The proposed changes will be tested for correctness and performance through the TPC-DS benchmark suite in a real cluster.
...