...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Public Interfaces

...

In order to support the reactive mode (FLIP-159), we need a different type of scheduler: one that does not first decide on the parallelism with which to execute a job and then ask for the resources, but does it the other way around.

Proposed Changes

The declarative scheduler will initially work for streaming jobs only. This simplifies things considerably because such a job consists of a single pipelined region. Moreover, we could treat task failures as global failures that restart the whole topology, which is what happens in many streaming topologies anyway. Given these assumptions, we could develop the following scheduler:

The scheduler takes the JobGraph for which it will first calculate the desired resources. After declaring these resources, the scheduler will wait until the available resources have stabilized. Once the resources are stabilized the scheduler should be able to decide on the actual parallelism of the job. Once the parallelism is decided and the executions are matched with the available slots, the scheduler deploys the executions.
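The resource-first flow described above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the proposed implementation: the class and method names are hypothetical and are not Flink APIs, the JobGraph is reduced to a map from vertex name to desired parallelism, and the sketch assumes that all vertices share slots, so the desired slot count equals the largest desired parallelism.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch; none of these names are Flink APIs. */
final class DeclarativeSchedulingSketch {

    /**
     * Calculates the resources to declare. Assumption: slot sharing, so one
     * slot can host one subtask of every vertex and the widest vertex
     * determines the number of slots.
     */
    static int desiredSlots(Map<String, Integer> desiredParallelism) {
        int max = 0;
        for (int parallelism : desiredParallelism.values()) {
            max = Math.max(max, parallelism);
        }
        return max;
    }

    /**
     * Decides the actual parallelism once the available resources have
     * stabilized: every vertex is capped by the number of available slots.
     */
    static Map<String, Integer> decideParallelism(
            Map<String, Integer> desiredParallelism, int availableSlots) {
        if (availableSlots < 1) {
            throw new IllegalStateException("Cannot schedule without any slot");
        }
        Map<String, Integer> actual = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> vertex : desiredParallelism.entrySet()) {
            actual.put(vertex.getKey(), Math.min(vertex.getValue(), availableSlots));
        }
        return actual;
    }
}
```

With a desired parallelism of 4 for a source and 8 for a map vertex, the sketch declares 8 slots. If only 3 slots become available, both vertices run with parallelism 3.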

Whenever a fault occurs (soft or hard), we will fail the whole job and try to restart it. Restarting works by cancelling all deployed tasks and then restarting the scheduling of the JobGraph following the same code paths as the initial scheduling operation.

An obvious regression of this implementation compared to the existing pipelined region scheduler is that we always restart the whole topology. For embarrassingly parallel jobs this might not be necessary, since the tasks that are still running don’t need to be reset to the latest checkpoint. Supporting partial failover would be the first extension of the proposed scheduler. One way to support partial failovers is to introduce a distinction between global and local failovers.


  • Global failover: Restart of the whole topology, which allows the parallelism of the job to change
  • Local failover:



If the system cannot recover from a local failover because it does not have enough slots available, the failover must be escalated, which makes it a global failover. A global failover allows the system to rescale the whole job.
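The escalation rule can be illustrated with a small decision function. This is a minimal sketch, assuming the only input is the number of slots the failed executions need and the number of slots currently free; the names below are hypothetical and not part of Flink.

```java
/** Hypothetical sketch of the local-to-global escalation; not a Flink API. */
final class FailoverSketch {

    enum FailoverKind { LOCAL, GLOBAL }

    /**
     * Keeps the failover local if enough free slots exist to redeploy the
     * failed executions; otherwise escalates to a global failover, which
     * is allowed to rescale the whole job.
     */
    static FailoverKind decide(int slotsNeeded, int freeSlots) {
        return freeSlots >= slotsNeeded ? FailoverKind.LOCAL : FailoverKind.GLOBAL;
    }
}
```

For example, if two executions failed and only one slot is free, `decide(2, 1)` yields `GLOBAL`, so the job would be restarted as a whole and could pick a lower parallelism.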

Ideally, we use the same mechanism to find a new slot for the local failover as we use for matching a job pipelined region with a set of available slots, which is the step that decides the parallelism of that region.

Compatibility, Deprecation, and Migration Plan

...