Status

Current state: "Under Discussion"

Discussion thread:

JIRA:

Released:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In order to support the reactive mode (FLIP-159) we need a different type of scheduler which first announces the required resources and decides on the actual parallelism with which to execute the job only after having received resources. The benefit is that this scheduler can schedule jobs even if not all required resources are available. Moreover, it allows jobs to continue executing even after TaskManagers have been lost. The declarative scheduler builds upon the declarative resource management (FLIP-138).

Proposed Changes

The declarative scheduler will initially work for streaming jobs only. This simplifies things considerably because all operators always have to be scheduled. Moreover, by treating every failure as a global failover which restarts the whole topology, we can further simplify the scheduler. This failover behaviour is the default for many streaming topologies anyway, unless they consist of disjoint graphs. Given these assumptions we want to develop the following scheduler:

The scheduler takes the JobGraph for which it will first calculate the desired resources. After declaring these resources, the scheduler will wait until the available resources have stabilised. Once the resources are stabilised the scheduler should be able to decide on the actual parallelism of the job. Once the parallelism is decided and the executions are matched with the available slots, the scheduler deploys the executions.

Whenever a fault occurs, we will fail the whole job and try to restart it. Restarting works by cancelling all deployed tasks and then restarting the scheduling of the JobGraph following the same code paths as the initial scheduling operation.

An obvious regression of this implementation over the existing pipelined region scheduler is that we are always restarting the whole topology. For embarrassingly parallel jobs this might not be necessary since the running tasks don’t need to be reset to the latest checkpoint. Supporting partial failover would be the first extension of the proposed scheduler. One way to support partial failovers is to introduce a distinction between global and local failovers.

  • Global failover: Restart of the whole topology, which allows the parallelism of the job to change
  • Local failover: Restart of a subset of the executions, which does not change the parallelism of the operators

If the system cannot recover from a local failover because it does not have enough slots available, the failover must be escalated to a global failover. A global failover allows the system to rescale the whole job.

State machine of the scheduler

Given the description above we propose the following state machine to model the behaviour of the declarative scheduler:


[Figure: state machine of the declarative scheduler. States: Created, Waiting for resources, Executing, Restarting, Canceling, Failing, Finished. Transition labels: Start scheduling; Resources are not stable yet; Resources are stable; Cancel or suspend; Cancel; Unrecoverable fault; Suspend or job reached terminal state; Recoverable fault; Suspend; Cancelation complete; Failing complete.]

The states have the following semantics:

  • Created: Initial state of the scheduler
  • Waiting for resources: The required resources are declared. The scheduler waits until either the requirements are fulfilled or the set of resources has stabilised.
  • Executing: The set of resources is stable and the scheduler could decide on the parallelism with which to execute the job. The ExecutionGraph is created and the execution of the job has started.
  • Restarting: A recoverable fault has occurred. The scheduler stops the ExecutionGraph by canceling it.
  • Canceling: The job has been canceled by the user. The scheduler stops the ExecutionGraph by canceling it.
  • Failing: An unrecoverable fault has occurred. The scheduler stops the ExecutionGraph by canceling it.
  • Finished: The job execution has been completed.

In the states “Created” and “Waiting for resources” no ExecutionGraph exists. Only after we have acquired enough resources to run the job can the ExecutionGraph be instantiated. Hence, all operations which require the ExecutionGraph will be ignored until we are in a state where an ExecutionGraph exists.

Since we have a couple of asynchronous operations (resource timeout in the "Waiting for resources" state, restart delay in the Restarting state) which must only take effect if no other state change has happened in the meantime, we need to introduce a state version which can be used to filter out outdated operations.
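The state-version idea can be illustrated with a minimal sketch. All names here (VersionedStateHolder, guard, transitionTo) are hypothetical and not part of the proposal. The sketch also assumes that all calls happen on a single thread, as they would on a main-thread executor:

```java
/** Hypothetical sketch: a state holder that invalidates delayed actions on every state change. */
final class VersionedStateHolder {
    private long stateVersion = 0;
    private String state = "Created";

    /** Every state change bumps the version, which outdates all previously guarded actions. */
    void transitionTo(String newState) {
        state = newState;
        stateVersion++;
    }

    String getState() {
        return state;
    }

    /** Wraps an action so that it only runs if no state change happened since it was registered. */
    Runnable guard(Runnable action) {
        final long expectedVersion = stateVersion;
        return () -> {
            if (expectedVersion == stateVersion) {
                action.run();
            }
            // Otherwise the action is outdated and silently dropped.
        };
    }
}
```

A resource timeout or restart delay would be registered as guard(...) and handed to a timer. If the scheduler has moved on before the timer fires, the action becomes a no-op.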

Stable set of resources

The purpose of the "Waiting for resources" state is to wait for the required resources. Since the cluster might not be able to provide all of the declared resources, the system needs to handle this situation as well. Hence, this state waits until either all required resources have arrived or the set of available resources has stabilised. A set of resources has stabilised if the system expects that it won't change anymore. There are different ways to determine this; one approach is to set an upper limit for the waiting time. This approach will be implemented in the first version of the scheduler. Consequently, whenever the scheduler enters the "Waiting for resources" state, it registers a timeout after which it will try to go into the Executing state. If the job cannot be executed with the available resources, then the scheduler will fail it.
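The decision described above can be sketched as a small pure function. This is only an illustration: the class, the enum, and the minimumSlots parameter are hypothetical. In particular, minimumSlots stands in for "the job can be executed with the available resources", which the real scheduler would derive from the SlotAllocator rather than from a fixed number.

```java
/** Hypothetical sketch of the decision taken in the "Waiting for resources" state. */
final class ResourceWaitDecision {

    enum Action {
        KEEP_WAITING,
        START_EXECUTING,
        FAIL_JOB
    }

    /**
     * @param desiredSlots slots that were declared as required
     * @param availableSlots slots currently offered to the scheduler
     * @param minimumSlots assumed lower bound below which the job cannot run at all
     * @param timeoutElapsed whether the resource timeout has fired
     */
    static Action decide(
            int desiredSlots, int availableSlots, int minimumSlots, boolean timeoutElapsed) {
        if (availableSlots >= desiredSlots) {
            // All requirements are fulfilled, no need to wait for the timeout.
            return Action.START_EXECUTING;
        }
        if (!timeoutElapsed) {
            // Resources are not considered stable yet.
            return Action.KEEP_WAITING;
        }
        // Timeout fired: run with what we have, or fail if that is not enough.
        return availableSlots >= minimumSlots ? Action.START_EXECUTING : Action.FAIL_JOB;
    }
}
```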

In the future we might take a look at Kafka's consumer protocol and how consumer changes are handled there.

Automatic scaling

In order to support automatic scaling, whenever new slots arrive while the scheduler is in the Executing state, we ask a ScalingPolicy whether the job should be scaled up. If this is the case, then the scheduler transitions into the Restarting state, which triggers a global failover and a restart that will make use of the newly available resources.

Components of the scheduler

The scheduler consists of the following services to accomplish its job. These services are used by the different states to decide on state transitions and to perform certain operations.


[Figure: components of the Declarative Scheduler: SlotAllocator, FailureHandler, ScalingPolicy.]

SlotAllocator

The SlotAllocator is the component responsible for determining the resource requirements and mapping a JobGraph and its contained vertices to slots.

This consists of 2 parts:

1) Calculating the resources required for scheduling a JobGraph / set of vertices

2) Calculating a mapping of vertices to be scheduled to free slots, and optionally rescaling vertices.

The interface will look like this:

/** Component for calculating the slot requirements and mapping of vertices to slots. */
public interface SlotAllocator<T extends VertexAssignment> {

    /**
     * Calculates the total resources required for scheduling the given vertices.
     *
     * @param vertices vertices to schedule
     * @return required resources
     */
    ResourceCounter calculateRequiredSlots(Iterable<JobInformation.VertexInformation> vertices);

    /**
     * Determines the parallelism at which the vertices could run given the collection of slots.
     *
     * @param jobInformation information about the job graph
     * @param slots Slots to consider for determining the parallelism
     * @return parallelism of each vertex along with implementation specific information, if the job
     *     could be run with the given slots
     */
    Optional<T> determineParallelism(
            JobInformation jobInformation, Collection<? extends SlotInfo> slots);

    /**
     * Assigns vertices to the given slots.
     *
     * @param jobInformation information about the job graph
     * @param freeSlots currently free slots
     * @param assignment information on how vertices should be assigned to the slots
     * @return parallelism of each vertex and mapping slots to vertices
     */
    ParallelismAndResourceAssignments assignResources(
            JobInformation jobInformation,
            Collection<SlotInfoWithUtilization> freeSlots,
            T assignment);
}
/** Base container for assignments of vertices to slots. */
public interface VertexAssignment {
    Map<JobVertexID, Integer> getMaxParallelismForVertices();
}
/** Information about the job. */
public interface JobInformation {
    Collection<SlotSharingGroup> getSlotSharingGroups();

    VertexInformation getVertexInformation(JobVertexID jobVertexId);

    /** Information about a single vertex. */
    interface VertexInformation {
        JobVertexID getJobVertexID();

        int getParallelism();

        SlotSharingGroup getSlotSharingGroup();
    }
}


/** Assignment of slots to execution vertices. */
public final class ParallelismAndResourceAssignments {
    private final Map<ExecutionVertexID, ? extends LogicalSlot> assignedSlots;

    private final Map<JobVertexID, Integer> parallelismPerJobVertex;
}


  • calculateRequiredSlots returns a ResourceCounter that describes the ideal amount of resources for the job.
  • determineParallelism accepts a JobInformation and a collection of free slots, and attempts to find the optimal parallelism for every operator given the available set of slots. If no such mapping can be found, an empty Optional is returned to signal to the scheduler that this topology cannot be scheduled at this time. This method may be called by the scheduler irrespective of whether it has received the desired slots. If not enough slots have been provided, the allocator may decide to downscale the job accordingly. Conversely, if more slots are available than necessary, it may also upscale the job. The decision whether to actually scale up a running job is handled by the ScalingPolicy.
  • assignResources assigns the available resources to the ExecutionVertices and reserves the slots according to the provided assignment parameter.

The first implementation of the SlotAllocator interface will support slot sharing without respecting previous allocations or input preferences.
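As a rough illustration of how slot sharing affects the required-slots calculation, the following sketch assumes that a slot sharing group needs as many slots as the highest parallelism among its vertices, and that the total is the sum over all groups. The Vertex class is a simplified, hypothetical stand-in for JobInformation.VertexInformation, and a plain int replaces ResourceCounter. It is not the proposed implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: counting required slots when vertices share slots within a group. */
final class RequiredSlotsSketch {

    /** Simplified stand-in for JobInformation.VertexInformation. */
    static final class Vertex {
        final String slotSharingGroup;
        final int parallelism;

        Vertex(String slotSharingGroup, int parallelism) {
            this.slotSharingGroup = slotSharingGroup;
            this.parallelism = parallelism;
        }
    }

    /** Sums, over all slot sharing groups, the maximum parallelism found within each group. */
    static int calculateRequiredSlots(List<Vertex> vertices) {
        Map<String, Integer> slotsPerGroup = new HashMap<>();
        for (Vertex vertex : vertices) {
            // Subtasks of different vertices in the same group can share a slot,
            // so only the largest parallelism in the group counts.
            slotsPerGroup.merge(vertex.slotSharingGroup, vertex.parallelism, Integer::max);
        }
        int total = 0;
        for (int slots : slotsPerGroup.values()) {
            total += slots;
        }
        return total;
    }
}
```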

FailureHandler

In order to handle failures, the declarative scheduler will support the same RestartBackoffTimeStrategy as used by the pipelined region scheduler. Hence, all currently available RestartBackoffTimeStrategies will be supported. The failure handling procedure is the following:

  1. Check whether the failure is recoverable. If not, then go to Failing state
  2. Ask the configured RestartBackoffTimeStrategy whether we can restart. If not, then go to Failing state
  3. Ask the configured RestartBackoffTimeStrategy for the backoff time between failure and restart
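The three steps above can be sketched as follows. The BackoffStrategy interface is a simplified, hypothetical stand-in for the part of RestartBackoffTimeStrategy that this procedure needs, and the isRecoverable predicate is likewise an assumption. An empty result means "go to the Failing state"; a present value is the delay before restarting.

```java
import java.util.OptionalLong;
import java.util.function.Predicate;

/** Hypothetical sketch of the failure handling procedure. */
final class FailureHandlingSketch {

    /** Simplified stand-in for the restart strategy consulted by the scheduler. */
    interface BackoffStrategy {
        boolean canRestart();

        long getBackoffTime();
    }

    /**
     * @return the backoff time in milliseconds if the job should be restarted, or an empty
     *     result if the scheduler should go to the Failing state
     */
    static OptionalLong handleFailure(
            Throwable failure, Predicate<Throwable> isRecoverable, BackoffStrategy strategy) {
        // Step 1: unrecoverable failures go straight to Failing.
        if (!isRecoverable.test(failure)) {
            return OptionalLong.empty();
        }
        // Step 2: the restart strategy may veto further restarts.
        if (!strategy.canRestart()) {
            return OptionalLong.empty();
        }
        // Step 3: restart after the backoff time chosen by the strategy.
        return OptionalLong.of(strategy.getBackoffTime());
    }
}
```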

ScalingPolicy

Whenever the scheduler is in the Executing state and receives new slots, the scheduler checks whether the job can be run with an increased parallelism. If this is the case, then the scheduler will ask the ScalingPolicy, given the current and the potential new cumulative parallelism of all operators, whether it should scale up or not.

/**
 * Simple policy for controlling the scale up behavior of the {@link
 * org.apache.flink.runtime.scheduler.declarative.DeclarativeScheduler}.
 */
public interface ScalingPolicy {

    /**
     * This method gets called whenever new resources are available to the scheduler to scale up.
     *
     * @param currentCumulativeParallelism Cumulative parallelism of the currently running job
     *     graph.
     * @param newCumulativeParallelism Potential new cumulative parallelism with the additional
     *     resources.
     * @return true if the policy decided to scale up based on the provided information.
     */
    boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism);
}

A basic default implementation will only scale up if newCumulativeParallelism - currentCumulativeParallelism >= increaseThreshold.
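A minimal sketch of such a threshold-based policy could look as follows. The class name ThresholdScalingPolicy is hypothetical, and the ScalingPolicy interface is repeated only to keep the snippet self-contained.

```java
/** Repeated from above so that the sketch compiles on its own. */
interface ScalingPolicy {
    boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism);
}

/** Hypothetical sketch: scale up only if the cumulative parallelism grows by a minimum amount. */
final class ThresholdScalingPolicy implements ScalingPolicy {
    private final int increaseThreshold;

    ThresholdScalingPolicy(int increaseThreshold) {
        this.increaseThreshold = increaseThreshold;
    }

    @Override
    public boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism) {
        return newCumulativeParallelism - currentCumulativeParallelism >= increaseThreshold;
    }
}
```

With a threshold of 2, for example, going from a cumulative parallelism of 4 to 6 triggers a rescale, while going from 4 to 5 does not. A higher threshold trades slower reaction to new resources for fewer restarts.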

How to distinguish streaming jobs

Compatibility, Deprecation, and Migration Plan

The declarative scheduler will be a beta feature which the user has to activate explicitly by setting the config option jobmanager.scheduler: declarative. It will only be chosen if the user submitted a streaming job.

Limitations

  • Rescaling happens by restarting the job, so jobs with large state might need a lot of resources and time to rescale. Rescaling causes downtime of the job, but no data loss.
  • The declarative scheduler runs with streaming jobs only
  • No support for local recovery
  • No support for local failovers
  • No integration with Flink's web UI
  • No support for fine grained resource specifications

Test Plan

The new scheduler needs extensive unit, IT and end-to-end testing because it is a crucial component which is at the heart of Flink.

Rejected Alternatives
