...

  • Created: Initial state of the scheduler.
  • Waiting for resources: The required resources are declared. The scheduler waits until either the requirements are fulfilled or the set of resources has stabilised.
  • Executing: The set of resources is stable and the scheduler could decide on the parallelism with which to execute the job. The ExecutionGraph is created and the execution of the job has started.
  • Restarting: A recoverable fault has occurred. The scheduler stops the ExecutionGraph by canceling it.
  • Canceling: The job has been canceled by the user. The scheduler stops the ExecutionGraph by canceling it.
  • Failing: An unrecoverable fault has occurred. The scheduler stops the ExecutionGraph by canceling it.
  • Finished: The job execution has been completed.

In the states “Created” and “Waiting for resources” no ExecutionGraph exists. Only after we have acquired enough resources to run the job can the ExecutionGraph be instantiated. Hence, all operations which require the ExecutionGraph will be ignored until we are in a state where an ExecutionGraph exists.
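The rule above can be sketched as a small guard. This is only an illustration: `SchedulerState` and `GraphOperationGuard` are hypothetical names, and treating Finished as a state without a live ExecutionGraph is an assumption that the text does not confirm.

```java
/** Hypothetical enum mirroring the states listed above. */
enum SchedulerState {
    CREATED(false),
    WAITING_FOR_RESOURCES(false),
    EXECUTING(true),
    RESTARTING(true),
    CANCELING(true),
    FAILING(true),
    // Assumption: no live ExecutionGraph is available to operate on once finished.
    FINISHED(false);

    private final boolean hasExecutionGraph;

    SchedulerState(boolean hasExecutionGraph) {
        this.hasExecutionGraph = hasExecutionGraph;
    }

    boolean hasExecutionGraph() {
        return hasExecutionGraph;
    }
}

/** Hypothetical guard that drops ExecutionGraph operations while no graph exists. */
final class GraphOperationGuard {
    private SchedulerState state = SchedulerState.CREATED;

    void transitionTo(SchedulerState newState) {
        state = newState;
    }

    /** Runs the operation only if the current state has an ExecutionGraph; returns whether it ran. */
    boolean runIfGraphExists(Runnable operation) {
        if (!state.hasExecutionGraph()) {
            return false; // ignored, as described above
        }
        operation.run();
        return true;
    }
}
```

In this sketch a call such as `guard.runIfGraphExists(...)` is silently dropped in Created and Waiting for resources, and only takes effect once the scheduler has moved to a state such as Executing.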

Since we have a couple of asynchronous operations (the resource timeout in the "Waiting for resources" state, the restart delay in the Restarting state) which are only valid if no other state change has happened in the meantime, we need to introduce a state version which can be used to filter out outdated operations.
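One way to picture the state version is a counter that is incremented on every transition and captured by each asynchronous operation when it is registered. The following is a minimal sketch under that assumption; `VersionedStateMachine` and its methods are hypothetical, and the queue merely stands in for real timers.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical sketch: a state version used to discard outdated asynchronous operations. */
final class VersionedStateMachine {
    private long stateVersion = 0;
    private String state = "Created";
    // Stand-in for timers such as the resource timeout or the restart delay.
    private final Queue<Runnable> pendingAsyncOperations = new ArrayDeque<>();

    void transitionTo(String newState) {
        state = newState;
        stateVersion++; // every state change invalidates previously registered operations
    }

    /** Registers an operation that only runs if no state change happens before it fires. */
    void scheduleForCurrentState(Runnable operation) {
        final long expectedVersion = stateVersion;
        pendingAsyncOperations.add(() -> {
            if (expectedVersion == stateVersion) {
                operation.run();
            }
            // otherwise the operation is outdated and silently dropped
        });
    }

    /** Simulates all registered timers firing. */
    void fireAllPending() {
        Runnable next;
        while ((next = pendingAsyncOperations.poll()) != null) {
            next.run();
        }
    }

    String getState() {
        return state;
    }
}
```

For example, a resource timeout registered in "Waiting for resources" has no effect if the user cancels the job before the timeout fires, because the cancellation has already bumped the version.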

...

In order to support automatic scaling, we ask a ScalingPolicy whenever new slots arrive and the scheduler is in state Executing whether the job can be scaled up. If this is the case, then it transitions into the Restarting state which triggers a global failover and a restart which will make use of the available resources.
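The scale-up check could look roughly like the sketch below. The text does not give the ScalingPolicy signature, so the `canScaleUp(int, int)` method, the `ScalingSketch` class and its reduced state set are all assumptions made for illustration.

```java
/** Hypothetical policy interface; the actual signature is not specified in the text. */
interface ScalingPolicy {
    boolean canScaleUp(int currentParallelism, int parallelismWithNewSlots);
}

/** Hypothetical sketch of the reaction to newly arriving slots. */
final class ScalingSketch {
    enum State { EXECUTING, RESTARTING }

    private final ScalingPolicy policy;
    private final int currentParallelism;
    private State state = State.EXECUTING;

    ScalingSketch(ScalingPolicy policy, int currentParallelism) {
        this.policy = policy;
        this.currentParallelism = currentParallelism;
    }

    /** Called whenever new slots arrive; the policy is only consulted while executing. */
    void onNewSlotsAvailable(int parallelismWithNewSlots) {
        if (state == State.EXECUTING
                && policy.canScaleUp(currentParallelism, parallelismWithNewSlots)) {
            // Restarting triggers a global failover; the restart then uses the new slots.
            state = State.RESTARTING;
        }
    }

    State getState() {
        return state;
    }
}
```

A trivial policy such as `(current, possible) -> possible > current` restarts on any gain in parallelism. A real policy would probably want a threshold, since each restart costs a global failover.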

...

PlantUML
@startuml

package "Declarative Scheduler" {
  [SlotAllocator]
  [FailureHandler]
  [ScalingPolicy]
}

@enduml

SlotAllocator

The SlotAllocator is the component responsible for determining the resource requirements and mapping a JobGraph and its contained vertices to slots.

...

2) Calculating a mapping of vertices to be scheduled to free slots, and optionally rescaling vertices.

The interface will conceptually look like this:


Code Block
/** Component for calculating the slot requirements and mapping of vertices to slots. */
public interface SlotAllocator<T extends VertexAssignment> {

    /**
     * Calculates the total resources required for scheduling the given vertices.
     *
     * @param vertices vertices to schedule
     * @return required resources
     */
    ResourceCounter calculateRequiredSlots(Iterable<JobInformation.VertexInformation> vertices);

    /**
     * Determines the parallelism at which the vertices could run given the collection of slots.
     *
     * @param jobInformation information about the job graph
     * @param slots Slots to consider for determining the parallelism
     * @return parallelism of each vertex along with implementation specific information, if the job
     *     could be run with the given slots
     */
    Optional<T> determineParallelism(
            JobInformation jobInformation, Collection<? extends SlotInfo> slots);

    /**
     * Assigns vertices to the given slots.
     *
     * @param jobInformation information about the job graph
     * @param freeSlots currently free slots
     * @param assignment information on how vertices should be assigned to the slots
     * @return parallelism of each vertex and mapping slots to vertices
     */
    ParallelismAndResourceAssignments assignResources(
            JobInformation jobInformation,
            Collection<SlotInfoWithUtilization> freeSlots,
            T assignment);
}

...

determineParallelism accepts a JobInformation and a collection of free slots, and attempts to find the optimal parallelism for every operator given the available set of slots. If no such mapping can be found, an empty Optional is returned to signal to the scheduler that this topology cannot be scheduled at this time. This method may be called by the scheduler irrespective of whether it has received the desired slots. If not enough slots have been provided, the allocator may decide to downscale the job accordingly. On the flip side, if more slots are available than necessary, it may also upscale the job.
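To make the downscale and upscale behaviour concrete, here is a deliberately simplified sketch. It is not the allocator described above: it assumes that all vertices share slots, so that one slot can host one subtask of every vertex, and it replaces JobInformation and SlotInfo with a plain map of hypothetical per-vertex upper bounds and a slot count.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified stand-in for determineParallelism. */
final class ParallelismSketch {

    /**
     * @param vertexMaxParallelism vertex id mapped to the upper bound of its parallelism
     * @param freeSlots number of currently free slots
     * @return parallelism per vertex, or empty if the job cannot run with the given slots
     */
    static Optional<Map<String, Integer>> determineParallelism(
            Map<String, Integer> vertexMaxParallelism, int freeSlots) {
        if (freeSlots < 1) {
            return Optional.empty(); // cannot be scheduled at this time
        }
        Map<String, Integer> parallelism = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> vertex : vertexMaxParallelism.entrySet()) {
            // Fewer slots than the bound: scale down. More slots: use up to the bound.
            parallelism.put(vertex.getKey(), Math.min(vertex.getValue(), freeSlots));
        }
        return Optional.of(parallelism);
    }
}
```

With bounds of 4 for a source and 8 for a sink, three free slots yield a parallelism of 3 for both vertices, while ten free slots yield 4 and 8. Zero slots yield an empty Optional.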

...