Status

Motivation

One of the most challenging aspects of running an always-on streaming pipeline is the correct sizing of Flink deployments. Too few resources lead to unstable Flink jobs and force users to scale up their deployments manually. On the other hand, a surplus of resources can become very costly over time. Even once optimized, resource demands can change depending on the time of day or due to seasonal patterns. Clearly, it would be desirable to adjust the resources of Flink deployments automatically. This process is referred to as autoscaling.

...

  1. Support autoscaling Flink jobs deployed on Kubernetes via the Flink Kubernetes operator
  2. Provide a default Flink job metric collector via the Flink metrics API
  3. Provide a robust default scaling algorithm
    1. Ensure scaling yields effective usage of assigned task slots, i.e. prevents backpressure but does not overprovision
    2. Ramp up in case of any backlog to ensure it gets processed in a timely manner
    3. Minimize the number of scaling decisions to prevent costly rescale operations
  4. Allow plugging in a custom scaling algorithm and metric collection code

Non-Goals (for now)

  1. Vertical scaling via adjusting container resources or changing the number of task slots per TM
  2. Autoscaling in standalone Flink session cluster deployment mode
  3. Integration with the k8s scheduler to reserve the required resources for rescaling

...

The new processing rate is calculated from the current processing rate and the backlog size, as well as the estimated downtime during the rescale. Further, users can configure a maximum time to fully process the backlog.
The current backlog size should come from the connector metrics (when available), and the estimated time to rescale could either be provided by the user (in that case we need to add a config option for it) or the operator could track past scaling actions and measure their duration to determine it automatically. However, in the first iteration it should be enough to scale based on the current backlog at the time of scaling; the spare capacity allocated should allow for catching up the lag that grows during the scaling itself.
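
To make this concrete, here is a minimal sketch of the backlog-aware target rate computation described above; the class, method, and parameter names are illustrative assumptions for this document, not the operator's actual code:

```java
import java.time.Duration;

// Hedged sketch: estimate the processing rate a source must sustain after rescaling
// so that the backlog is drained within the configured catch-up duration.
public final class TargetRateEstimator {

    /**
     * @param avgInputRate    observed average input rate (records per second)
     * @param backlogSize     current backlog, e.g. the sum of pendingRecords
     * @param restartTime     estimated downtime caused by the rescale operation
     * @param catchUpDuration target duration for fully processing the backlog
     */
    public static double targetProcessingRate(
            double avgInputRate, long backlogSize, Duration restartTime, Duration catchUpDuration) {

        // Records that accumulate while the job is down during the rescale.
        double lagGrownDuringRestart = avgInputRate * restartTime.toSeconds();

        // Extra rate needed on top of the steady-state input rate to drain the existing
        // backlog plus the restart-induced lag within the catch-up window.
        double catchUpRate =
                (backlogSize + lagGrownDuringRestart) / Math.max(1, catchUpDuration.toSeconds());

        return avgInputRate + catchUpRate;
    }
}
```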

 

For most sources, like Kafka, there is an upper bound on the parallelism based on the number of partitions. If backlog information is not available, users can set a desired parallelism or target rate. We also aim to choose a parallelism which yields an equal distribution of the splits among the subtasks.
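
As an illustration of the even-split goal, here is a minimal sketch of how a source parallelism could be picked; the heuristic and names are assumptions, not the final implementation:

```java
// Hedged sketch: cap the source parallelism at the number of partitions and prefer a
// parallelism that divides the partition count evenly, so splits are balanced.
public final class SourceParallelismPicker {

    public static int pickParallelism(int desiredParallelism, int numPartitions, int maxParallelism) {
        // Never run more source subtasks than there are partitions (extra subtasks would idle).
        int upperBound = Math.min(numPartitions, maxParallelism);
        int candidate = Math.max(1, Math.min(desiredParallelism, upperBound));

        // Prefer the closest parallelism at or above the candidate that divides the
        // partition count evenly, so each subtask receives the same number of splits.
        for (int p = candidate; p <= upperBound; p++) {
            if (numPartitions % p == 0) {
                return p;
            }
        }
        return candidate;
    }
}
```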

Sidenote about scaling sources without backlog information: Backlog growth information is necessary to scale sources successfully under backpressure. However, there is nothing stopping us from scaling the sources on a busy time/CPU time basis once backpressure has been eliminated by scaling up the downstream operators. This was not part of our initial prototype implementation, but it could easily be added, as it does not affect the algorithm at large.
Most regular streaming sources should in any case provide backlog / input rate information, even if it is not currently exposed under pendingRecords. It should be our goal to improve connector metrics, not only to provide the best autoscaling capability but also a good operational experience in general.

...

In this diagram, we show an example dataflow which reads from two different sources. Log 1 has a backlog growth rate of 100 records per time unit; similarly, Log 2 has a backlog growth rate of 500. This means that, without any processing, the backlog grows by 100 or 500 records per time unit, respectively. Source 1 is able to read 10 records per time unit, Source 2 reads 50 records per time unit. The downstream operators process the records from the sources and produce new records for their downstream operators. For simplicity, the number of incoming and outgoing records of an operator is always the same here; the actual algorithm doesn't make that assumption.

(Figure: example dataflow with two sources, their backlog growth rates, and the per-operator processing rates)

Now let's evaluate one round by the algorithm:

(Figure: one evaluation round of the scaling algorithm on the example dataflow)

When the scaling is strictly linear, this will lead to the following optimal result:

(Figure: the example dataflow after scaling to the computed parallelisms)

According to the reference paper, it takes up to three attempts in the worst case to reach this state. However, it can already converge after a single scaling attempt. Note that the algorithm handles both up- and downscaling. In the example, one of the vertices is scaled by a factor of 1, which won't change its parallelism, but it is also possible for vertices to be scaled down if the scaling factor is lower than 1.

The target utilization as defined by the busy time metric is 100% in this example but users will be able to configure it. Arguably, a target utilization of less than 100% will provide more stable scaling decisions.
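
To make the per-vertex computation concrete, here is a hedged sketch of deriving a new parallelism from the busy time metric and the configured target utilization; the names and structure are illustrative only:

```java
// Hedged sketch: derive the vertex's "true" processing capability from the busy time
// metric and scale its parallelism so the target rate is met at the target utilization.
public final class VertexScaler {

    public static int scaleParallelism(
            int currentParallelism,
            double observedProcessingRate, // records per second actually processed
            double busyTimeMsPerSecond,    // average busyTimeMsPerSecond of the vertex (0..1000)
            double targetDataRate,         // rate the vertex must handle after scaling
            double targetUtilization,      // e.g. 0.7
            int minParallelism,
            int maxParallelism) {

        // Rate the vertex could process if it were 100% busy at the current parallelism.
        // Guard against division by zero for an (almost) idle vertex.
        double trueProcessingRate =
                observedProcessingRate / (Math.max(busyTimeMsPerSecond, 1.0) / 1000.0);

        // Parallelism needed so the target rate only consumes targetUtilization of capacity.
        double scaleFactor = targetDataRate / (trueProcessingRate * targetUtilization);
        int newParallelism = (int) Math.ceil(currentParallelism * scaleFactor);

        return Math.max(minParallelism, Math.min(maxParallelism, newParallelism));
    }
}
```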

Configuration


Key | Default | Description
--- | --- | ---
job.autoscaler.enabled | false | Enable Job Autoscaler
job.autoscaler.scaling.enabled | false | Enable Job Vertex Scaling actions (if disabled, only metrics will be collected)
job.autoscaler.stabilization.interval | 5min | Stabilization period in which no new scaling will be executed
job.autoscaler.scaling.sources.enabled | | Whether to enable scaling source vertices
job.autoscaler.target.utilization | 0.7 | Target vertex utilization
job.autoscaler.target.utilization.boundary | 0.1 | Target vertex utilization boundary. Scaling won't be performed if the utilization is within (target - boundary, target + boundary)
job.autoscaler.scale-up.grace-period | 10min | Period in which no scale down is allowed after a scale up
job.autoscaler.scale-down.max-factor | 0.6 | Max scale down factor. 1 means no limit on scale down, 0.6 means the job can only be scaled down with 60% of the original parallelism.
job.autoscaler.catch-up.duration | 5min | The target duration for fully processing any backlog after a scaling operation. Set to 0 to disable backlog based scaling.
job.autoscaler.vertex.max-parallelism | | Max vertex parallelism after scale up.
job.autoscaler.vertex.min-parallelism | | Min vertex parallelism after scale down.
job.autoscaler.metrics.history.duration | 10min |
job.autoscaler.metrics.history.max-size | | Maximum number of metric reports collected for each job.
job.autoscaler.metrics.history.min-size | | Minimum number of metric reports to collect before the scaling logic is executed
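
For illustration, the options above could be declared with Flink's ConfigOptions API roughly as follows; this is a sketch derived from the table, not the final option definitions:

```java
import java.time.Duration;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Hedged sketch of a few of the proposed autoscaler options as Flink ConfigOptions.
public class AutoScalerOptions {

    public static final ConfigOption<Boolean> AUTOSCALER_ENABLED =
            ConfigOptions.key("job.autoscaler.enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription("Enable Job Autoscaler");

    public static final ConfigOption<Duration> STABILIZATION_INTERVAL =
            ConfigOptions.key("job.autoscaler.stabilization.interval")
                    .durationType()
                    .defaultValue(Duration.ofMinutes(5))
                    .withDescription("Stabilization period in which no new scaling will be executed");

    public static final ConfigOption<Double> TARGET_UTILIZATION =
            ConfigOptions.key("job.autoscaler.target.utilization")
                    .doubleType()
                    .defaultValue(0.7)
                    .withDescription("Target vertex utilization");
}
```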

Scaling Execution

Once the scaling algorithm has computed the updates, the JobGraph needs to be updated. The most straightforward way would be to supply parallelism overrides as part of the Flink configuration, as proposed in FLINK-29501. The operator would just modify the deployment spec and the job would be redeployed with the updated parallelisms. Eventually, we could add support for Flink's /rescale API, which would allow us to rescale the cluster in place without tearing it down. However, this has the downside of losing the immutability of the deployment.
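
A minimal sketch of how the operator could serialize the computed parallelisms into such a configuration override; the exact option key is defined by FLINK-29501 and is an assumption here, as are the class and method names:

```java
import java.util.Map;
import java.util.stream.Collectors;

// Hedged sketch: write the computed per-vertex parallelisms into the deployment's
// Flink configuration so the job picks them up on redeployment.
public final class ScalingExecutor {

    // Assumed key; the final name is determined by FLINK-29501.
    private static final String PARALLELISM_OVERRIDES_KEY = "pipeline.jobvertex-parallelism-overrides";

    /** Serializes overrides as "vertexId:parallelism,vertexId:parallelism,..." and stores them. */
    public static void applyOverrides(
            Map<String, String> flinkConfiguration, Map<String, Integer> parallelismOverrides) {

        String overrides = parallelismOverrides.entrySet().stream()
                .map(e -> e.getKey() + ":" + e.getValue())
                .collect(Collectors.joining(","));

        flinkConfiguration.put(PARALLELISM_OVERRIDES_KEY, overrides);
    }
}
```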

...