Status
...
Page properties | |||||
---|---|---|---|---|---|
|
...
|
...
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
The Reactive Mode allows Flink users to implement a powerful autoscaling mechanism, by having an external service monitor certain metrics, such as consumer lag, aggregate CPU utilization, throughput or latency. As soon as these metrics are above or beyond a certain threshold, additional TaskManagers can be added or removed from the Flink cluster. This could be implemented through changing the replica factor of a Kubernetes deployment, or an autoscaling group.
This figure shows how a job is scaling up as soon as a third TaskManager is added to the Flink cluster:
Proposed Changes
...
The proposed change builds upon the declarative resource management (FLIP-138) and the declarative adaptive scheduler (FLIP-160). With these changes, it is possible to declare a set of required resources, to start executing a job even if we haven't received all of the declared resources and to rescale the job if new resources become available. These are the ingredients we need in order to implement the reactive mode.
The reactive mode cannot know how many resources will be made available to the Flink cluster. Consequently, it needs to acquire all available resources. We can achieve this by declaring an infinite amount of required resources. This will make sure that whenever a new TaskManager registers at the ResourceManager, then the ResourceManager will assign the new slots to the JobMaster which is responsible for executing the job. Using the declarative adaptive scheduler, we will start executing the job once the set of resources has stabilised stabilized and adjust the parallelism whenever the set of resources changes. Thereby Flink will become be able to make use of all resources which are available in the cluster. The definition of "stable resources" will be discussed in FLIP-160.
The way users will use this feature is by deploying a per-job or standalone application cluster with the configuration option execution-mode
set to reactive
. This option will only be allowed if the user deploys a streaming job and uses either the per-job or application cluster (dedicated cluster per job). Session clusters will not support the reactive mode. set to reactive
.
Usage Example:
Code Block |
---|
# put job jar into classpath
cp examples/streaming/TopSpeedWindowing.jar lib/
# deploy standalone application cluster in reactive mode
./bin/standalone-job.sh start |
Code Block |
bin/flink run -Dexecution-mode=reactive --target yarn-per-job path/to/MyJob.jar |
...
j org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
# Launch one or more task managers .. to see the job scale up
./bin/taskmanager.sh start
# stop cluster:
./bin/taskmanager.sh stop
./bin/standalone-job.sh stop |
Limitations
Initially we only intend to support this reactive mode for a limited subset of jobs:
- Streaming jobs only
- No support for fine grained resources
- No fixed parallelism for any of the operators
- Unaligned checkpoints (since they do not support rescaling at the moment )
)Jira server ASF JIRA serverId 5aa69414-a9e9-3523-82ec-879b028fb15b key FLINK-17979
Deployment is only supported as a standalone application deployment. Active resource managers (such as native Kubernetes, YARN or Mesos) are explicitly not supported. Standalone session clusters are not supported either. The application deployment is limited to single job applicationsAlso, there are limitations to the deployments modes supported: We only intend to support this mode for per-job and application clusters for the time being. This will exclude the problem of how to distribute cluster resources (on a session cluster) among multiple jobs whose requirements have not been satisfied.
In addition to that limitation, reactive mode only works with standalone deployments: Active mode deployments (such as YARN or the native Kubernetes deployment) would flood the underlying resource manager with infinite resource requests.
The parallelism of individual operators in a streaming job will be determined by the DeclarativeSchedulerAdaptive Scheduler. It is not configurable by the user (except that the per-operator maxParallelism is the upper bound for the parallelism determined). The maxParallelism is bounded by 2^15 (32768).
Implementation
JobManager changes
When the JobManager is starting, the ClusterEntrypoint
will check if reactive mode is enabled, and fail if an entry point other than "StandaloneApplicationClusterEntryPoint
" is detected (we might consider allowing standalone session clusters with a warning).
In the JobMaster componentOn the JobMaster side, the Reactive Mode is build on top of the declarative adaptive scheduler (FLIP-160). In the first implementation of the scheduler, the implementations for the SlotAllocator and ScalingPolicy ScaleUpController will not be configurable, and hardwired to Reactive Mode specific implementations. These implementations will expose a minimal set of configuration options, which are described in FLIP-160
The JobManagerRunnerFactory will prepare the submitted JobGraph for reactive mode: The parallelism and maxParalllelism of all tasks will be set to 2^15, except for those Tasks where the maxParallelism is manually set to a value different from the default value (-1). Due to the key groups, we can not scale beyond a parallelism of 2^15 (32768), and some tasks are not intended to run with more than one instance (e.g. global aggregations).
This means that any parallelism configured by the user will be overwritten and thus ignored in Reactive Mode.
On initial startup, the adaptive scheduler will wait indefinitely for TaskManagers to show up. Once there are enough TaskManagers available to start the job, and the set of resources is stable (see FLIP-160 for a definition), the job will start running.
Once the job has started running, and a TaskManager is lost, it will wait for 10 seconds for the TaskManager to re-appear. Otherwise, the job will be scheduled again with the available resources. If no TaskManagers are available anymore, the adaptive scheduler will wait indefinitely again for new resources.
Activation by the user
The reactive mode can be activated through the a new "execution-mode
" configuration option (that can be passed as a -D
command line argument, or through flink-conf.yaml
).
If a streaming job is submitted, and execution-mode=reactive
is set, the new declarative scheduler will be configured. The scheduler will internally change the JobGraph so that the parallelism / maxParallelism of Tasks is set to Short.MAX_VALUE
. maxParallelism will only be set for those tasks, which have the default value (-1) set. The purpose of this special case is that some Tasks might be unable to run in parallel (e.g. a global aggregate).
Depending on initial user feedback, we might introduce a pre-flight sanity check into the StreamGraphGenerator
, that checks if the streaming job is suitable for reactive mode. Similarly, a warning should be printed if a session cluster receives a reactive mode job.
Example with custom config:
Code Block |
---|
bin/flink run -Dexecution-mode=reactive -Dpipeline.declarative-scheduler.reactive.min-execution-time="10 min" --target yarn-per-job path/to/MyJob.jar |
We will also leave the option to manually configure reactive mode for advanced users. They can manually set a maximum parallelism for their streaming job, and configure a different scheduler.
Configuration
Configuration
We will introduce the following new configuration parameters:
jobmanager.adaptive-scheduler.min-parallelism-increase
: (Default 1) Configure the minimum increase in cumulative parallelism for a job to scale up.execution-mode
: (Default: none). If set to "reactive", reactive mode as defined on this page (see "Activation by the user") will be enabled.
Both configuration options will be marked as experimental, to allow us removing or changing them in future releases.
Documentation
The reactive mode will be described on a new documentation page called "Elastic Scaling", the page will be located in the "Deployment" part of the documentation, between "Memory Configuration" and "Command-Line Interface".
The page will initially only describe the Reactive Mode. In the long run, auto scaling, custom scaling policies etc. will be described there as welltbd – pending review of ScalingPolicy.
Compatibility, Deprecation, and Migration Plan
...
The new execution mode should be covered by end-to-end tests which ensure the correct functioning of the reactive mode. Moreover, we should try to find open source user who are willing to try this feature out in order to gather feedback on its ergonomics and overall usefulness.
Future Extensions
Pre-flight Check
Depending on initial user feedback, we might introduce a pre-flight sanity check into the StreamGraphGenerator
, that checks if the streaming job is suitable for reactive mode (single stream graph, no fine-grained resources, no manual parallelism configuration, no unaligned checkpoints)
Non-homogeneous scaling
In the first implementation, scaling of individual operators is not controllable by the user. It is determined by the DeclarativeSchedulerAdaptive Scheduler, more specifically, the SlotAllocator component. This can lead to situations where the Scheduler determines a parallelism for certain operators, that is not optimal (for example a Kafka consumer having more instances than available partitions; or an operator that will anyways only run with parallelism = 1).
...
Communication with external systems and customizable scaling behavior
The "ScalingPolicy
ScaleUpController" described in FLIP-160 is currently not exposed to the user, thus, scaling decisions are only customizable based on the provided configuration options.
...
- you can make calls to external system (such as a central "Flink Clusters Management Service") for scaling decisions.
- users can implement extended scaling policies, exposing more configuration parameters than just "min-executing-time" and "additional-parallelism".
- For large-state streaming jobs, it might make sense to only allow to scale up within a certain timeframe after a completed checkpoint.
- Reactive mode currently requests an infinite amount of resources. By allowing the ScaleUpController (or a future ScalingPolicy) to dynamically adjust resource demands, the declarative adaptive scheduler will become a well-behaved citizen within active resource managers. Note that we might consider this in a separate FLIP as a proper Autoscaling mode.
Rejected Alternatives
Activation by the user
We've considered two other alternatives for the activation of reactive mode:
- Alternative 1: Introduction of a configuration parameter for the client side (setting the parallelism, checking of conditions) and on the server side (scheduler type). This alternative was rejected due to its complexity for the user.
- Alternative 2: Users have to manually set a high parallelism, ensure that the job is suitable for the declarative adaptive scheduler and configure the declarative adaptive scheduler. This alternative was rejected due to its poor user experience.
Documentation
We also considered adding "Reactive Mode" to the "Deployment / Resource Providers / Standalone" page, but that doesn't seem to be the right separation of concerns, and it is too hidden in the documentation, given that the features relating to scaling are going to further in the future.