Status

Current state: "Under Discussion"

Discussion thread:

JIRA:

Released:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In order to support the reactive mode (FLIP-159) we need a different type of scheduler, one which first announces the required resources and decides on the actual parallelism with which to execute the job only after it has received resources. The benefit is that this scheduler can schedule jobs even if not all required resources are available. Moreover, it allows jobs to keep executing after TaskManagers have been lost. The declarative scheduler builds upon the declarative resource management (FLIP-138).

Proposed Changes

The declarative scheduler will initially work for streaming jobs only. This simplifies things considerably because all operators always have to be scheduled. Moreover, by treating every failure as a global failover which restarts the whole topology, we can further simplify the scheduler. This failover behaviour is already the default for many streaming topologies, unless they consist of disjoint graphs. Given these assumptions we want to develop the following scheduler:

The scheduler takes the JobGraph for which it will first calculate the desired resources. After declaring these resources, the scheduler will wait until the available resources have stabilised. Once the resources are stabilised the scheduler should be able to decide on the actual parallelism of the job. Once the parallelism is decided and the executions are matched with the available slots, the scheduler deploys the executions.
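The step from declared resources to an actual parallelism can be illustrated with a minimal sketch. It is not the proposed implementation: the class and method names are hypothetical, vertices are reduced to a name and a desired parallelism, and it assumes slot sharing, so that one slot can host one subtask of every vertex. Waiting for the resources to stabilise is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not Flink API: derive the resources to declare and
// the parallelism to run with once the available slots are known.
class ParallelismSketch {

    // Assumption: with slot sharing, the number of slots to declare is the
    // largest desired parallelism over all vertices.
    static int desiredSlots(Map<String, Integer> desiredParallelism) {
        int max = 0;
        for (int parallelism : desiredParallelism.values()) {
            max = Math.max(max, parallelism);
        }
        return max;
    }

    // Cap every vertex at the number of slots that actually arrived.
    static Map<String, Integer> decideParallelism(
            Map<String, Integer> desiredParallelism, int availableSlots) {
        if (availableSlots < 1) {
            throw new IllegalArgumentException("at least one slot is required");
        }
        Map<String, Integer> actual = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : desiredParallelism.entrySet()) {
            actual.put(entry.getKey(), Math.min(entry.getValue(), availableSlots));
        }
        return actual;
    }
}
```

Under these assumptions, a job that desires parallelism 4 for its source and 8 for its map operator declares 8 slots; if only 6 arrive, it runs with parallelism 4 and 6 respectively.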

Whenever a fault occurs, we will fail the whole job and try to restart it. Restarting works by cancelling all deployed tasks and then restarting the scheduling of the JobGraph following the same code paths as the initial scheduling operation.

An obvious regression of this implementation compared with the existing pipelined region scheduler is that we always restart the whole topology. For embarrassingly parallel jobs this might not be necessary since the running tasks don’t need to be reset to the latest checkpoint. Supporting partial failover would be the first extension of the proposed scheduler. One way to support partial failovers is to introduce a distinction between global and local failovers.

  • Global failover: Restart of the whole topology, which allows the parallelism of the job to change
  • Local failover: Restart of a subset of the executions, which does not change the parallelism of the operator

If the system cannot recover from a local failover because it does not have enough slots available, the failover must be escalated to a global failover. A global failover allows the system to rescale the whole job.
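The escalation rule can be sketched as a small decision function. This is only an illustration under simplifying assumptions: the names are hypothetical, and "enough slots" is modelled as one free slot per failed execution.

```java
// Hypothetical sketch, not Flink API: decide whether a failure can be
// handled locally or has to be escalated to a global failover.
class FailoverSketch {

    enum FailoverKind { LOCAL, GLOBAL }

    // Assumption: a local failover needs one free slot per failed execution.
    // If fewer slots are free, escalate so the whole job can be rescaled.
    static FailoverKind chooseFailover(int failedExecutions, int freeSlots) {
        return freeSlots >= failedExecutions
                ? FailoverKind.LOCAL
                : FailoverKind.GLOBAL;
    }
}
```

With two failed executions and two free slots the sketch stays local; with only one free slot it escalates to a global failover.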

State machine of the scheduler

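Given the description above we propose the following state machine to model the behaviour of the declarative scheduler:

[Figure: state machine with the states "Created", "Waiting for resources" and "Executing". Transitions: "Created" to "Waiting for resources" (start scheduling); "Waiting for resources" to itself (resources are not stable yet); "Waiting for resources" to "Executing" (resources are stable).]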
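The states and transitions named here (start scheduling, resources are not stable yet, resources are stable) can be written down as a transition function. The sketch below is an illustration only: the enum and method names are hypothetical, and it covers just these three transitions, so restarts after a failure are not modelled.

```java
// Hypothetical sketch, not Flink API: the three states and three
// transitions of the scheduler's state machine.
class SchedulerStateSketch {

    enum State { CREATED, WAITING_FOR_RESOURCES, EXECUTING }

    enum Event { START_SCHEDULING, RESOURCES_NOT_STABLE, RESOURCES_STABLE }

    // Returns the follow-up state, or throws if the event is not allowed
    // in the given state.
    static State next(State state, Event event) {
        switch (state) {
            case CREATED:
                if (event == Event.START_SCHEDULING) {
                    return State.WAITING_FOR_RESOURCES;
                }
                break;
            case WAITING_FOR_RESOURCES:
                if (event == Event.RESOURCES_NOT_STABLE) {
                    return State.WAITING_FOR_RESOURCES; // keep waiting
                }
                if (event == Event.RESOURCES_STABLE) {
                    return State.EXECUTING;
                }
                break;
            default:
                break;
        }
        throw new IllegalStateException(
                "No transition from " + state + " on " + event);
    }
}
```

Rejecting unknown transitions with an exception keeps the sketch honest about what it does not model.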

Components of the scheduler

SlotAllocator

ExecutionFailureHandler

RescalingStrategy

How to distinguish streaming jobs

Compatibility, Deprecation, and Migration Plan

The declarative scheduler will be a beta feature which the user has to activate explicitly by setting the config option jobmanager.scheduler: declarative. It will only be chosen if the user submitted a streaming job.
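The activation rule can be sketched as follows. Only the option key jobmanager.scheduler and the value declarative come from this proposal; the class, the method, and the fallback name "default" are hypothetical, and falling back to another scheduler for non-streaming jobs is an assumption.

```java
import java.util.Map;

// Hypothetical sketch, not Flink API: pick the declarative scheduler only
// if it was requested explicitly and the job is a streaming job.
class SchedulerSelectionSketch {

    static final String OPTION_KEY = "jobmanager.scheduler";

    static String chooseScheduler(Map<String, String> config, boolean isStreamingJob) {
        boolean requested = "declarative".equals(config.get(OPTION_KEY));
        // Assumption: any other case falls back to a scheduler called "default".
        return (requested && isStreamingJob) ? "declarative" : "default";
    }
}
```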

Limitations

  • The declarative scheduler runs with streaming jobs only
  • No support for local recovery
  • No support for local failovers
  • No integration with Flink's web UI

Test Plan

The new scheduler needs extensive unit, integration (IT) and end-to-end testing because it is a crucial component at the heart of Flink.

Rejected Alternatives
