...

Discussion thread: here (thread)
Vote thread: here (thread)
JIRA: here (FLINK-21883)
Release: <Flink Version>


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In the context of reactive mode, we would like to introduce a cooldown period after each scaling action, during which no further scaling actions are performed. The goal is to avoid overly frequent scaling operations, whether scaling up or scaling down.

Public Interfaces

...

This FLIP adds two new user configuration options:

  • jobmanager.adaptive-scheduler.scaling-interval.min: allows the user to configure the minimum time between two scaling operations.
  • jobmanager.adaptive-scheduler.scaling-interval.max: optional parameter that allows the user to configure the time after which a scaling operation is triggered regardless of whether the requirements (AdaptiveScheduler#shouldRescale) are met. If not set, there will be no force-scaling.
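As a minimal sketch of how the two options above relate, the following Java holder reads both values and treats a missing scaling-interval.max as "force-scaling disabled". The class name ScalingIntervalOptions, its methods, and the use of an already-parsed Map are assumptions made for illustration; this is not Flink's configuration API.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Optional;

/** Hypothetical holder for the two options; not Flink's actual configuration classes. */
final class ScalingIntervalOptions {
    static final String MIN_KEY = "jobmanager.adaptive-scheduler.scaling-interval.min";
    static final String MAX_KEY = "jobmanager.adaptive-scheduler.scaling-interval.max";

    final Duration min;
    final Optional<Duration> max; // empty means force-scaling is disabled

    private ScalingIntervalOptions(Duration min, Optional<Duration> max) {
        this.min = min;
        this.max = max;
    }

    /** Reads both options from already-parsed values (assumption: parsing happens elsewhere). */
    static ScalingIntervalOptions from(Map<String, Duration> conf, Duration defaultMin) {
        Duration min = conf.getOrDefault(MIN_KEY, defaultMin);
        Optional<Duration> max = Optional.ofNullable(conf.get(MAX_KEY));
        return new ScalingIntervalOptions(min, max);
    }

    boolean forceScalingEnabled() {
        return max.isPresent();
    }
}
```

With only the min key present, forceScalingEnabled() returns false, which mirrors the "if not set, there will be no force-scaling" rule.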

Proposed Changes

The key points are as follows. When a scaling event is received, whether scaling up or scaling down:

  • If it falls outside a cooldown period, it is executed right away and a timer is started.
  • If it falls within the cooldown period, it is not dropped but queued.
  • Receiving a scaling event during a cooldown period does not reset the period timer, to avoid increasing the delay in scaling operations.
  • When the period ends, all the queued scaling operations are aggregated into a single operation. This operation is executed and then a new scaling-cooldown-period is started.
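The queue-based handling in the bullets above can be sketched roughly as follows. The class QueueingCooldown and its methods are hypothetical. The text does not say how queued operations are aggregated, so this sketch assumes each event carries a signed slot delta and that aggregation means summing the deltas.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch of the queue-based cooldown; all names are illustrative only. */
final class QueueingCooldown {
    private final long cooldownMillis;
    private final Deque<Integer> queued = new ArrayDeque<>();
    private final List<Integer> executed = new ArrayList<>();
    private long cooldownEnd = Long.MIN_VALUE; // no cooldown is active initially

    QueueingCooldown(long cooldownMillis) {
        this.cooldownMillis = cooldownMillis;
    }

    /** A scaling event arrives carrying a signed slot delta (assumption). */
    void onScalingEvent(long nowMillis, int slotDelta) {
        queued.add(slotDelta);
        // Outside a cooldown period the event is executed right away;
        // inside it, the event stays queued and the timer is not reset.
        if (nowMillis >= cooldownEnd) {
            flush(nowMillis);
        }
    }

    /** Called by the timer when the cooldown period ends. */
    void onCooldownEnd(long nowMillis) {
        if (nowMillis >= cooldownEnd) {
            flush(nowMillis);
        }
    }

    /** Aggregates everything queued into one operation and starts a new cooldown. */
    private void flush(long nowMillis) {
        if (queued.isEmpty()) {
            return; // no operation to trigger
        }
        int aggregated = 0;
        for (int delta : queued) {
            aggregated += delta;
        }
        queued.clear();
        executed.add(aggregated);
        cooldownEnd = nowMillis + cooldownMillis;
    }

    /** Operations executed so far, in order. */
    List<Integer> executed() {
        return executed;
    }
}
```

Summing deltas is only one possible aggregation; keeping just the most recent target parallelism would fit the description equally well.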

The diagram below shows the different steps and cases:

PlantUML
@startuml
!pragma teoz true 
skinparam ParticipantPadding 100
participant Scheduler as sch
participant ScalingOperationQueue as queue 
participant CooldownTimer as timer
[-> sch : scaling event
sch -> sch : trigger scale change

{start1} sch -> timer : start timer

activate timer
[-> sch : scaling event
sch -> queue : queue operation
[-> sch : scaling event
sch -> queue : queue operation
{end1} timer --> sch : end of cooldown period  
deactivate timer
{start1} <-> {end1} : scaling-cooldown-period 
sch -> queue : dequeue operations
sch -> sch : aggregate operations
sch -> sch : trigger scale change
{start2} sch -> timer : start timer
activate timer
||120||
{end2} timer --> sch : end of cooldown period
deactivate timer

{start2} <-> {end2} : scaling-cooldown-period 

sch -> sch : no operation to trigger
@enduml

The diagram reads as follows:

...

This FLIP proposes the following changes:

A. When new slots are available:

  • Flink should rescale immediately only if the last rescale was done more than scaling-interval.min ago.
  • Otherwise, it should schedule a rescale at (now + scaling-interval.min). This is equivalent to resetting the cooldown period when new slots arrive during a cooldown period. Because we decided to lower the scaling-interval.min default value to be more reactive (cf. the compatibility section), resetting the period protects against too frequent rescales.
  • The rescale is done as follows:
    • if the minimum scaling requirements are met (AdaptiveScheduler#shouldRescale), the job is restarted with the new parallelism (as before)
    • if the minimum scaling requirements are not met
      • if the last rescale was done more than scaling-interval.max ago, a rescale is forced.
      • otherwise, a forced rescale is scheduled in scaling-interval.max

=> When a rescale is forced, it is performed as long as the parallelism has changed. Otherwise, to avoid unnecessary restarts, the rescale is performed only when the added resources are above the configured minimum.
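The decision rules of case A can be sketched as a pure function. This is a minimal illustration under stated assumptions, not the AdaptiveScheduler implementation: the class RescaleDecider, the Action enum, and the boolean inputs are hypothetical, "more than ... ago" is read as a strict comparison, and an unset scaling-interval.max is taken to mean that no forced rescale is ever scheduled.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

/** Hypothetical sketch of the case A rules; not the actual AdaptiveScheduler code. */
final class RescaleDecider {
    enum Action {
        RESCALE_NOW,            // requirements met, job restarts with the new parallelism
        SCHEDULE_AFTER_MIN,     // still in cooldown: retry at now + scaling-interval.min
        FORCE_RESCALE_NOW,      // requirements not met, but scaling-interval.max has elapsed
        SCHEDULE_FORCED_IN_MAX, // requirements not met: forced rescale in scaling-interval.max
        NO_ACTION               // nothing to do
    }

    static Action onNewSlots(
            Instant now,
            Instant lastRescale,
            Duration min,
            Optional<Duration> max,
            boolean shouldRescale,       // stands in for AdaptiveScheduler#shouldRescale
            boolean parallelismChanged) {
        Duration sinceLast = Duration.between(lastRescale, now);
        if (sinceLast.compareTo(min) <= 0) {
            return Action.SCHEDULE_AFTER_MIN;
        }
        if (shouldRescale) {
            return Action.RESCALE_NOW;
        }
        if (!max.isPresent()) {
            return Action.NO_ACTION; // force-scaling disabled
        }
        if (sinceLast.compareTo(max.get()) > 0) {
            // A forced rescale only happens if the parallelism actually changed.
            return parallelismChanged ? Action.FORCE_RESCALE_NOW : Action.NO_ACTION;
        }
        return Action.SCHEDULE_FORCED_IN_MAX;
    }
}
```

Keeping the decision separate from timers and state transitions makes each branch easy to check against the rules, for example with a last rescale 10s ago and scaling-interval.min = 30s yielding SCHEDULE_AFTER_MIN.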

B. When slots are lost (most often after a TaskManager failure), the behavior does not change from the current one:

    1. the job transitions to the Restarting state (cf. FLIP-160)
    2. then it transitions to the Waiting for Resources state (cf. FLIP-160), in which the job will not be rescaled before the resource stabilization timeout expires. This protects against subsequent scaling operations (slot losses due to more TaskManager failures, or slot offerings) during this timeout period (configurable via the existing jobmanager.adaptive-scheduler.resource-stabilization-timeout).


The cooldown period will be tied to the Executing state (cf. FLIP-160). As a consequence, if the job or the JobManager fails, the current state of the cooldown period is reset.

...

Compatibility, Deprecation, and Migration Plan

Reactive mode and the adaptive scheduler are already released, but the current behavior has no cooldown period. So the current behavior is equivalent to setting jobmanager.adaptive-scheduler.scaling-interval.min to 0s with no jobmanager.adaptive-scheduler.scaling-interval.max set. Such default values would have no impact on users.

But we could also consider that setting jobmanager.adaptive-scheduler.scaling-interval.min to a value higher than 0 would not really break users but rather protect them against too frequent scale changes.

So this FLIP proposes default values of jobmanager.adaptive-scheduler.scaling-interval.min = 30s and no jobmanager.adaptive-scheduler.scaling-interval.max (force-scaling disabled). By default, we favor a lower scaling-interval.min (for more reactive rescaling) and let users increase the value when they have high workloads.

Test Plan

The new cooldown period feature should be covered by end-to-end tests. The current set of related end-to-end tests covers only resuming a pipeline with various configuration combinations (file/rocks, sync/async, parallelism change/no parallelism change, ...). So we need to add E2E test cases covering the use cases described in the sequence diagram above, measuring the time between scaling operations in various situations. We should be able to use the same DataStreamAllroundTestProgram in the E2E tests.

The feature should also be covered by comprehensive tests. They should test rescaling under various time conditions: scaling-interval.min exceeded and not exceeded, scaling-interval.max enabled and disabled, scaling-interval.max exceeded and not exceeded. These tests can be added to the existing ExecutingTest.

Rejected Alternatives

We rejected the option of adding a queue for scaling requests.

When scaling operations are dequeued, they are not executed one by one at a scaling-cooldown-period pace, to avoid adding too much delay in scaling.