...

Discussion thread: https://lists.apache.org/thread/pvfb3fw99mj8r1x8zzyxgvk4dcppwssz
Vote thread: https://lists.apache.org/thread/75dz0opdz9l8trv7spvjvfdv7j8ny3kr
JIRA: FLINK-30260

Release

Motivation

One of the most challenging aspects of running an always-on streaming pipeline is sizing the Flink deployment correctly. Too few resources lead to unstable Flink jobs, which require users to scale up their deployments manually. On the other hand, a surplus of resources can become very costly over time. Even once a deployment is optimized, its resource demands can change with the time of day or with seasonal patterns. Clearly, it would be desirable to adjust the resources for Flink deployments automatically. This process is referred to as autoscaling.

...

The target utilization as defined by the busy time metric is 100% in this example but users will be able to configure it. Arguably, a target utilization of less than 100% will provide more stable scaling decisions.
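
To make the effect of the target utilization concrete, here is a minimal sketch. It assumes that load spreads evenly across subtasks, so a vertex's utilization falls in proportion to its parallelism. The function name `parallelism_for_target` and its parameters are hypothetical and only serve this illustration; they are not part of the proposal.

```python
import math


def parallelism_for_target(current_parallelism: int,
                           busy_ratio: float,
                           target_utilization: float = 1.0) -> int:
    """Estimate the parallelism at which a vertex would run at the target utilization.

    busy_ratio is the fraction of time the vertex is busy (busy time / wall time).
    Assumption: utilization scales with 1 / parallelism (perfectly even load).
    """
    if current_parallelism < 1:
        raise ValueError("current_parallelism must be at least 1")
    if busy_ratio < 0.0:
        raise ValueError("busy_ratio must not be negative")
    if not 0.0 < target_utilization <= 1.0:
        raise ValueError("target_utilization must be in (0, 1]")
    required = current_parallelism * busy_ratio / target_utilization
    # Round up so the vertex never ends above the target, and keep at least one subtask.
    return max(1, math.ceil(required))
```

Under these assumptions, a vertex that is 90% busy at parallelism 4 stays at 4 with a 100% target, while a 0.7 target asks for 6 subtasks. The extra headroom is what makes the lower target less sensitive to small load fluctuations.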

Configuration


| Key | Default | Description |
| --- | --- | --- |
| job.autoscaler.enabled | false | Enable Job Autoscaler |
| job.autoscaler.scaling.enabled | false | Enable Job Vertex Scaling actions (if disabled, only metrics will be collected) |
| job.autoscaler.stabilization.interval | 5min | Stabilization period in which no new scaling will be executed |
| job.autoscaler.scaling.sources.enabled | true | Whether to enable scaling source vertices. |
| job.autoscaler.target.utilization | 0.7 | Target vertex utilization |
| job.autoscaler.target.utilization.boundary | 0.1 | Target vertex utilization boundary. Scaling won't be performed if utilization is within (target - boundary, target + boundary) |
| job.autoscaler.scale-up.grace-period | 10min | Period in which no scale down is allowed after a scale up |
| job.autoscaler.scale-down.max-factor | 0.6 | Max scale down factor. 1 means no limit on scale down, 0.6 means the job can be scaled down by at most 60% of the original parallelism. |
| job.autoscaler.catch-up.duration | 5min | The target duration for fully processing any backlog after a scaling operation. Set to 0 to disable backlog based scaling. |
| job.autoscaler.vertex.max-parallelism | | Max vertex parallelism after scale up. |
| job.autoscaler.vertex.min-parallelism | | Min vertex parallelism after scale down. |
| job.autoscaler.metrics.history.duration | 10min | |
| job.autoscaler.metrics.history.max-size | | Maximum number of metric reports collected for each job. |
| job.autoscaler.metrics.history.min-size | | Minimum number of metric reports to collect before scaling logic is executed |
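
The following sketch illustrates how a few of these options could interact when a scaling decision is evaluated. It is not the autoscaler's implementation: the class `AutoscalerConfig` and the functions `within_boundary` and `bound_parallelism` are hypothetical names, and reading the scale-down factor as "the largest fraction by which parallelism may shrink" is an assumption derived from the description above.

```python
import math
from dataclasses import dataclass
from typing import Optional


@dataclass
class AutoscalerConfig:
    """Hypothetical holder for a subset of the job.autoscaler.* options."""
    target_utilization: float = 0.7
    utilization_boundary: float = 0.1
    scale_down_max_factor: float = 0.6
    min_parallelism: Optional[int] = None  # no default is listed for this option
    max_parallelism: Optional[int] = None  # no default is listed for this option


def within_boundary(utilization: float, conf: AutoscalerConfig) -> bool:
    """True if utilization lies in (target - boundary, target + boundary),
    in which case no scaling would be performed."""
    lower = conf.target_utilization - conf.utilization_boundary
    upper = conf.target_utilization + conf.utilization_boundary
    return lower < utilization < upper


def bound_parallelism(current: int, proposed: int, conf: AutoscalerConfig) -> int:
    """Apply the scale-down limit and the min/max vertex parallelism to a proposal."""
    bounded = proposed
    if proposed < current:
        # Assumed reading: a factor of 0.6 allows shrinking by at most 60%,
        # so at least 40% of the current parallelism is kept; 1 means no limit.
        floor = math.ceil(current * (1.0 - conf.scale_down_max_factor))
        bounded = max(bounded, floor)
    if conf.min_parallelism is not None:
        bounded = max(bounded, conf.min_parallelism)
    if conf.max_parallelism is not None:
        bounded = min(bounded, conf.max_parallelism)
    return max(1, bounded)
```

With the defaults from the table, a utilization of 0.75 falls inside the band around 0.7 and triggers nothing, while 0.85 falls outside it. A proposal to scale a vertex from parallelism 8 down to 1 would be limited to 4 under this reading of the scale-down factor.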

Scaling Execution

Once the scaling algorithm has computed the updates, the JobGraph needs to be updated. The most straightforward way would be to supply parallelism overrides as part of the Flink configuration, as proposed in FLINK-29501. The operator would just modify the deployment spec and the job would redeploy with the updated parallelisms. Eventually, we could add support for Flink's /rescale API, which would allow us to rescale the cluster in place without tearing it down. However, this has the downside of losing the immutability of the deployment.
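
As a rough sketch of the configuration-based approach, the computed parallelisms could be rendered into a single Flink configuration entry in the deployment spec. Several things here are assumptions to verify against FLINK-29501: the option name `pipeline.jobvertex-parallelism-overrides`, the `vertexId:parallelism` comma-separated map syntax, and the helper `apply_parallelism_overrides`, which is hypothetical.

```python
from typing import Dict

# Assumed option name; check FLINK-29501 for the final key.
OVERRIDES_KEY = "pipeline.jobvertex-parallelism-overrides"


def apply_parallelism_overrides(flink_conf: Dict[str, str],
                                overrides: Dict[str, int]) -> Dict[str, str]:
    """Return a copy of flink_conf with the per-vertex parallelism overrides set.

    overrides maps a job vertex ID to its new parallelism. The input dictionary
    is left untouched so the previous spec can still be compared or restored.
    """
    for vertex_id, parallelism in overrides.items():
        if parallelism < 1:
            raise ValueError(f"invalid parallelism {parallelism} for vertex {vertex_id}")
    updated = dict(flink_conf)
    if overrides:
        # Sort by vertex ID so the same overrides always yield the same string
        # and do not trigger a spurious redeployment.
        updated[OVERRIDES_KEY] = ",".join(
            f"{vertex_id}:{parallelism}"
            for vertex_id, parallelism in sorted(overrides.items())
        )
    else:
        updated.pop(OVERRIDES_KEY, None)
    return updated
```

Producing a new configuration map instead of mutating the existing one mirrors the idea that the operator only changes the deployment spec and lets the regular redeploy path pick up the new parallelisms.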

...