Discussion thread: here (thread)
Vote thread: here ()
JIRA: here (FLINK-21883)
Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In the context of reactive mode, we would like to introduce a cooldown period that starts after each scaling action and during which no further scaling actions are performed. The goal is to avoid overly frequent scaling operations, whether scaling up or scaling down.

Public Interfaces

This FLIP adds two new user configuration options:

  • jobmanager.adaptive-scheduler.scaling-interval.min, which lets the user configure the minimum time between two scaling operations.
  • jobmanager.adaptive-scheduler.scaling-interval.max, which lets the user configure the time after which a scaling operation is triggered regardless of whether the requirements (AdaptiveScheduler#shouldRescale()) are met. If it is not set, scaling is never forced.
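
As a rough illustration of the scaling-interval.max semantics, the sketch below forces a rescale once the configured interval has elapsed, even when the usual requirements are not met. It is not the AdaptiveScheduler implementation: the class ForcedRescaleSketch, the method shouldTriggerRescale and its parameters are hypothetical names, and the sketch assumes the interval is measured as an elapsed time supplied by the caller, since this page does not say from which event it is counted. The interaction with scaling-interval.min is deliberately left out.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical sketch; none of these names exist in Flink.
final class ForcedRescaleSketch {

    /**
     * Decides whether a rescale should be triggered.
     *
     * @param requirementsMet    stand-in for the result of AdaptiveScheduler#shouldRescale()
     * @param elapsed            assumed: time elapsed on the clock that scaling-interval.max refers to
     * @param scalingIntervalMax value of scaling-interval.max, empty when the option is not set
     */
    static boolean shouldTriggerRescale(
            boolean requirementsMet, Duration elapsed, Optional<Duration> scalingIntervalMax) {
        if (requirementsMet) {
            // Normal path: the requirements are met, no forcing needed.
            return true;
        }
        // Forced path: only when the option is set and the interval has elapsed.
        return scalingIntervalMax
                .map(max -> elapsed.compareTo(max) >= 0)
                .orElse(false);
    }
}
```

With the option unset (an empty Optional), the method reduces to the requirements check alone, which matches the "no forcing" behavior described above.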

Proposed Changes

The important points are the following:

  • When new slots become available, Flink should rescale immediately only if the last rescale happened more than scaling-interval.min ago. Otherwise, it should schedule a rescale at time last-rescale + scaling-interval.min.
  • When slots are lost (most of the time after a TaskManager failure), there is no change compared to the current behavior:
    1. The pipeline transitions to the Restarting state (cf. FLIP-160).
    2. It then transitions to the Waiting for Resources state (cf. FLIP-160), in which the pipeline is not rescaled before the stable resources timeout expires. This protects against subsequent scaling operations (slot losses due to further TaskManager failures, or slot offerings) during this timeout period, which is configurable via jobmanager.adaptive-scheduler.resource-stabilization-timeout.
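
The rule for newly available slots can be sketched as below. This is a minimal illustration under stated assumptions, not the scheduler's code: CooldownSketch and delayBeforeRescale are hypothetical names, and the timestamps are passed in explicitly so the logic stays independent of any clock.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch; none of these names exist in Flink.
final class CooldownSketch {

    /**
     * Returns how long to wait before rescaling after new slots became available.
     * Duration.ZERO means "rescale immediately"; any other value means
     * "schedule the rescale at lastRescale + scalingIntervalMin".
     */
    static Duration delayBeforeRescale(
            Instant lastRescale, Instant now, Duration scalingIntervalMin) {
        // Earliest point in time at which the cooldown allows another rescale.
        Instant earliestNextRescale = lastRescale.plus(scalingIntervalMin);
        return now.isBefore(earliestNextRescale)
                ? Duration.between(now, earliestNextRescale)
                : Duration.ZERO;
    }
}
```

For example, with scaling-interval.min set to 300s and a last rescale 100s ago, the method returns a delay of 200s; with a last rescale 400s ago, it returns Duration.ZERO.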


The cooldown period will be tied to the Executing state (cf. FLIP-160). As a consequence, the current state of the cooldown period is reset in case of a JobManager failure.

Compatibility, Deprecation, and Migration Plan

Reactive mode and the adaptive scheduler are already released, but the current behavior has no cooldown period. The current state is therefore equivalent to setting jobmanager.adaptive-scheduler.scaling-interval.min to 0s and leaving jobmanager.adaptive-scheduler.scaling-interval.max unset. With those defaults, there would be no impact on users.

However, we could also consider that a default jobmanager.adaptive-scheduler.scaling-interval.min of 300s would not break users, but would rather protect them against overly frequent scale changes.

This FLIP therefore proposes the following default values: jobmanager.adaptive-scheduler.scaling-interval.min = 300s, and jobmanager.adaptive-scheduler.scaling-interval.max unset (forced scaling disabled).
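
To make the two candidate sets of defaults concrete, the sketch below models them as plain values. The class ScalingIntervalSettings and its methods are hypothetical and only illustrate the values discussed in this section; they are not part of Flink's configuration API.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical value holder; not part of Flink.
final class ScalingIntervalSettings {
    final Duration min;            // scaling-interval.min
    final Optional<Duration> max;  // scaling-interval.max, empty when unset

    ScalingIntervalSettings(Duration min, Optional<Duration> max) {
        this.min = min;
        this.max = max;
    }

    /** Defaults proposed by this FLIP: 300s cooldown, forced scaling disabled. */
    static ScalingIntervalSettings proposedDefaults() {
        return new ScalingIntervalSettings(Duration.ofSeconds(300), Optional.empty());
    }

    /** Values equivalent to the already released behavior: no cooldown, no forcing. */
    static ScalingIntervalSettings currentBehaviorEquivalent() {
        return new ScalingIntervalSettings(Duration.ZERO, Optional.empty());
    }

    boolean cooldownEnabled() {
        return !min.isZero();
    }

    boolean forcedScalingEnabled() {
        return max.isPresent();
    }
}
```

Under the proposed defaults, cooldownEnabled() is true and forcedScalingEnabled() is false; under the values equivalent to the current behavior, both are false.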

Test Plan

The new cooldown period feature should be covered by end-to-end tests. The current set of related end-to-end tests covers only resuming a pipeline with various configuration combinations (file/rocksDb, sync/async, parallelism change/no parallelism change, ...). We therefore need to add E2E tests that cover the use cases described above and measure the time between scaling operations in various situations. We should be able to use the same DataStreamAllroundTestProgram in these E2E tests.

Rejected Alternatives

Adding a queue for scaling requests was rejected.
