Status

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, a Flink cluster can use only one shuffle service plugin, configured by 'shuffle-service-factory.class'. This is not a problem for per-job clusters, but for session clusters it is not flexible enough: it cannot support use cases such as selecting different shuffle services for different workloads (e.g. batch vs. streaming). As discussed in https://lists.apache.org/thread/k4owttq9q3cq4knoobrzc31bghf7vc0o, the community considered implementing this feature when introducing the pluggable shuffle service abstraction, and there are also some relevant follow-up Jira issues. We'd like to implement this feature in this FLIP.

Public Interfaces

To implement this feature, we need to support job-level shuffle service configuration. Currently, only cluster-level shuffle service configuration is supported, by configuring 'shuffle-service-factory.class'. If a user starts a Flink cluster and then tries to change the shuffle service for a job by reconfiguring 'shuffle-service-factory.class', the change has no effect, because job-level configuration is currently not passed to the runtime.

To support job-level shuffle service configuration, there are two simple options (both need to allow users to configure the job-level shuffle service through the configuration file or dynamic configuration):

1. Add an interface to ExecutionConfig and use the newly added interface to decide which shuffle service to use. The advantage of this option is that the scope is clearer and nothing else is affected. The disadvantage is that we need to deserialize the ExecutionConfig with the user code classloader before using it, which makes it a little complicated.

/** Sets the shuffle service to be used by the corresponding job. */
public ExecutionConfig setShuffleServiceName(String shuffleServiceFactoryName) {
    // Only the first value set takes effect; later calls do not override it.
    if (this.shuffleServiceFactoryName == null) {
        this.shuffleServiceFactoryName = shuffleServiceFactoryName;
    }
    return this;
}

/** Returns the shuffle service factory name set for the job, or null if none was set. */
public String getShuffleServiceName() {
    return shuffleServiceFactoryName;
}

2. Pass job-level configuration to the jobConfiguration field of JobGraph (as far as I know, currently only the ) and then get the shuffle service to be used from this configuration. The advantage of this option is that we introduce a general way to pass job-level configuration, which can be reused by others. The disadvantage is that the jobConfiguration in JobGraph needs to carry some unnecessary configuration.
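
Under either option, the lookup reduces to "use the job-level value if present, otherwise fall back to the cluster default". A minimal sketch of that rule follows. It uses a plain Map as a stand-in for the job configuration, and JobShuffleServiceResolver is a hypothetical helper rather than an existing Flink class; only the option key 'shuffle-service-factory.class' comes from the text above.

```java
import java.util.Map;

/** Hypothetical helper (not a Flink class): picks the shuffle service factory name for a job. */
final class JobShuffleServiceResolver {

    /** Option key as named in this FLIP. */
    static final String SHUFFLE_SERVICE_FACTORY_CLASS = "shuffle-service-factory.class";

    private JobShuffleServiceResolver() {}

    /**
     * Returns the job-level factory name if the job configuration sets one,
     * otherwise the factory name the cluster was started with.
     */
    static String resolve(Map<String, String> jobConfiguration, String clusterDefaultFactoryName) {
        String jobLevel = jobConfiguration.get(SHUFFLE_SERVICE_FACTORY_CLASS);
        if (jobLevel != null && !jobLevel.trim().isEmpty()) {
            return jobLevel.trim();
        }
        return clusterDefaultFactoryName;
    }
}
```

With an empty job configuration the cluster default is returned; once the job sets 'shuffle-service-factory.class', that value wins.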

Proposed Changes

1. The shuffle service configured when the cluster starts up will be treated as the default shuffle service. If no shuffle service is explicitly configured for a submitted job, this default shuffle service will be used. If the default shuffle service fails to load, the whole cluster fails.

2. Load all shuffle services with the PluginManager when the Flink cluster starts, and report all data shuffle ports to the JobManager when registering the TaskManager (currently, only one data shuffle port is reported). Different shuffle services are identified by their shuffle service factory name. The internal NettyShuffleService will still be loaded directly without using the PluginManager, so it will not be affected. In addition, to keep the loading behavior unchanged, the default shuffle service mentioned in 1 can also be loaded directly.

3. Support job-level shuffle service configuration and select the right shuffle service to use in JobMaster and TaskExecutor. As discussed in the section above, there are two basic ways to implement job-level shuffle service selection.

4. Share the same network memory pool (NetworkBufferPool) among different shuffle services. This is compatible with the Flink memory model, which treats network memory as one dimension of resource. In fact, we need a public NetworkMemoryProvider interface to expose the network resource to shuffle service plugins. We can leave this, together with other work related to user-facing interfaces (tracked in a separate Jira issue), as future work.
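
Steps 1 to 3 can be pictured as a small registry keyed by the shuffle service factory name. The sketch below is illustrative only: LoadedShuffleService and ShuffleServiceRegistry are hypothetical names, not Flink classes, and the port numbers in the usage note are made up. Rejecting a job that names a service that was not loaded is an assumption of this sketch; the FLIP text does not specify that case.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a shuffle service that was loaded at cluster start. */
final class LoadedShuffleService {
    final String factoryName;
    final int dataPort;

    LoadedShuffleService(String factoryName, int dataPort) {
        this.factoryName = factoryName;
        this.dataPort = dataPort;
    }
}

/** Hypothetical registry of all loaded shuffle services, identified by factory name. */
final class ShuffleServiceRegistry {
    private final Map<String, LoadedShuffleService> services = new LinkedHashMap<>();
    private final String defaultFactoryName;

    ShuffleServiceRegistry(String defaultFactoryName, Iterable<LoadedShuffleService> loaded) {
        for (LoadedShuffleService service : loaded) {
            services.put(service.factoryName, service);
        }
        // Step 1: a missing default shuffle service is fatal for the whole cluster.
        if (!services.containsKey(defaultFactoryName)) {
            throw new IllegalStateException(
                    "Default shuffle service not loaded: " + defaultFactoryName);
        }
        this.defaultFactoryName = defaultFactoryName;
    }

    /** Step 3: use the job-level choice if given, otherwise the cluster default. */
    LoadedShuffleService select(String jobFactoryName) {
        String name = jobFactoryName == null ? defaultFactoryName : jobFactoryName;
        LoadedShuffleService service = services.get(name);
        if (service == null) {
            // Assumption of this sketch: an unknown job-level service is rejected.
            throw new IllegalArgumentException("Shuffle service not loaded: " + name);
        }
        return service;
    }

    /** Step 2: every data port, keyed by factory name, to report at TaskManager registration. */
    Map<String, Integer> dataPorts() {
        Map<String, Integer> ports = new LinkedHashMap<>();
        for (LoadedShuffleService service : services.values()) {
            ports.put(service.factoryName, service.dataPort);
        }
        return ports;
    }
}
```

For example, a registry built with default "netty" from services ("netty", 6121) and ("remote", 7000) returns the netty service for select(null) and the remote one for select("remote").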


Here is a PoC implementation: https://github.com/wsry/flink/commit/bf6b56b1cd6591e7b3fad6fddd3f701a06c64868.

Compatibility, Deprecation, and Migration Plan

1. If not configured as the default, a shuffle service must be implemented as a Flink plugin that can be loaded by the Flink PluginManager.

2. If a custom shuffle service plugin already creates its own NetworkBufferPool instance (not a public interface), it must switch to using the shared NetworkBufferPool.

Test Plan

Testing will use both test cases and the flink-remote-shuffle project (https://github.com/flink-extended/flink-remote-shuffle).

Rejected Alternatives

None.