Status
Current state: Accepted by vote
Discussion thread: https://lists.apache.org/thread.html/ra688faf9dca036500f0445c55671e70ba96c70f942afe650e9db8374%40%3Cdev.flink.apache.org%3E
JIRA:
Released:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Streaming jobs which run for several days or longer usually experience changes in their workload during their lifetime. These changes can originate from seasonal spikes, such as day vs. night, weekdays vs. weekend or holidays vs. non-holidays, from sudden events, or simply from the growing popularity of your product. Some of these changes are more predictable than others, but what they all have in common is that they change the resource demand of your job if you want to keep the same service quality for your customers.
Even if you can estimate an upper bound for the maximum resource requirements, it is almost always prohibitively expensive to run your job from the very beginning with max resources. Consequently, it would be great if Flink could make use of resources (TaskManagers) which become available after a job has been started. Similarly, if the underlying resource management system decides that some of the currently assigned resources are needed elsewhere, Flink should not fail but scale the job down once the resources are revoked. Such a behaviour would make Flink a nicer citizen of the respective resource manager.
Ideally, Flink would control the resource (de-)allocation. However, not every Flink deployment knows about the underlying resource management system. Moreover, it can be easier to decide on the actual resource need from the application developer's perspective (external party). Hence, we are proposing the reactive execution mode which enables Flink to react to newly available or removed resources by scaling the job up or down, with the goal of using the available resources as well as possible.
The Reactive Mode allows Flink users to implement a powerful autoscaling mechanism by having an external service monitor certain metrics, such as consumer lag, aggregate CPU utilization, throughput or latency. As soon as these metrics rise above or fall below a certain threshold, TaskManagers can be added to or removed from the Flink cluster. This could be implemented by changing the replica count of a Kubernetes deployment, or of an autoscaling group.
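Such an external autoscaler could be sketched roughly as follows. This is only an illustration under assumed names: the deployment name, the lag source, and the lag-per-TaskManager threshold are hypothetical, not part of Flink.

```shell
#!/usr/bin/env bash
# Hypothetical autoscaler sketch: derive the desired number of
# TaskManager replicas from an observed consumer lag. The
# threshold (lag records handled per TaskManager) and the
# min/max bounds are illustrative assumptions.
desired_replicas() {
  local lag=$1 lag_per_tm=$2 min=$3 max=$4
  # One TaskManager per lag_per_tm records of lag, rounded up.
  local n=$(( (lag + lag_per_tm - 1) / lag_per_tm ))
  (( n < min )) && n=$min
  (( n > max )) && n=$max
  echo "$n"
}

# The computed value could then be applied to a (hypothetical)
# Kubernetes deployment of TaskManagers, e.g.:
#   kubectl scale deployment/flink-taskmanager \
#     --replicas="$(desired_replicas "$observed_lag" 10000 1 10)"
```

With reactive mode enabled, Flink then adapts the job's parallelism to whatever number of TaskManagers the autoscaler provides; the autoscaler itself never talks to Flink's scheduler.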
This figure shows how a job scales up as soon as a third TaskManager is added to the Flink cluster:
Proposed Changes
Overview
The proposed change builds upon the declarative resource management (FLIP-138) and the declarative scheduler (FLIP-160). With these changes, it is possible to declare a set of required resources, to start executing a job even if we haven't received all of the declared resources and to rescale the job if new resources become available. These are the ingredients we need in order to implement the reactive mode.
The reactive mode cannot know how many resources will be made available to the Flink cluster. Consequently, it needs to acquire all available resources. We can achieve this by declaring an infinite amount of required resources. This makes sure that whenever a new TaskManager registers at the ResourceManager, the ResourceManager will assign the new slots to the JobMaster which is responsible for executing the job. Using the declarative scheduler, we will start executing the job once the set of resources has stabilized and adjust the parallelism whenever the set of resources changes. Thereby, Flink will be able to make use of all resources which are available in the cluster. The definition of "stable resources" will be discussed in FLIP-160.
The way users will use this feature is by deploying a standalone application cluster with the configuration option "execution-mode" set to "reactive".
Usage Example:
# put job jar into classpath
cp examples/streaming/TopSpeedWindowing.jar lib/
# deploy standalone application cluster in reactive mode
./bin/standalone-job.sh start -Dexecution-mode=reactive -j org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
# launch one or more task managers to see the job scale up
./bin/taskmanager.sh start
# stop cluster:
./bin/taskmanager.sh stop
./bin/standalone-job.sh stop
Limitations
Initially we only intend to support reactive mode for a limited subset of jobs:
- Streaming jobs only
- No support for fine grained resources
- No fixed parallelism for any of the operators
- No support for unaligned checkpoints (since they do not support rescaling at the moment)
Deployment is only supported as a standalone application deployment. Active resource managers (such as native Kubernetes, YARN or Mesos) are explicitly not supported. Standalone session clusters are not supported either. The application deployment is limited to single job applications.
The parallelism of individual operators in a streaming job will be determined by the DeclarativeScheduler. It is not configurable by the user, except that the per-operator maxParallelism serves as an upper bound for the determined parallelism. The maxParallelism is bounded by 2^15 (32768).
Implementation
JobManager changes
When the JobManager is starting, the ClusterEntrypoint will check if reactive mode is enabled, and fail if an entry point other than "StandaloneApplicationClusterEntryPoint" is detected (we might consider allowing standalone session clusters with a warning).
In the JobMaster component, the Reactive Mode is built on top of the declarative scheduler (FLIP-160). In the first implementation of the scheduler, the implementations for the SlotAllocator and ScaleUpController will not be configurable and will be hardwired to Reactive Mode specific implementations.
The declarative scheduler will prepare the submitted JobGraph for reactive mode: The parallelism and maxParallelism of all tasks will be set to 2^15, except for those tasks where the maxParallelism is manually set to a value different from the default value (-1). Due to the key groups, we cannot scale beyond a parallelism of 2^15 (32768), and some tasks are not intended to run with more than one instance (e.g. global aggregations).
The declarative scheduler will also check if unaligned checkpoints are enabled. If so, execution will be denied.
On initial startup, the declarative scheduler will wait indefinitely for TaskManagers to show up. Once there are enough TaskManagers available to start the job, and the set of resources is stable (see FLIP-160 for a definition), the job will start running.
If a TaskManager is lost while the job is running, the scheduler will wait for 10 seconds for the TaskManager to re-appear. Otherwise, the job will be scheduled again with the remaining resources. If no TaskManagers are available anymore, the declarative scheduler will again wait indefinitely for new resources.
Activation by the user
The reactive mode can be activated through a new "execution-mode" configuration option (that can be passed as a -D command line argument, or through flink-conf.yaml).
Configuration
We will introduce the following new configuration parameters:
- jobmanager.declarative-scheduler.min-parallelism-increase: (Default: 1) The minimum increase in cumulative parallelism required for a job to scale up.
- execution-mode: (Default: none) If set to "reactive", reactive mode as defined on this page (see "Activation by the user") will be enabled.
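For illustration, enabling reactive mode and raising the scale-up threshold in flink-conf.yaml might look like this (the value 2 is an example only, not a recommendation):

```yaml
# Enable reactive mode (standalone application clusters only).
execution-mode: reactive
# Only rescale if the cumulative parallelism grows by at least 2.
jobmanager.declarative-scheduler.min-parallelism-increase: 2
```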
Both configuration options will be marked as experimental, to allow us to remove or change them in future releases.
Documentation
The reactive mode will be described on a new documentation page called "Elastic Scaling", the page will be located in the "Deployment" part of the documentation, between "Memory Configuration" and "Command-Line Interface".
The page will initially only describe the Reactive Mode. In the long run, auto scaling, custom scaling policies etc. will be described there as well.
Compatibility, Deprecation, and Migration Plan
The reactive mode is a new feature which the user needs to explicitly activate. Hence, there is no migration needed.
Test Plan
The new execution mode should be covered by end-to-end tests which ensure the correct functioning of the reactive mode. Moreover, we should try to find open source users who are willing to try this feature out in order to gather feedback on its ergonomics and overall usefulness.
Future Extensions
Pre-flight Check
Depending on initial user feedback, we might introduce a pre-flight sanity check into the StreamGraphGenerator that checks whether the streaming job is suitable for reactive mode (single stream graph, no fine-grained resources, no manual parallelism configuration, no unaligned checkpoints).
Non-homogeneous scaling
In the first implementation, scaling of individual operators is not controllable by the user. It is determined by the DeclarativeScheduler, more specifically by the SlotAllocator component. This can lead to situations where the scheduler determines a parallelism for certain operators that is not optimal (for example, a Kafka consumer having more instances than available partitions, or an operator that will only ever run with parallelism = 1).
An extension to Reactive Mode would be to give the user more control over the scaling behavior of individual operators. Ideas for this include:
- Do the scaling proportional to the initially configured parallelism, capped by the maxParallelism.
- Allow the user to define three parallelism values: min / target / max. If the min parallelism cannot be satisfied, fail.
- Allow the user to implement a callback, that determines the parallelism for the scheduler.
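As a rough illustration of the first idea, proportional scaling could compute each operator's parallelism by scaling its configured parallelism with the ratio of available slots to originally required slots, capped by maxParallelism. The function below is a hypothetical sketch, not the DeclarativeScheduler's actual algorithm:

```shell
#!/usr/bin/env bash
# Hypothetical proportional-scaling sketch (not Flink's actual
# scheduler logic): scale an operator's configured parallelism by
# available_slots / configured_slots, rounding to the nearest
# integer, capping at maxParallelism and flooring at 1.
proportional_parallelism() {
  local configured=$1 max_parallelism=$2 available_slots=$3 configured_slots=$4
  # Integer arithmetic; "+ configured_slots / 2" rounds to nearest.
  local p=$(( (configured * available_slots + configured_slots / 2) / configured_slots ))
  (( p < 1 )) && p=1
  (( p > max_parallelism )) && p=$max_parallelism
  echo "$p"
}
```

For example, an operator configured with parallelism 4 in a job that originally required 8 slots would run with parallelism 2 when only 4 slots are available, while an operator pinned to maxParallelism 1 (e.g. a global aggregation) would never scale beyond one instance.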
Communication with external systems and customizable scaling behavior
The "ScaleUpController" described in FLIP-160 is currently not exposed to the user, thus, scaling decisions are only customizable based on the provided configuration options.
By allowing users to implement a custom ScalingPolicy, many more customizations become possible:
- Users can make calls to external systems (such as a central "Flink Clusters Management Service") for scaling decisions.
- Users can implement extended scaling policies, exposing more configuration parameters than just "min-executing-time" and "additional-parallelism".
- For large-state streaming jobs, it might make sense to only allow to scale up within a certain timeframe after a completed checkpoint.
- Reactive mode currently requests an infinite amount of resources. By allowing the ScaleUpController (or a future ScalingPolicy) to dynamically adjust resource demands, the declarative scheduler will become a well-behaved citizen within active resource managers. Note that we might consider this in a separate FLIP as a proper Autoscaling mode.
Rejected Alternatives
Activation by the user
We've considered two other alternatives for the activation of reactive mode:
- Alternative 1: Introduction of a configuration parameter for the client side (setting the parallelism, checking of conditions) and on the server side (scheduler type). This alternative was rejected due to its complexity for the user.
- Alternative 2: Users have to manually set a high parallelism, ensure that the job is suitable for the declarative scheduler and configure the declarative scheduler. This alternative was rejected due to its poor user experience.
Documentation
We also considered adding "Reactive Mode" to the "Deployment / Resource Providers / Standalone" page, but that doesn't seem to be the right separation of concerns, and it would be too hidden in the documentation, given that the features relating to scaling are going to grow further in the future.