...

This figure shows how a job scales up as soon as a third TaskManager is added to the Flink cluster:

[Figure: a running job rescaling after a third TaskManager joins the cluster]

Proposed Changes

...

The proposed change builds upon declarative resource management (FLIP-138) and the adaptive scheduler (FLIP-160). With these changes, it is possible to declare a set of required resources, to start executing a job even before all of the declared resources have been received, and to rescale the job when new resources become available. These are the ingredients we need in order to implement the reactive mode.

The reactive mode cannot know how many resources will be made available to the Flink cluster. Consequently, it needs to acquire all available resources. We can achieve this by declaring an infinite amount of required resources. This ensures that whenever a new TaskManager registers at the ResourceManager, the ResourceManager will assign the new slots to the JobMaster responsible for executing the job. Using the adaptive scheduler, we will start executing the job once the set of resources has stabilized and adjust the parallelism whenever the set of resources changes. This way, Flink will be able to make use of all resources that are available in the cluster. The definition of "stable resources" is discussed in FLIP-160.
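To make this concrete, the sketch below shows roughly how such an "infinite" declaration could look. It is illustrative only: the ResourceRequirements, ResourceRequirement and ResourceProfile names follow the spirit of FLIP-138, but the exact classes and method signatures used here are an assumption, not a committed API.

    // Sketch only: names follow the spirit of FLIP-138, not a committed API.
    // Reactive Mode declares an effectively unbounded number of slots with an
    // unknown resource profile, so every slot offered by a newly registered
    // TaskManager is handed to the JobMaster of the single running job.
    ResourceRequirements reactiveRequirements =
            ResourceRequirements.create(
                    jobId,
                    jobMasterAddress,
                    Collections.singletonList(
                            ResourceRequirement.create(ResourceProfile.UNKNOWN, Integer.MAX_VALUE)));

    resourceManagerGateway.declareRequiredResources(jobMasterId, reactiveRequirements, rpcTimeout);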

...

The parallelism of individual operators in a streaming job will be determined by the Adaptive Scheduler. It is not configurable by the user (except that the per-operator maxParallelism is the upper bound for the determined parallelism). The maxParallelism is bounded by 2^15 (32768).
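For illustration, the per-operator maxParallelism can be used as a cap on whatever parallelism the scheduler picks. The pipeline below is a hypothetical example using the regular DataStream API; the operator logic and the bound of 16 are assumptions for the example.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ReactiveModeMaxParallelismExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromSequence(0, 1_000_000)
                    .map(new MapFunction<Long, Long>() {
                        @Override
                        public Long map(Long value) {
                            return value * 2;
                        }
                    })
                    // In Reactive Mode the configured parallelism is ignored, but this
                    // upper bound is respected: the scheduler will never run the map
                    // operator with more than 16 subtasks.
                    .setMaxParallelism(16)
                    .print();

            env.execute("reactive-mode-max-parallelism-example");
        }
    }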

...

When the JobManager is starting, the ClusterEntrypoint will check if reactive mode is enabled, and fail if an entry point other than "StandaloneApplicationClusterEntryPoint" is detected (we might consider allowing standalone session clusters with a warning).
In the JobMaster component, Reactive Mode is built on top of the adaptive scheduler (FLIP-160). In the first implementation of the scheduler, the implementations of the SlotAllocator and ScaleUpController will not be configurable and will be hardwired to Reactive Mode-specific implementations.
The JobManagerRunnerFactory will prepare the submitted JobGraph for reactive mode: the parallelism and maxParallelism of all tasks will be set to 2^15, except for those tasks where the maxParallelism is manually set to a value different from the default value (-1). Due to the key groups, we cannot scale beyond a parallelism of 2^15 (32768), and some tasks are not intended to run with more than one instance (e.g. global aggregations).
The adaptive scheduler will also check whether unaligned checkpoints are enabled. If so, execution will be denied.
This means that any parallelism configured by the user will be overwritten and thus ignored in Reactive Mode.
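A rough sketch of this JobGraph adjustment is shown below. JobGraph and JobVertex are existing Flink classes, but where this logic lives and how an explicitly configured maxParallelism is treated are assumptions about the eventual implementation, not part of it.

    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobgraph.JobVertex;

    final class ReactiveModeJobGraphAdjuster {

        private static final int KEY_GROUP_LIMIT = 1 << 15; // 2^15 = 32768

        static void adjustForReactiveMode(JobGraph jobGraph) {
            for (JobVertex vertex : jobGraph.getVertices()) {
                if (vertex.getMaxParallelism() == -1) {
                    // maxParallelism was not set explicitly: open the task up to the
                    // key-group limit so the scheduler can use all available slots.
                    vertex.setMaxParallelism(KEY_GROUP_LIMIT);
                    vertex.setParallelism(KEY_GROUP_LIMIT);
                } else {
                    // one plausible choice: respect an explicitly configured
                    // maxParallelism, e.g. a global aggregation that must not run
                    // with more than one instance
                    vertex.setParallelism(vertex.getMaxParallelism());
                }
            }
        }
    }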

On initial startup, the adaptive scheduler will wait indefinitely for TaskManagers to show up. Once there are enough TaskManagers available to start the job, and the set of resources is stable (see FLIP-160 for a definition), the job will start running.

Once the job is running and a TaskManager is lost, the scheduler will wait for 10 seconds for the TaskManager to re-appear. If it does not, the job will be scheduled again with the remaining available resources. If no TaskManagers are available anymore, the adaptive scheduler will again wait indefinitely for new resources.

...

We will introduce the following new configuration parameters:

  • jobmanager.adaptive-scheduler.min-parallelism-increase: (Default: 1) The minimum increase in cumulative parallelism required for a job to scale up.
  • execution-mode: (Default: none). If set to "reactive", reactive mode as defined on this page (see "Activation by the user") will be enabled.
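As a usage sketch, the snippet below shows the proposed keys being set via Flink's generic Configuration class. In practice they would be supplied in flink-conf.yaml (or via -D arguments) when starting the standalone application cluster; the values shown are examples only, and the keys are the ones proposed in this FLIP.

    import org.apache.flink.configuration.Configuration;

    public class ReactiveModeConfigExample {

        public static Configuration reactiveModeConfig() {
            Configuration config = new Configuration();
            // activate Reactive Mode (see "Activation by the user")
            config.setString("execution-mode", "reactive");
            // example: only rescale once the cumulative parallelism can grow by at least 2
            config.setString("jobmanager.adaptive-scheduler.min-parallelism-increase", "2");
            return config;
        }
    }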

...

In the first implementation, the scaling of individual operators is not controllable by the user. It is determined by the Adaptive Scheduler, more specifically by its SlotAllocator component. This can lead to situations where the scheduler determines a parallelism for certain operators that is not optimal (for example, a Kafka consumer having more instances than available partitions, or an operator that will only ever run with parallelism = 1).
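One possible mitigation, based on the maxParallelism upper bound described above, is to cap such operators manually. The snippet below is illustrative only: the topic name, the partition count of 4 and the use of FlinkKafkaConsumer are assumptions for the example.

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class CappedKafkaSourceExample {
        public static void main(String[] args) throws Exception {
            Properties kafkaProperties = new Properties();
            kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
            kafkaProperties.setProperty("group.id", "reactive-mode-example");

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<String> events = env
                    .addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), kafkaProperties))
                    // cap the source at the (assumed) partition count of 4 so the
                    // Adaptive Scheduler never schedules idle consumer subtasks
                    .setMaxParallelism(4);

            events.print();
            env.execute("capped-kafka-source-example");
        }
    }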

...

  • you can make calls to external systems (such as a central "Flink Clusters Management Service") for scaling decisions.
  • users can implement extended scaling policies, exposing more configuration parameters than just "min-executing-time" and "additional-parallelism".
  • For large-state streaming jobs, it might make sense to only allow scaling up within a certain timeframe after a completed checkpoint.
  • Reactive mode currently requests an infinite amount of resources. By allowing the ScaleUpController (or a future ScalingPolicy) to dynamically adjust resource demands, the adaptive scheduler will become a well-behaved citizen within active resource managers. Note that we might consider this in a separate FLIP as a proper Autoscaling mode. A rough sketch of such a pluggable policy follows below.
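The sketch below illustrates what such a pluggable policy could look like. It is hypothetical: neither the interface name nor the method signature are part of this FLIP or of FLIP-160; they only show the direction of the extensions listed above.

    /**
     * Hypothetical pluggable scaling policy; not a committed API.
     * Implementations could consult external systems, enforce a minimum
     * executing time, or restrict scale-ups to a window after a checkpoint.
     */
    public interface ScalingPolicy {

        /**
         * Decides whether the job should be rescaled from its current cumulative
         * parallelism to the cumulative parallelism the newly available resources
         * would allow.
         */
        boolean shouldRescale(int currentCumulativeParallelism, int possibleCumulativeParallelism);
    }

    /** Example policy: only scale up if the gain meets a configured minimum. */
    class MinIncreaseScalingPolicy implements ScalingPolicy {

        private final int minParallelismIncrease;

        MinIncreaseScalingPolicy(int minParallelismIncrease) {
            this.minParallelismIncrease = minParallelismIncrease;
        }

        @Override
        public boolean shouldRescale(int current, int possible) {
            return possible - current >= minParallelismIncrease;
        }
    }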

...

  • Alternative 1: Introduction of a configuration parameter on the client side (setting the parallelism, checking of conditions) and on the server side (scheduler type). This alternative was rejected due to its complexity for the user.
  • Alternative 2: Users have to manually set a high parallelism, ensure that the job is suitable for the adaptive scheduler, and configure the adaptive scheduler. This alternative was rejected due to its poor user experience.

...