Status

...

Page properties


Discussion thread

...

...

Jira
FLINK-21075 (ASF JIRA)

Release

...

1.13


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Given the description above we propose the following state machine to model the behaviour of the adaptive scheduler:



PlantUML
@startuml
hide empty description

[*] -> Created
Created --> Waiting : Start scheduling
state "Waiting for resources" as Waiting
Waiting --> Waiting : Resources are not stable yet
Waiting --> Executing : Resources are stable
Waiting --> Finished : Cancel, suspend or not enough \nresources for executing
Executing --> Canceling : Cancel
Executing --> Failing : Unrecoverable fault
Executing --> Finished : Suspend or job reached terminal state
Executing --> Restarting : Recoverable fault
Restarting --> Finished : Suspend
Restarting --> Canceling : Cancel
Restarting --> Waiting : Cancelation complete
Canceling --> Finished : Cancelation complete
Failing --> Finished : Failing complete
Finished -> [*]

@enduml

The states have the following semantics:

  • Created: Initial state of the scheduler
  • Waiting for resources: The required resources are declared. The scheduler waits until either the requirements are fulfilled or the set of resources has stabilised.
  • Executing: The set of resources is stable and the scheduler could decide on the parallelism with which to execute the job. The ExecutionGraph is created and the execution of the job has started.
  • Restarting: A recoverable fault has occurred. The scheduler stops the ExecutionGraph by canceling it.
  • Canceling: The job has been canceled by the user. The scheduler stops the ExecutionGraph by canceling it.
  • Failing: An unrecoverable fault has occurred. The scheduler stops the ExecutionGraph by canceling it.
  • Finished: The job execution has been completed.
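
The transitions of the diagram above can also be written down as a small lookup table. The following is a minimal sketch, not the scheduler's actual implementation: the enum SchedulerState and its methods are hypothetical names that simply mirror the states and arrows of the diagram.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical sketch: the state machine above expressed as a transition table. */
public final class StateMachineSketch {

    /** States from the diagram; the enum and its method names are assumptions. */
    enum SchedulerState {
        CREATED,
        WAITING_FOR_RESOURCES,
        EXECUTING,
        RESTARTING,
        CANCELING,
        FAILING,
        FINISHED;

        /** Returns the states that the diagram allows as direct successors. */
        Set<SchedulerState> successors() {
            switch (this) {
                case CREATED:
                    return EnumSet.of(WAITING_FOR_RESOURCES);
                case WAITING_FOR_RESOURCES:
                    return EnumSet.of(WAITING_FOR_RESOURCES, EXECUTING, FINISHED);
                case EXECUTING:
                    return EnumSet.of(CANCELING, FAILING, FINISHED, RESTARTING);
                case RESTARTING:
                    return EnumSet.of(FINISHED, CANCELING, WAITING_FOR_RESOURCES);
                case CANCELING:
                case FAILING:
                    return EnumSet.of(FINISHED);
                default:
                    // FINISHED is terminal and has no successors.
                    return EnumSet.noneOf(SchedulerState.class);
            }
        }

        /** Checks whether the diagram contains an arrow from this state to {@code next}. */
        boolean canTransitionTo(SchedulerState next) {
            return successors().contains(next);
        }
    }

    public static void main(String[] args) {
        System.out.println(SchedulerState.EXECUTING.canTransitionTo(SchedulerState.RESTARTING));
        System.out.println(SchedulerState.CANCELING.canTransitionTo(SchedulerState.EXECUTING));
    }
}
```

A table like this makes it easy to reject transitions that the diagram does not contain, for example going from Canceling back to Executing.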

In the states "Created" and "Waiting for resources" no ExecutionGraph exists. The ExecutionGraph can only be instantiated after we have acquired enough resources to run the job. Hence, all operations which require the ExecutionGraph will be ignored until we are in a state where an ExecutionGraph exists.

Since we have a couple of asynchronous operations (resource timeout in the "Waiting for resources" state, restart delay in Restarting) which must only take effect if no other state change has happened in the meantime, we need to introduce a state version which can be used to filter out outdated operations.
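
One way to picture the state version is a counter that is incremented on every state change and captured when an asynchronous operation is registered. The sketch below is only an illustration under that assumption: the class, the method runIfStateUnchanged and the use of plain strings for states are hypothetical, and it assumes that all calls happen on a single thread.

```java
/** Hypothetical sketch of filtering outdated asynchronous operations with a state version. */
public final class StateVersionSketch {

    // Incremented on every state change. Assumes all calls happen on a single
    // thread (for example the scheduler's main thread), so no synchronization is used.
    private long stateVersion = 0;
    private String currentState = "Created";

    void transitionTo(String newState) {
        stateVersion++;
        currentState = newState;
    }

    String getCurrentState() {
        return currentState;
    }

    /** Wraps an action so that it only runs if no state change happened since wrapping. */
    Runnable runIfStateUnchanged(Runnable action) {
        final long expectedVersion = stateVersion;
        return () -> {
            if (stateVersion == expectedVersion) {
                action.run();
            }
        };
    }

    public static void main(String[] args) {
        StateVersionSketch scheduler = new StateVersionSketch();
        scheduler.transitionTo("Waiting for resources");

        // Registered while waiting; in practice this would be scheduled with a delay.
        Runnable resourceTimeout =
                scheduler.runIfStateUnchanged(() -> scheduler.transitionTo("Executing"));

        // Another state change happens before the timeout fires.
        scheduler.transitionTo("Finished");

        // The timeout is outdated by now, so running it has no effect.
        resourceTimeout.run();
        System.out.println(scheduler.getCurrentState());
    }
}
```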

Stable set of resources

The "Waiting for resources" state exists to wait for the required resources. Since the cluster might not be able to provide all of the declared resources, the system needs to handle this situation as well. Hence, this state waits until either all required resources have arrived or the set of available resources has stabilised. A set of resources has stabilised if the system expects that it won't change anymore. One possible approach is to set an upper limit for the waiting time. This is also the approach we want to implement in the first version of the scheduler. Consequently, whenever the scheduler enters the "Waiting for resources" state, it registers a timeout after which it will try to go into the Executing state. If the job cannot be executed with the available resources, then the scheduler will fail it.
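
The decision described above can be sketched as a pure function. This is an illustration only: the Decision enum, the method decide and in particular the minimumSlots parameter (the smallest number of slots with which the job could still run) are assumptions made for this example.

```java
/** Hypothetical sketch of the decision taken in the "Waiting for resources" state. */
public final class ResourceWaitSketch {

    enum Decision {
        KEEP_WAITING,
        START_EXECUTING,
        FAIL_JOB
    }

    /**
     * @param desiredSlots slots declared as required for the job
     * @param availableSlots slots currently available to the scheduler
     * @param minimumSlots smallest number of slots the job can run with (assumption)
     * @param timeoutElapsed whether the resource timeout has fired
     */
    static Decision decide(
            int desiredSlots, int availableSlots, int minimumSlots, boolean timeoutElapsed) {
        if (availableSlots >= desiredSlots) {
            // All required resources have arrived.
            return Decision.START_EXECUTING;
        }
        if (!timeoutElapsed) {
            // The set of resources is not considered stable yet.
            return Decision.KEEP_WAITING;
        }
        // After the timeout the resources are treated as stable: run with what is
        // available if that is enough, otherwise fail the job.
        return availableSlots >= minimumSlots ? Decision.START_EXECUTING : Decision.FAIL_JOB;
    }

    public static void main(String[] args) {
        System.out.println(decide(8, 8, 1, false));
        System.out.println(decide(8, 3, 1, false));
        System.out.println(decide(8, 3, 1, true));
        System.out.println(decide(8, 0, 1, true));
    }
}
```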

In the future we might take a look at Kafka's consumer protocol and how consumer changes are handled there and how to decide on a stable set of consumers/resources.

Automatic scaling

To support automatic scaling, whenever new slots arrive while the scheduler is in the Executing state, we ask a ScaleUpController whether the job should be scaled up. If so, the scheduler transitions into the Restarting state, which triggers a global failover and a restart that makes use of the newly available resources. Note that scale-down actions are triggered by failures of tasks whose slots have been removed.

Components of the scheduler

The scheduler consists of the following services to accomplish its job. These services are used by the different states to decide on state transitions and to perform certain operations.

PlantUML
@startuml

package "Adaptive Scheduler" {
  [SlotAllocator]
  [FailureHandler]
  [ScaleUpController]
}

@enduml

SlotAllocator

The SlotAllocator is the component responsible for determining the resource requirements and mapping a JobGraph and its contained vertices to slots.

This consists of 2 parts:

  1. Calculating the resources required for scheduling a JobGraph / set of vertices
  2. Calculating a mapping of vertices to be scheduled to free slots, and optionally rescaling vertices.

The interface will look like this:



Code Block
/** Component for calculating the slot requirements and mapping of vertices to slots. */
public interface SlotAllocator<T extends VertexAssignment> {

    /**
     * Calculates the total resources required for scheduling the given vertices.
     *
     * @param vertices vertices to schedule
     * @return required resources
     */
    ResourceCounter calculateRequiredSlots(Iterable<JobInformation.VertexInformation> vertices);

    /**
     * Determines the parallelism at which the vertices could run given the collection of slots.
     *
     * @param jobInformation information about the job graph
     * @param slots Slots to consider for determining the parallelism
     * @return parallelism of each vertex along with implementation specific information, if the job
     *     could be run with the given slots
     */
    Optional<T> determineParallelism(
            JobInformation jobInformation, Collection<? extends SlotInfo> slots);

    /**
     * Assigns vertices to the given slots.
     *
     * @param jobInformation information about the job graph
     * @param freeSlots currently free slots
     * @param assignment information on how vertices should be assigned to the slots
     * @return parallelism of each vertex and mapping slots to vertices
     */
    ParallelismAndResourceAssignments assignResources(
            JobInformation jobInformation,
            Collection<SlotInfoWithUtilization> freeSlots,
            T assignment);
}

Code Block
/** Base container for assignments of vertices to slots. */
public interface VertexAssignment {
    Map<JobVertexID, Integer> getMaxParallelismForVertices();
}

Code Block
/** Information about the job. */
public interface JobInformation {
    Collection<SlotSharingGroup> getSlotSharingGroups();

    VertexInformation getVertexInformation(JobVertexID jobVertexId);

    /** Information about a single vertex. */
    interface VertexInformation {
        JobVertexID getJobVertexID();

        int getParallelism();

        SlotSharingGroup getSlotSharingGroup();
    }
}

Code Block
/** Assignment of slots to execution vertices. */
public final class ParallelismAndResourceAssignments {
    private final Map<ExecutionVertexID, ? extends LogicalSlot> assignedSlots;

    private final Map<JobVertexID, Integer> parallelismPerJobVertex;
}

  • calculateRequiredSlots returns a ResourceCounter that describes the ideal amount of resources for the job.
  • determineParallelism accepts a JobInformation and a collection of free slots, and attempts to find the optimal parallelism for every operator given the available set of slots. If no such mapping can be found, an empty Optional is returned to signal to the scheduler that this topology cannot be scheduled at this time. This method may be called by the scheduler irrespective of whether it has received the desired slots.
  • assignResources assigns the available resources to the ExecutionVertices and reserves the slots according to the provided assignment parameter.

The first implementation of the SlotAllocator interface will support slot sharing without respecting previous allocations and input preferences. Moreover, it will distribute the available slots equally across the different slot sharing groups. The SlotAllocator implementation will respect the configured parallelism and never decide on a parallelism which exceeds the configured maxParallelism of an operator.
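
To illustrate the equal distribution, the following sketch computes a parallelism per slot sharing group from a number of free slots. It is a simplification under several assumptions: groups are identified by plain strings, each group has a single configured parallelism, leftover slots are not redistributed, and maxParallelism is not modelled. None of the names below are part of the proposed interface.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of distributing free slots equally across slot sharing groups. */
public final class EqualSlotDistributionSketch {

    /**
     * @param configuredParallelismPerGroup configured parallelism per slot sharing group
     * @param freeSlots number of currently free slots
     * @return parallelism per group, or empty if not every group can get at least one slot
     */
    static Optional<Map<String, Integer>> determineParallelism(
            Map<String, Integer> configuredParallelismPerGroup, int freeSlots) {
        int groups = configuredParallelismPerGroup.size();
        if (groups == 0 || freeSlots / groups == 0) {
            return Optional.empty();
        }
        // Every group gets the same share; leftover slots stay unused in this sketch.
        int slotsPerGroup = freeSlots / groups;
        Map<String, Integer> parallelismPerGroup = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : configuredParallelismPerGroup.entrySet()) {
            // Never exceed the configured parallelism of the group.
            parallelismPerGroup.put(entry.getKey(), Math.min(entry.getValue(), slotsPerGroup));
        }
        return Optional.of(parallelismPerGroup);
    }

    public static void main(String[] args) {
        Map<String, Integer> configured = new LinkedHashMap<>();
        configured.put("sources", 4);
        configured.put("sinks", 2);
        System.out.println(determineParallelism(configured, 6));
        System.out.println(determineParallelism(configured, 1));
    }
}
```

With six free slots and two groups, each group is offered three slots, so a group configured with parallelism 4 runs with 3 and a group configured with 2 keeps its 2. With a single free slot no assignment is possible and the result is empty.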

FailureHandler

In order to handle failures, the adaptive scheduler will support the same RestartBackoffTimeStrategy as used by the pipelined region scheduler. Hence, all currently available RestartBackoffTimeStrategy implementations will be supported. The failure handling procedure is as follows:

  1. Check whether the failure is recoverable. If not, then go to the Failing state.
  2. Ask the configured RestartBackoffTimeStrategy whether we can restart. If not, then go to the Failing state.
  3. Ask the configured RestartBackoffTimeStrategy for the backoff time between failure and restart.
  4. Go into the Restarting state with the returned backoff time.
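
The four steps above can be sketched as follows. The BackoffStrategy interface here is a local stand-in defined only for this example and is not Flink's RestartBackoffTimeStrategy. The Result class and the recoverability predicate are likewise hypothetical.

```java
import java.util.function.Predicate;

/** Hypothetical sketch of the failure handling procedure. */
public final class FailureHandlingSketch {

    /** Local stand-in for a restart backoff strategy (assumption, not the Flink interface). */
    interface BackoffStrategy {
        boolean canRestart();

        long getBackoffTimeMillis();
    }

    /** Outcome of the procedure: the next state and the delay before restarting. */
    static final class Result {
        final String nextState;
        final long backoffMillis;

        Result(String nextState, long backoffMillis) {
            this.nextState = nextState;
            this.backoffMillis = backoffMillis;
        }
    }

    static Result handleFailure(
            Throwable failure, Predicate<Throwable> isRecoverable, BackoffStrategy strategy) {
        // Step 1: unrecoverable failures go straight to Failing.
        if (!isRecoverable.test(failure)) {
            return new Result("Failing", 0L);
        }
        // Step 2: the strategy may refuse further restarts.
        if (!strategy.canRestart()) {
            return new Result("Failing", 0L);
        }
        // Steps 3 and 4: restart after the backoff time given by the strategy.
        return new Result("Restarting", strategy.getBackoffTimeMillis());
    }

    public static void main(String[] args) {
        BackoffStrategy fixedDelay = new BackoffStrategy() {
            @Override
            public boolean canRestart() {
                return true;
            }

            @Override
            public long getBackoffTimeMillis() {
                return 1000L;
            }
        };
        Result result =
                handleFailure(new RuntimeException("task failed"), t -> true, fixedDelay);
        System.out.println(result.nextState + " after " + result.backoffMillis + " ms");
    }
}
```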

ScaleUpController

Whenever the scheduler is in the Executing state and receives new slots, the scheduler checks whether the job can be run with an increased parallelism. If this is the case, then the scheduler passes the old and the new cumulative parallelism of all operators to the ScaleUpController and asks whether it should scale up.

Code Block
/**
 * Simple controller for controlling the scale up behavior of the {@link AdaptiveScheduler}.
 */
public interface ScaleUpController {

    /**
     * This method gets called whenever new resources are available to the scheduler to scale up.
     *
     * @param currentCumulativeParallelism Cumulative parallelism of the currently running job
     *     graph.
     * @param newCumulativeParallelism Potential new cumulative parallelism with the additional
     *     resources.
     * @return true if the policy decided to scale up based on the provided information.
     */
    boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism);
}

A basic default implementation will only scale up if newCumulativeParallelism - currentCumulativeParallelism >= increaseThreshold.
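
A threshold-based implementation of this rule could look like the sketch below. The interface is repeated locally so that the example is self-contained. The class name ThresholdScaleUpController and the constructor parameter are assumptions made for this example.

```java
/** Hypothetical sketch of a threshold-based scale up policy. */
public final class ScaleUpSketch {

    /** Local copy of the interface shown above, so that the example compiles on its own. */
    interface ScaleUpController {
        boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism);
    }

    /** Scales up only if the cumulative parallelism grows by at least the threshold. */
    static final class ThresholdScaleUpController implements ScaleUpController {
        private final int increaseThreshold;

        ThresholdScaleUpController(int increaseThreshold) {
            this.increaseThreshold = increaseThreshold;
        }

        @Override
        public boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism) {
            return newCumulativeParallelism - currentCumulativeParallelism >= increaseThreshold;
        }
    }

    public static void main(String[] args) {
        ScaleUpController controller = new ThresholdScaleUpController(2);
        System.out.println(controller.canScaleUp(4, 6));
        System.out.println(controller.canScaleUp(4, 5));
    }
}
```

A threshold larger than one avoids restarting the whole job for every single additional slot, at the cost of leaving small amounts of new capacity unused.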

How to distinguish streaming jobs

Since we cannot execute batch jobs with the adaptive scheduler, we need to be able to detect whether a job is a batch or a streaming job. For this purpose, we are introducing a new enum field in the JobGraph, called JobType. The default JobType of a JobGraph will be BATCH.

For batch jobs (from the DataSet API), setting this field is trivial (in the JobGraphGenerator).

For streaming jobs the situation is more complicated, since FLIP-134 introduced support for bounded (batch) jobs in the DataStream API. For the DataStream API, we rely on the result of StreamGraphGenerator#shouldExecuteInBatchMode, which checks if the DataStream program has unbounded sources.

Lastly, the Blink Table API / SQL Planner also generates StreamGraph instances, which contain batch jobs. We are tagging the StreamGraph as a batch job in the ExecutorUtils.setBatchProperties() method.

If we detect that the adaptive scheduler has been configured for a batch job, we will fall back to another scheduler supporting batch jobs (currently the pipelined region scheduler).
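
The fallback rule can be summarised in a few lines. In this sketch only the JobType values BATCH and STREAMING follow the text; the SchedulerKind enum and the method chooseScheduler are hypothetical and stand in for whatever mechanism actually instantiates the scheduler.

```java
/** Hypothetical sketch of choosing a scheduler based on configuration and job type. */
public final class SchedulerSelectionSketch {

    enum JobType {
        BATCH,
        STREAMING
    }

    /** Illustrative names for the two schedulers mentioned in the text. */
    enum SchedulerKind {
        ADAPTIVE,
        PIPELINED_REGION
    }

    static SchedulerKind chooseScheduler(boolean adaptiveConfigured, JobType jobType) {
        // The adaptive scheduler is only used if it was configured explicitly
        // and the job is a streaming job; otherwise fall back.
        if (adaptiveConfigured && jobType == JobType.STREAMING) {
            return SchedulerKind.ADAPTIVE;
        }
        return SchedulerKind.PIPELINED_REGION;
    }

    public static void main(String[] args) {
        System.out.println(chooseScheduler(true, JobType.STREAMING));
        System.out.println(chooseScheduler(true, JobType.BATCH));
        System.out.println(chooseScheduler(false, JobType.STREAMING));
    }
}
```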

Configuration

We intend to extend/introduce the following new configuration values/parameters:

  • Extend jobmanager.scheduler to accept the new value adaptive, which activates the adaptive scheduler
  • Introduce adaptive-scheduler.resource-timeout to configure the resource timeout for the "Waiting for resources" state

Compatibility, Deprecation, and Migration Plan

The adaptive scheduler will be a beta feature which the user has to activate explicitly by setting the config option jobmanager.scheduler: adaptive. This entails that Flink's default behaviour won't change.

If the adaptive scheduler is activated, then it will only be chosen if the user submitted a streaming job. If the user submitted a batch job, then Flink will fall back to the pipelined region scheduler.

Limitations & future improvements

The first version of the adaptive scheduler will come with a handful of limitations in order to reduce its scope.

Streaming jobs only

The adaptive scheduler runs with streaming jobs only. When a batch job is submitted, the default scheduler will be used.

No support for local recovery

In the first version of the scheduler we don't intend to support local recovery. Adding support for it should be possible and we intend to add support for it as a follow up.

No support for local failovers

Supporting local failovers is another feature which we want to add as a follow-up. Supporting them would avoid having to restart the whole job. One idea could be to extend the existing state machine by a new state "Restarting locally":

PlantUML
@startuml
hide empty description

[*] -> Created
Created --> Waiting : Start scheduling
state "Waiting for resources" as Waiting
state "Restarting globally" as RestartingG
state "Restarting locally" as RestartingL
Waiting --> Waiting : Resources are not stable yet
Waiting --> Executing : Resources are stable
Waiting --> Finished : Cancel, suspend or \nnot enough resources for executing
Executing --> Canceling : Cancel
Executing --> Failing : Unrecoverable fault
Executing --> Finished : Suspend or job reached terminal state
Executing --> RestartingG : Recoverable global fault
Executing --> RestartingL : Recoverable local fault
RestartingL --> Executing : Recovered locally
RestartingL --> RestartingL : Recoverable local fault
RestartingL --> RestartingG : Local recovery timeout
RestartingL --> Canceling : Cancel
RestartingL --> Finished : Suspend
RestartingL --> Failing : Unrecoverable fault
RestartingG --> Finished : Suspend
RestartingG --> Canceling : Cancel
RestartingG --> Waiting : Cancelation complete
Canceling --> Finished : Cancelation complete
Failing --> Finished : Failing complete
Finished -> [*]

@enduml



No integration with Flink's web UI

...