Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/qwrh22do8scghz79vy852pqx2ny4jqv6

Vote thread: https://lists.apache.org/thread/gdymm7pr2slzy9gqkfo97vn73496w0cj


Released: 1.17

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In FLIP-187, we introduced the adaptive batch scheduler. It provides stronger batch scheduling capabilities, including automatically deciding the parallelism of job vertices for batch jobs (FLIP-187), balanced data distribution (FLINK-29663), and speculative execution (FLIP-168). To further improve Flink's batch capabilities with the adaptive batch scheduler, this FLIP aims to make it the default batch scheduler.

Currently, users have to set several configuration options to use the adaptive batch scheduler, which is inconvenient. To make it the default batch scheduler, we need to improve the out-of-the-box experience, so we also need to optimize its current configuration.

Note that the DataSet API will not use the AdaptiveBatchScheduler as its default scheduler (see Limitations for details).

Public Interfaces

Introduce a new adaptive batch scheduler configuration parameter:

  • Introduce "execution.batch.adaptive.auto-parallelism.enabled" as a switch for automatic parallelism derivation. This configuration item is enabled by default for batch jobs.

Modify the default values of the configuration parameters:

  • Change the default value of jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task from 1024 MiB to 16 MiB.
  • Use parallelism.default as the default value of jobmanager.adaptive-batch-scheduler.max-parallelism.

Rename the configuration options of the adaptive batch scheduler:

Currently, the names of configuration items related to the adaptive batch scheduler, including the speculative execution options, start with "jobmanager.adaptive-batch-scheduler". However, exposing the underlying scheduler to users in this way is unnecessary and adds complexity. Therefore, we'd like to rename them to be more user friendly. Config items for automatic parallelism derivation will be named "execution.batch.adaptive.auto-parallelism.xxx", and config items for speculative execution will be named "execution.batch.speculative.xxx". When more adaptive optimizations are added to the adaptive batch scheduler in the future, their config items can be named "execution.batch.adaptive.yyy.xxx". After the renaming, the old keys will be deprecated but will still be honored to preserve compatibility.

Current key name | New key name
jobmanager.adaptive-batch-scheduler.min-parallelism | execution.batch.adaptive.auto-parallelism.min-parallelism
jobmanager.adaptive-batch-scheduler.max-parallelism | execution.batch.adaptive.auto-parallelism.max-parallelism
jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task | execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task
jobmanager.adaptive-batch-scheduler.default-source-parallelism | execution.batch.adaptive.auto-parallelism.default-source-parallelism
jobmanager.adaptive-batch-scheduler.speculative.enabled | execution.batch.speculative.enabled
jobmanager.adaptive-batch-scheduler.speculative.max-concurrent-executions | execution.batch.speculative.max-concurrent-executions
jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration | execution.batch.speculative.block-slow-node-duration
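The renamed keys keep working under their old names. As a rough illustration of the intended lookup order (new key first, then any deprecated key), here is a minimal Java sketch that models the configuration as a plain Map so it runs without Flink on the classpath. The class and method names are hypothetical; in Flink itself this kind of fallback is usually declared on the option (for example with ConfigOption#withDeprecatedKeys) rather than written by hand.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class DeprecatedKeyLookup {

    // Hypothetical helper: returns the value of newKey if present, otherwise the value
    // of the first deprecated key that is present, otherwise an empty Optional.
    static Optional<String> lookup(Map<String, String> conf, String newKey, List<String> deprecatedKeys) {
        if (conf.containsKey(newKey)) {
            return Optional.of(conf.get(newKey));
        }
        for (String oldKey : deprecatedKeys) {
            if (conf.containsKey(oldKey)) {
                return Optional.of(conf.get(oldKey));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // A job that still uses the old (deprecated) speculative execution key.
        Map<String, String> conf = Map.of(
                "jobmanager.adaptive-batch-scheduler.speculative.enabled", "true");
        Optional<String> speculative = lookup(conf,
                "execution.batch.speculative.enabled",
                List.of("jobmanager.adaptive-batch-scheduler.speculative.enabled"));
        System.out.println(speculative.orElse("<unset>")); // prints "true"
    }
}
```

If both the new and the old key are set, the new key wins in this sketch.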

Proposed Changes

Make the adaptive batch scheduler the default batch scheduler

Currently, Flink's default scheduler is the DefaultScheduler; to use the adaptive batch scheduler, users must set jobmanager.scheduler: AdaptiveBatch.

To make the adaptive batch scheduler the default batch scheduler, the scheduler will be chosen as follows:

  • If the config option `jobmanager.scheduler` is configured, Flink uses the configured scheduler.
  • If the config option `jobmanager.scheduler` is not configured, then:
    • If the job is a batch job built with the DataStream API, choose the `Adaptive Batch Scheduler`.
    • If the job is a streaming job, choose the `Default Scheduler`.
    • Otherwise (e.g. DataSet batch jobs, see Limitations), choose the `Default Scheduler`.
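The selection rules above can be sketched as follows. The enums and the chooseScheduler method are hypothetical stand-ins for illustration, not Flink's actual classes; following the note on DataSet jobs in the Motivation section, the sketch assumes that any unconfigured job which is not a DataStream batch job keeps the DefaultScheduler.

```java
import java.util.Optional;

public class SchedulerSelection {

    // Hypothetical enums mirroring the concepts in this FLIP; not Flink's actual classes.
    enum JobType { STREAMING, BATCH }
    enum ApiType { DATASTREAM, DATASET }
    enum SchedulerType { DEFAULT, ADAPTIVE, ADAPTIVE_BATCH }

    static SchedulerType chooseScheduler(Optional<SchedulerType> configured, JobType jobType, ApiType apiType) {
        // An explicitly configured jobmanager.scheduler always wins.
        if (configured.isPresent()) {
            return configured.get();
        }
        // Not configured: batch DataStream jobs get the adaptive batch scheduler.
        if (jobType == JobType.BATCH && apiType == ApiType.DATASTREAM) {
            return SchedulerType.ADAPTIVE_BATCH;
        }
        // Everything else (streaming jobs, DataSet jobs) keeps the DefaultScheduler.
        return SchedulerType.DEFAULT;
    }

    public static void main(String[] args) {
        System.out.println(chooseScheduler(Optional.empty(), JobType.BATCH, ApiType.DATASTREAM));     // ADAPTIVE_BATCH
        System.out.println(chooseScheduler(Optional.empty(), JobType.STREAMING, ApiType.DATASTREAM)); // DEFAULT
        System.out.println(chooseScheduler(Optional.of(SchedulerType.DEFAULT), JobType.BATCH, ApiType.DATASTREAM)); // DEFAULT
    }
}
```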

Simplify the configuration of adaptive batch scheduler

Currently, users who want to use the adaptive batch scheduler need to configure the following parameters:

  • parallelism.default: -1
  • execution.batch.adaptive.auto-parallelism.max-parallelism:  the upper bound of allowed parallelism to set adaptively.
  • execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task: the size of data volume to expect each task instance to process.

To make the adaptive batch scheduler the default batch scheduler, we need to simplify its configuration.

To achieve this, we need to do the following work:

  • Use parallelism.default as the default value of execution.batch.adaptive.auto-parallelism.max-parallelism.
  • Introduce "execution.batch.adaptive.auto-parallelism.enabled" as a switch for automatic parallelism derivation. This configuration item is enabled by default for batch jobs.
  • Change the default value of "execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task" from 1024 MiB to 16 MiB.

Use parallelism.default as the default value of execution.batch.adaptive.auto-parallelism.max-parallelism

Currently, to enable automatic parallelism derivation, users must explicitly set parallelism.default to -1, which is not user friendly. In addition, the default value of execution.batch.adaptive.auto-parallelism.max-parallelism (128) is too small, and users often need to increase it.

Therefore, we plan to let users set the parallelism via parallelism.default, as they do for streaming jobs. With the adaptive batch scheduler, however, parallelism.default will be used as the maximum parallelism, and the actual parallelism of operators can be reduced automatically when the amount of data is small.

In addition, if neither parallelism.default nor execution.batch.adaptive.auto-parallelism.max-parallelism is configured, the max parallelism falls back to 128.

Key | Current default value | New default value
execution.batch.adaptive.auto-parallelism.max-parallelism | 128 | The value of this option if it is configured; otherwise the value of parallelism.default if that is configured; otherwise 128.
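A minimal sketch of the fallback chain in the table, assuming "configured" simply means the option is present (modeled here with OptionalInt). The class and method names are hypothetical, and the sketch does not special-case the legacy parallelism.default = -1 setting.

```java
import java.util.OptionalInt;

public class MaxParallelismResolution {

    // Fallback used when neither option is configured (value taken from this FLIP).
    static final int FALLBACK_MAX_PARALLELISM = 128;

    // Hypothetical resolver: explicit max-parallelism, then parallelism.default, then 128.
    static int resolveMaxParallelism(OptionalInt configuredMaxParallelism, OptionalInt parallelismDefault) {
        if (configuredMaxParallelism.isPresent()) {
            return configuredMaxParallelism.getAsInt();
        }
        if (parallelismDefault.isPresent()) {
            return parallelismDefault.getAsInt();
        }
        return FALLBACK_MAX_PARALLELISM;
    }

    public static void main(String[] args) {
        System.out.println(resolveMaxParallelism(OptionalInt.of(256), OptionalInt.of(1000))); // 256
        System.out.println(resolveMaxParallelism(OptionalInt.empty(), OptionalInt.of(1000))); // 1000
        System.out.println(resolveMaxParallelism(OptionalInt.empty(), OptionalInt.empty()));  // 128
    }
}
```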

Introduce "execution.batch.adaptive.auto-parallelism.enabled" as a switch for automatic parallelism derivation

To control whether automatic parallelism derivation is enabled, we plan to add the configuration item execution.batch.adaptive.auto-parallelism.enabled. It defaults to ‘true’; users who want their configured parallelism to be used as-is, without automatic adjustment, can set it to ‘false’.

Key | Type | Default value | Description
execution.batch.adaptive.auto-parallelism.enabled | Boolean | true | A flag to enable or disable automatic parallelism derivation. It takes effect only with the AdaptiveBatchScheduler.

Modify adaptive batch scheduler configuration default values

Currently, the default value of execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task is 1024 MiB, which is too large, so users often need to lower it in production. In addition, several TPC-DS runs on a 10 TB dataset showed that 16 MiB gives the best performance. We therefore plan to change the default value of execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task from 1024 MiB to 16 MiB.

Key | Current default value | New default value
execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task | MemorySize.ofMebiBytes(1024) | MemorySize.ofMebiBytes(16)
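To illustrate why the smaller default matters, the sketch below uses a simplified derivation rule: the parallelism of a job vertex is the ceiling of its input size divided by avg-data-volume-per-task, clamped to [min-parallelism, max-parallelism], and the configured parallelism is returned unchanged when execution.batch.adaptive.auto-parallelism.enabled is false. This rule and the method name are assumptions made for illustration; the scheduler's actual decider may apply additional adjustments.

```java
public class ParallelismDerivationSketch {

    static final long MIB = 1024L * 1024L;

    // Simplified, hypothetical rule: ceil(inputBytes / avgBytesPerTask), clamped to
    // [minParallelism, maxParallelism]. When auto-parallelism is disabled, the
    // configured parallelism is returned unchanged.
    static int decideParallelism(boolean autoParallelismEnabled, int configuredParallelism,
                                 long inputBytes, long avgBytesPerTask,
                                 int minParallelism, int maxParallelism) {
        if (!autoParallelismEnabled) {
            return configuredParallelism;
        }
        long expected = (inputBytes + avgBytesPerTask - 1) / avgBytesPerTask; // ceiling division
        long clamped = Math.max(minParallelism, Math.min(expected, maxParallelism));
        return (int) clamped;
    }

    public static void main(String[] args) {
        long input = 1024L * MIB; // 1 GiB of input data for one job vertex
        int withOldDefault = decideParallelism(true, 128, input, 1024L * MIB, 1, 128);
        int withNewDefault = decideParallelism(true, 128, input, 16L * MIB, 1, 128);
        System.out.println(withOldDefault + " " + withNewDefault); // prints "1 64"
    }
}
```

With 1 GiB of input and a max parallelism of 128, the old 1024 MiB default yields a parallelism of 1 under this rule, while 16 MiB yields 64.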

Limitations

AdaptiveBatchScheduler is not the default scheduler for DataSet jobs

There are several reasons for this limitation:

  • The DataSet API will be deprecated in the future, and the DataStream API is now the recommended low-level API for developing Flink batch jobs.
  • The AdaptiveBatchScheduler does not support pipelined edges, but the DataSet API hard-codes pipelined edges in some places. These edges therefore cannot be controlled through configuration such as "execution.batch-shuffle-mode", as they can in the DataStream API.
  • Determining whether the parallelism of a DataSet operator was set by the user would take considerable effort.

Compatibility, Deprecation, and Migration Plan

  • The default batch scheduler will be changed to the AdaptiveBatchScheduler. Because the adaptive batch scheduler does not support pipelined edges, it only supports the execution.batch-shuffle-mode values ALL_EXCHANGES_BLOCKING (the default), ALL_EXCHANGES_HYBRID_FULL and ALL_EXCHANGES_HYBRID_SELECTIVE. Users who need pipelined edges can set the config option jobmanager.scheduler: default, so that the DefaultScheduler is used.
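A compatibility check along these lines can be sketched as below. The enum is a local mirror of the execution.batch-shuffle-mode values named above plus ALL_EXCHANGES_PIPELINED (the pipelined mode in Flink's BatchShuffleMode), declared here so the example compiles on its own; the helper names are hypothetical and the check is a simplification, not Flink's actual validation code.

```java
public class ShuffleModeCheck {

    // Local mirror of the batch shuffle modes; not Flink's own enum.
    enum BatchShuffleMode {
        ALL_EXCHANGES_PIPELINED,
        ALL_EXCHANGES_BLOCKING,
        ALL_EXCHANGES_HYBRID_FULL,
        ALL_EXCHANGES_HYBRID_SELECTIVE
    }

    // Returns true for the shuffle modes the adaptive batch scheduler supports.
    static boolean supportedByAdaptiveBatchScheduler(BatchShuffleMode mode) {
        switch (mode) {
            case ALL_EXCHANGES_BLOCKING:
            case ALL_EXCHANGES_HYBRID_FULL:
            case ALL_EXCHANGES_HYBRID_SELECTIVE:
                return true;
            default:
                return false; // pipelined edges need the DefaultScheduler
        }
    }

    // Hypothetical helper: the jobmanager.scheduler value a user would pick for a mode.
    static String suggestedSchedulerSetting(BatchShuffleMode mode) {
        return supportedByAdaptiveBatchScheduler(mode) ? "AdaptiveBatch" : "default";
    }

    public static void main(String[] args) {
        for (BatchShuffleMode mode : BatchShuffleMode.values()) {
            System.out.println(mode + " -> jobmanager.scheduler: " + suggestedSchedulerSetting(mode));
        }
    }
}
```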

Test Plan

The proposed changes will be tested for correctness and performance through the TPC-DS benchmark suite in a real cluster.

Rejected Alternatives

No rejected alternatives yet.
