Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

Motivation

One of the most challenging aspects of running always-on streaming pipeline is the correct sizing of Flink deployments. Too few resources lead to unstable Flink jobs which requires users to scale up their deployments manually. On the other hand, a surplus of resources can become very costly to users over time. Even once optimized, the resource demands can change depending on the time of the day or due to seasonal patterns. Clearly, it would be desirable to automatically adjust the resources for Flink deployments. This process is referred to as autoscaling.

...

  1. Support autoscaling Flink jobs deployed on Kubernetes via the Flink Kubernetes operator
  2. Provide a default Flink job metric collector via the Flink metrics API
  3. Provide a robust default scaling algorithm
    1. Ensure scaling yields effective usage of assigned task slots, i.e. prevents backpressure but does not overprovision
    2. Ramp up in case of any backlog to ensure it gets processed in a timely manner
    3. Minimize the number of scaling decisions to prevent costly rescale operation
  4. Allow plugging in a custom scaling algorithm and metric collection code

Non-Goals (for now)

  1. Vertical scaling via adjusting container resources or changing the number of task slots per TM
  2. Autoscaling in standalone Flink session cluster deployment mode
  3. Integrate with the k8s scheduler to reserve required resources for rescaling

...

In this diagram, we show an example dataflow which reads from two different sources. Log 1 has a backlog growth rate of 100 records per time unit. Similarly, Log 2 has a backlog growth of 500. This means that without any processing, the backlog grows by the 100 or 500 records, respectively. Source 1 is able to read 10 records per time unit, Source 2 reads 50 records per time unit. The downstream operators process the records from the source and produce new records for their downstream operators. For simplicity, the number of inflowing and outflowing records for an operator is always the same here. The actual algorithm doesn't make that assumption.

Image Removed

Now let's evaluate one round by the algorithm:

Image Removed

When the scaling is strictly linear, this will lead the following optimal result:

Image RemovedImage Added

According to the reference paper, in the worst case it takes up to three attempts to reach this state. However, it can already converge after one scaling attempt. Note that the algorithm handles up- and downscaling. In the example, one of the vertices is scaled by parallelism 1 which won't change the parallelism, but it is also possible for vertices to be scaled down if the scaling factor is lower than 1.

The target utilization as defined by the busy time metric is 100% in this example but users will be able to configure it. Arguably, a target utilization of less than 100% will provide more stable scaling decisions.

Configuration


true
KeyDefaultDescription

job.autoscaler.enabled

false

Enable Job Autoscaler

job.autoscaler.scaling.enabled

false

Enable Job Vertex Scaling actions (if disabled only metrics will ble collected)

job.autoscaler.stabilization.interval

5min

Stabilization period in which no new scaling will be executed

job.autoscaler.scaling.sources.enabled

Whether to enable scaling source vertices.

job.autoscaler.target.utilization

0.7

Target vertex utilization

job.autoscaler.target.utilization.boundary

0.1

Target vertex utilization boundary. Scaling won't be performed if utilization is within (target - boundary, target + boundary)

job.autoscaler.scale-up.grace-period

10min

Period in which no scale down is allowed after a scale up

job.autoscaler.scale-down.max-factor

0.6

Max scale down factor. 1 means no limit on scale down, 0.6 means job can only be scaled down with 60% of the original parallelism.

job.autoscaler.catch-up.duration

5min

The target duration for fully processing any backlog after a scaling operation. Set to 0 to disable backlog based scaling.

job.autoscaler.vertex.max-parallelism


Max vertex parallelism after scale up.

job.autoscaler.vertex.min-parallelism


Min vertex parallelism after scale down.
job.autoscaler.metrics.history.duration10min

job.autoscaler.metrics.history.max-size


Maximum number of metric reports collected for each job.
job.autoscaler.metrics.history.min-size
Minimum number of metric reports to collect before scaling logic is executed

Scaling Execution

Once the scaling algorithm has computed the updates, the JobGraph needs to be updated. The most straight-forward way would be to supply parallelism overrides as part of the Flink configuration as proposed in

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-29501
. The operator would just modify the deployment spec and the job would redeploy with the updated parallelisms. Eventually, we could add support for Flink's /rescale API which would allow us to in-place rescale the cluster without tearing it down. However, this has the downside of losing the immutability of the deployment.

...