...

Since we have a couple of asynchronous operations (resource timeout in the "Waiting for resources" state, restart delay in Restarting) which only work if no other state change has happened in the meantime, we need to introduce a state version which can be used to filter out outdated operations.
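
As a minimal sketch (all names below are hypothetical, not the actual implementation), such a state version can guard delayed operations like this:

Java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Every state transition bumps the version; a delayed operation captures the
// version at scheduling time and becomes a no-op if the version has moved on.
class VersionedScheduler {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private long stateVersion = 0;

    // Called on every state transition; returns the new version.
    synchronized long onStateTransition() {
        return ++stateVersion;
    }

    // Runs the operation after the delay, but only if no other state change
    // has happened in the meantime (e.g. resource timeout, restart delay).
    void runAfterDelay(long expectedVersion, Runnable operation, long delayMs) {
        executor.schedule(
                () -> {
                    synchronized (this) {
                        if (stateVersion == expectedVersion) {
                            operation.run();
                        }
                        // Otherwise the operation is outdated and dropped.
                    }
                },
                delayMs,
                TimeUnit.MILLISECONDS);
    }
}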

...

The "Waiting for resources" state has the purpose to wait for the required resources. Since the cluster might not be able to provide all of the declared resources, the system needs to handle this situation as well. Hence, this state waits until either all required resources have arrived or until the set of available resources has stabilised. A set of resources has stabilised if the system expects that it won't change anymore. There are different ways to achieve this and one One possible solution approach is to set sets an upper limit for the waiting time. This approach will also be implemented is also the approach we want to implement in the first version of the scheduler. Consequently, whenever the scheduler enters the "Waiting for resources" state, it registers a timeout after which it will try to go into the Executing state. If the job cannot be executed with the available resources, then the scheduler will fail it.

In the future we might take a look at Kafka's consumer protocol, how consumer changes are handled there, and how to decide on a stable set of consumers/resources.

Automatic scaling

In order to support automatic scaling, whenever new slots arrive and the scheduler is in the Executing state, we ask a ScaleUpController whether the job can be scaled up. If this is the case, the scheduler transitions into the Restarting state, which triggers a global failover and a restart that will make use of the available resources. It is important to note that scale-down actions will be triggered by failures of tasks whose slots have been removed.
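
Sketched in code (hypothetical helpers; see the ScaleUpController section below for the interface):

Java
// Hypothetical sketch of the scale-up check when new slots arrive.
void onNewResourcesAvailable() {
    if (state == State.EXECUTING) {
        int current = currentCumulativeParallelism();
        int proposed = cumulativeParallelismWithNewSlots();
        if (scaleUpController.canScaleUp(current, proposed)) {
            // Triggers a global failover; the subsequent restart picks up
            // the newly available slots.
            transitionToRestarting();
        }
    }
}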

Components of the scheduler

...

  1. Check whether the failure is recoverable. If not, then go to Failing state
  2. Ask the configured RestartBackoffTimeStrategy whether we can restart. If not, then go to Failing state
  3. Ask the configured RestartBackoffTimeStrategy for the backoff time between failure and restart
  4. Go into the Restarting state with the returned backoff time (see the sketch below)
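
Put together, the failure handling procedure could look as follows (a sketch; RestartBackoffTimeStrategy with canRestart() and getBackoffTime() is Flink's existing interface, the remaining helpers are hypothetical):

Java
// Sketch of the four failure handling steps described above.
void handleFailure(Throwable failure, RestartBackoffTimeStrategy strategy) {
    if (!isRecoverable(failure)) {
        transitionToFailing(failure);           // step 1
        return;
    }
    if (!strategy.canRestart()) {               // step 2
        transitionToFailing(failure);
        return;
    }
    long backoffMs = strategy.getBackoffTime(); // step 3
    transitionToRestarting(backoffMs);          // step 4
}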

ScaleUpController

Whenever the scheduler is in the Executing state and receives new slots, it checks whether the job can be run with an increased parallelism. If this is the case, the scheduler will ask the ScaleUpController, given the old and new cumulative parallelism of all operators, whether it should scale up or not.
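
Based on this description, the interface could look roughly like this (the exact signature is an assumption):

Java
// Sketch of the ScaleUpController; asked with the old and new cumulative
// parallelism whenever new slots would allow a higher parallelism.
interface ScaleUpController {
    boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism);
}

// Example policy (hypothetical): only scale up when the gain exceeds a
// configured threshold, to avoid restart churn for marginal improvements.
class ThresholdScaleUpController implements ScaleUpController {
    private final int minParallelismIncrease;

    ThresholdScaleUpController(int minParallelismIncrease) {
        this.minParallelismIncrease = minParallelismIncrease;
    }

    @Override
    public boolean canScaleUp(int current, int proposed) {
        return proposed - current >= minParallelismIncrease;
    }
}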

...

Since we cannot execute batch jobs with the declarative scheduler, we need to be able to detect whether a job is a batch or a streaming job. For this purpose, we are introducing a new enum field in the JobGraph, called JobType. The default JobType of a JobGraph will be BATCH.

For batch jobs (from the DataSet API), setting this field is trivial (in the JobGraphGenerator).
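
For illustration (a sketch; the setter name is an assumption):

Java
// Sketch of the new JobType enum on the JobGraph.
enum JobType {
    BATCH,    // the default for a JobGraph
    STREAMING // set for streaming jobs during job graph generation
}

// Hypothetical usage inside a job graph generator:
// jobGraph.setJobType(JobType.STREAMING);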

...

In the first version of the scheduler we don't intend to support local recovery. Adding support for it should be possible, and we intend to add it as a follow-up.

No support for local failovers

Supporting local failovers is another feature which we want to add as a follow-up. Adding support for it would avoid having to restart the whole job. One idea could be to extend the existing state machine with a new state "Restarting locally":

...

PlantUML
@startuml
hide empty description

[*] -> Created
Created --> Waiting : Start scheduling
state "Waiting for resources" as Waiting
state "Restarting globally" as RestartingG
state "Restarting locally" as RestartingL
Waiting --> Waiting : Resources are not stable yet
Waiting --> Executing : Resources are stable
Waiting --> Finished : Cancel, suspend or \nnot enough resources for executing
Executing --> Canceling : Cancel
Executing --> Failing : Unrecoverable fault
Executing --> Finished : Suspend or job reached terminal state
Executing --> RestartingG : Recoverable global fault
Executing --> RestartingL : Recoverable local fault
RestartingL --> Executing : Recovered locally
RestartingL --> RestartingL : Recoverable local fault
RestartingL --> RestartingG : Local recovery timeout
RestartingL --> Canceling : Cancel
RestartingL --> Finished : Suspend
RestartingL --> Failing : Unrecoverable fault
RestartingG --> Finished : Suspend
RestartingG --> Canceling : Cancel
RestartingG --> Waiting : Cancellation complete
Canceling --> Finished : Cancellation complete
Failing --> Finished : Failing complete
Finished -> [*]

@enduml

...


No integration with Flink's web UI

...