Status
Current state: Under Discussion
Discussion thread:
JIRA:
Released:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Streaming jobs which run for several days or longer usually experience changes in their workload during their lifetime. These changes can originate from seasonal spikes, such as day vs. night, weekdays vs. weekend or holidays vs. non-holidays, sudden events or simply growing popularity of your product. Some of these changes are more predictable than others, but they all have in common that they change the resource demand of your job if you want to keep the same service quality for your customers.
Even if you can estimate an upper bound for the maximum resource requirements, it is almost always prohibitively expensive to run your job with maximum resources from the very beginning. Consequently, it would be great if Flink could make use of resources (TaskManagers) which become available after a job has been started. Similarly, if the underlying resource management system decides that some of the currently assigned resources are needed elsewhere, Flink should not fail but instead scale the job down once the resources are revoked. Such behaviour would make Flink a nicer citizen of the respective resource manager.
Ideally, Flink would control the resource (de-)allocation. However, not every Flink deployment knows about the underlying resource management system. Moreover, it can be easier for an external party, such as the application developer, to decide on the actual resource needs. Hence, we propose the reactive execution mode, which enables Flink to react to newly available or removed resources by scaling the job up or down, with the goal of using the available resources as well as possible.
Proposed Changes
Overview
The proposed change builds upon the declarative resource management (FLIP-138) and the declarative scheduler (FLIP-160). With these changes, it is possible to declare a set of required resources, to start executing a job even if we haven't received all of the declared resources and to rescale the job if new resources become available. These are the ingredients we need in order to implement the reactive mode.
The reactive mode cannot know how many resources will be made available to the Flink cluster. Consequently, it needs to acquire all available resources. We can achieve this by declaring an infinite amount of required resources. This ensures that whenever a new TaskManager registers at the ResourceManager, the ResourceManager assigns the new slots to the JobMaster responsible for executing the job. Using the declarative scheduler, we will start executing the job once the set of resources has stabilised and adjust the parallelism whenever the set of resources changes. This way, Flink will be able to make use of all resources available in the cluster.
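The acquire-everything, wait-until-stable, rescale-on-change loop described above can be illustrated with a small sketch. It is not the declarative scheduler's code: the class ReactiveResourceTracker, the fixed stabilization timeout and the rule "parallelism = number of available slots" (which assumes slot sharing, so that one slot can host one subtask of every operator) are all assumptions made for this example.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical sketch: tracks the available slots and only (re)starts the job
 * once the set of slots has not changed for a stabilization timeout.
 */
public class ReactiveResourceTracker {
    private final Duration stabilizationTimeout;
    private int availableSlots = 0;
    private Instant lastChange = Instant.EPOCH;
    private int runningParallelism = 0; // 0 means the job is not running yet

    public ReactiveResourceTracker(Duration stabilizationTimeout) {
        this.stabilizationTimeout = stabilizationTimeout;
    }

    /** Called when a TaskManager registers (positive delta) or its slots are revoked (negative delta). */
    public void onSlotsChanged(int delta, Instant now) {
        availableSlots = Math.max(0, availableSlots + delta);
        lastChange = now;
    }

    /** True if no slot change happened within the stabilization timeout. */
    public boolean isStable(Instant now) {
        return !now.isBefore(lastChange.plus(stabilizationTimeout));
    }

    /**
     * Returns the parallelism to (re)start the job with, or -1 if nothing should happen:
     * there are no slots, the slots are still changing, or the job already runs at this parallelism.
     * Assumes slot sharing, so the parallelism equals the number of available slots.
     */
    public int decideParallelism(Instant now) {
        if (availableSlots == 0 || !isStable(now) || availableSlots == runningParallelism) {
            return -1;
        }
        runningParallelism = availableSlots;
        return runningParallelism;
    }
}
```

Waiting for a quiet period before (re)starting avoids one restart per TaskManager when many TaskManagers register in quick succession.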
Users will enable this feature by deploying a per-job or application cluster with the configuration option execution-mode set to reactive. This option will only be allowed if the user deploys a streaming job on either a per-job or an application cluster (a dedicated cluster per job). Session clusters will not support the reactive mode.
bin/flink run -Dexecution-mode=reactive --target yarn-per-job path/to/MyJob.jar
Under the hood, the reactive execution mode sets the parallelism of every operator to Integer.MAX_VALUE. Moreover, the cluster will be started with the declarative scheduler.
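Declaring a parallelism of Integer.MAX_VALUE does not mean that Flink runs that many subtasks: the scheduler can only deploy as many subtasks as it has slots. The following minimal sketch shows that clamping, assuming slot sharing so that each slot hosts at most one subtask of every operator; the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of how a declared parallelism is clamped to the available slots. */
public class EffectiveParallelism {

    /** With slot sharing, an operator cannot run more subtasks than there are slots. */
    public static int effective(int declaredParallelism, int availableSlots) {
        return Math.min(declaredParallelism, availableSlots);
    }

    /** Applies the clamping to every operator of a job, keeping the operator order. */
    public static Map<String, Integer> effectiveForJob(Map<String, Integer> declared, int availableSlots) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : declared.entrySet()) {
            result.put(e.getKey(), effective(e.getValue(), availableSlots));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> declared = new LinkedHashMap<>();
        declared.put("source", Integer.MAX_VALUE);
        declared.put("map", Integer.MAX_VALUE);
        declared.put("sink", Integer.MAX_VALUE);
        // With 6 slots, every operator runs with parallelism 6.
        System.out.println(effectiveForJob(declared, 6)); // prints {source=6, map=6, sink=6}
    }
}
```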
Limitations
Initially we only intend to support this mode for a limited subset of jobs:
- Streaming jobs only
- No support for fine-grained resources
- No fixed parallelism for any of the operators
Moreover, we only intend to support this mode for per-job and application clusters for the time being. This sidesteps the problem of how to distribute cluster resources among multiple jobs whose requirements have not been satisfied.
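The limitations above translate into a handful of checks that can be made before a job is accepted in reactive mode. The sketch below is a hypothetical illustration: JobDescription and ClusterMode are stand-ins invented for this example, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the checks implied by the reactive mode's limitations. */
public class ReactiveModeValidator {

    /** Deployment modes discussed in this proposal. */
    public enum ClusterMode { SESSION, PER_JOB, APPLICATION }

    /** Minimal, hypothetical description of a submitted job. */
    public static class JobDescription {
        final boolean streaming;
        final boolean usesFineGrainedResources;
        final boolean hasFixedOperatorParallelism;
        final ClusterMode clusterMode;

        public JobDescription(boolean streaming, boolean usesFineGrainedResources,
                              boolean hasFixedOperatorParallelism, ClusterMode clusterMode) {
            this.streaming = streaming;
            this.usesFineGrainedResources = usesFineGrainedResources;
            this.hasFixedOperatorParallelism = hasFixedOperatorParallelism;
            this.clusterMode = clusterMode;
        }
    }

    /** Returns every violated limitation; an empty list means the job is supported. */
    public static List<String> violations(JobDescription job) {
        List<String> problems = new ArrayList<>();
        if (!job.streaming) {
            problems.add("reactive mode supports streaming jobs only");
        }
        if (job.usesFineGrainedResources) {
            problems.add("fine-grained resources are not supported");
        }
        if (job.hasFixedOperatorParallelism) {
            problems.add("operators must not have a fixed parallelism");
        }
        if (job.clusterMode == ClusterMode.SESSION) {
            problems.add("session clusters are not supported");
        }
        return problems;
    }
}
```

Collecting all violations instead of failing on the first one lets a user fix every problem in a single round trip.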
Implementation
On the JobMaster side, the Reactive Mode is built on top of the declarative scheduler (FLIP-160). The behavior of the declarative scheduler is customizable through a pluggable SlotAllocator and ScalingPolicy. In the first implementation of the scheduler, the SlotAllocator and ScalingPolicy will not be configurable but hardwired to Reactive Mode-specific implementations. These implementations will expose a minimal set of configuration options, which are described in FLIP-160.
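To make the division of responsibilities concrete, here is a sketch of what the two pluggable components and their hardwired reactive implementations could look like. The interface shapes, method names and the minimum-increase threshold are assumptions for illustration only; the actual interfaces and options are defined in FLIP-160 and may differ.

```java
/** Hypothetical sketch of the pluggable components; not the FLIP-160 interfaces. */
public class ReactiveComponents {

    /** Decides which parallelism can be supported by the given number of slots. */
    public interface SlotAllocator {
        int determineParallelism(int availableSlots);
    }

    /** Decides whether a running job should be rescaled to a higher parallelism. */
    public interface ScalingPolicy {
        boolean shouldScaleUp(int currentParallelism, int newParallelism);
    }

    /** Reactive allocator: uses every available slot (slot sharing assumed). */
    public static final class ReactiveSlotAllocator implements SlotAllocator {
        @Override
        public int determineParallelism(int availableSlots) {
            return availableSlots;
        }
    }

    /**
     * Reactive policy: scale up once the gain in parallelism reaches a minimum increase.
     * The threshold is a hypothetical example of a "minimal configuration option".
     */
    public static final class ReactiveScalingPolicy implements ScalingPolicy {
        private final int minParallelismIncrease;

        public ReactiveScalingPolicy(int minParallelismIncrease) {
            if (minParallelismIncrease < 1) {
                throw new IllegalArgumentException("minParallelismIncrease must be >= 1");
            }
            this.minParallelismIncrease = minParallelismIncrease;
        }

        @Override
        public boolean shouldScaleUp(int currentParallelism, int newParallelism) {
            return newParallelism - currentParallelism >= minParallelismIncrease;
        }
    }
}
```

A threshold above 1 would trade some unused slots for fewer restarts when TaskManagers trickle in one at a time.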
Ideas for user interaction
Idea 0: "All Inclusive"
Introduce an execution-mode configuration option (which can be passed as a -D command-line argument or through flink-conf.yaml) that sets the parallelism and, on the submitting client, checks that the job is streaming only, uses no fine-grained resources and is not submitted to a session cluster. On the JobManager, the mode mutates the cluster configuration so that the declarative scheduler is used (alternatively, the client mutates the configuration of the cluster-to-be-launched).
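The client-side variant of Idea 0 (the client mutating the configuration of the cluster-to-be-launched) can be sketched over a plain key-value configuration. Only execution-mode and its value reactive come from this proposal; the keys cluster-mode and scheduler and the value declarative are hypothetical placeholders, not real Flink options.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of Idea 0: translating execution-mode=reactive into cluster settings. */
public class ReactiveConfigurator {
    static final String EXECUTION_MODE = "execution-mode"; // option proposed in this document
    static final String CLUSTER_MODE = "cluster-mode";     // placeholder: session / per-job / application
    static final String SCHEDULER = "scheduler";           // placeholder key for the scheduler choice

    /** Returns the configuration for the cluster-to-be-launched; the input map is not modified. */
    public static Map<String, String> apply(Map<String, String> userConfig) {
        Map<String, String> config = new HashMap<>(userConfig);
        if (!"reactive".equals(config.get(EXECUTION_MODE))) {
            return config; // reactive mode not requested: leave everything untouched
        }
        if ("session".equals(config.get(CLUSTER_MODE))) {
            throw new IllegalArgumentException("Reactive mode is not supported on session clusters");
        }
        // Reactive mode implies the declarative scheduler, so the user needs only one flag.
        config.put(SCHEDULER, "declarative");
        return config;
    }
}
```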
Pro:
- Easier to use, as we require only one additional flag at submission time
Idea 1: "Configure it"
The user configures the declarative scheduler in flink-conf.yaml and sets a boolean flag in the ExecutionConfig (programmatically or through the PipelineOptions, e.g. -Dpipeline.reactive-mode=true). The StreamGraphGenerator will overwrite the parallelism of all operators with Integer.MAX_VALUE and throw an exception if the job contains anything unsupported.
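The graph rewrite of Idea 1 can be sketched as follows. Operator is a hypothetical stand-in for a node of the generated graph, and only one unsupported feature (a user-fixed operator parallelism) is checked here; the other limitations would be checked the same way.

```java
import java.util.List;

/** Hypothetical sketch of Idea 1: a client-side flag that rewrites operator parallelism. */
public class ReactiveGraphRewriter {

    /** Minimal, hypothetical stand-in for an operator in the generated graph. */
    public static class Operator {
        public final String name;
        public int parallelism;                    // value that ends up in the JobGraph
        public final boolean userFixedParallelism; // true if the user set the parallelism explicitly

        public Operator(String name, int parallelism, boolean userFixedParallelism) {
            this.name = name;
            this.parallelism = parallelism;
            this.userFixedParallelism = userFixedParallelism;
        }
    }

    /** Overwrites every operator's parallelism with Integer.MAX_VALUE if reactive mode is enabled. */
    public static void rewrite(List<Operator> operators, boolean reactiveMode) {
        if (!reactiveMode) {
            return;
        }
        // Validate first, so that a rejected job is left unchanged.
        for (Operator op : operators) {
            if (op.userFixedParallelism) {
                throw new IllegalStateException(
                        "Operator '" + op.name + "' has a fixed parallelism, which reactive mode does not support");
            }
        }
        for (Operator op : operators) {
            op.parallelism = Integer.MAX_VALUE;
        }
    }
}
```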
Pro:
- The approach is more flexible, and maybe cleaner (for us): There's a client-side configuration parameter that controls the JobGraph generation, and a server-side parameter that controls the scheduler to use.
- For example: If you know what you are doing, you can use reactive mode with a session cluster (imagine a user who has automated the deployment of single-job session clusters)
- You can also use reactive mode without that flag by manually configuring a high parallelism
Con:
- Users could accidentally use reactive mode with a session cluster
- Users need to configure two things
Idea 2: "Do it yourself"
Users simply configure the declarative scheduler and set a high parallelism themselves.
Pro:
- Easy for us to implement
- Underlines the beta character of the feature
Con:
- Easy to make mistakes (e.g. trying to run an unsupported job, or submitting multiple jobs to a session cluster)
- Potentially poor user experience
Compatibility, Deprecation, and Migration Plan
The reactive mode is a new feature which the user needs to explicitly activate. Hence, there is no migration needed.
Test Plan
The new execution mode should be covered by end-to-end tests which ensure that the reactive mode functions correctly. Moreover, we should try to find open source users who are willing to try this feature out in order to gather feedback on its ergonomics and overall usefulness.