...

PlantUML
@startuml

package "Declarative Scheduler" {
  [SlotAllocator]
  [ExecutionFailureHandler]
  [ScalingPolicy]
}

@enduml

SlotAllocator

The SlotAllocator (working title) is the component responsible for mapping a JobGraph and its contained vertices to slots.

It consists of two parts:

1) Calculating the resources required for scheduling a JobGraph / set of vertices

2) Calculating a mapping of vertices to be scheduled to free slots, and optionally rescaling vertices.

The interface will conceptually look like this:


Code Block
/** Component for calculating the slot requirements and mapping of vertices to slots. */
public interface SlotAllocator<T extends VertexAssignment> {

    /**
     * Calculates the total resources required for scheduling the given vertices.
     *
     * @param vertices vertices to schedule
     * @return required resources
     */
    ResourceCounter calculateRequiredSlots(Iterable<JobInformation.VertexInformation> vertices);

    /**
     * Determines the parallelism at which the vertices could be scheduled given the collection of slots.
     *
     * @param jobInformation information about the job graph
     * @param slots Slots to consider for determining the parallelism
     * @return parallelism of each vertex along with implementation specific information, if the job
     *     could be run with the given slots
     */
    Optional<T> determineParallelism(
            JobInformation jobInformation, Collection<? extends SlotInfo> slots);

    /**
     * Assigns vertices to the given slots.
     *
     * @param jobInformation information about the job graph
     * @param freeSlots currently free slots
     * @param assignment information on how vertices should be assigned to the slots
     * @return parallelism of each vertex and the mapping of vertices to slots
     */
    ParallelismAndResourceAssignments assignResources(
            JobInformation jobInformation,
            Collection<SlotInfoWithUtilization> freeSlots,
            T assignment);
}


Code Block
/** Base container for assignments of vertices to slots. */
public interface VertexAssignment {
    Map<JobVertexID, Integer> getMaxParallelismForVertices();
}


Code Block
/** Information about the job. */
public interface JobInformation {
    Collection<SlotSharingGroup> getSlotSharingGroups();

    VertexInformation getVertexInformation(JobVertexID jobVertexId);

    /** Information about a single vertex. */
    interface VertexInformation {
        JobVertexID getJobVertexID();

        int getParallelism();

        SlotSharingGroup getSlotSharingGroup();
    }
}


Code Block
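/** Assignment of slots to execution vertices. */
public final class ParallelismAndResourceAssignments {
    private final Map<ExecutionVertexID, ? extends LogicalSlot> assignedSlots;

    private final Map<JobVertexID, Integer> parallelismPerJobVertex;

    public ParallelismAndResourceAssignments(
            Map<ExecutionVertexID, ? extends LogicalSlot> assignedSlots,
            Map<JobVertexID, Integer> parallelismPerJobVertex) {
        this.assignedSlots = assignedSlots;
        this.parallelismPerJobVertex = parallelismPerJobVertex;
    }
}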

calculateRequiredSlots returns a ResourceCounter that describes the ideal amount of resources for the job.

A simple implementation would iterate over all slot sharing groups and add up the maximum vertex parallelism of each group. A bare-bones implementation would ignore slot sharing groups altogether and simply add up the parallelism of all vertices.
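Both strategies can be sketched as follows. This is a minimal, self-contained illustration, not Flink's API: it replaces JobInformation.VertexInformation with a hypothetical `Vertex` record, identifies slot sharing groups by plain strings, and returns a plain slot count instead of a ResourceCounter.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for JobInformation.VertexInformation. */
record Vertex(String id, int parallelism, String slotSharingGroup) {}

/** Sketch of the two calculateRequiredSlots strategies, returning a plain slot count. */
final class RequiredSlots {
    private RequiredSlots() {}

    /** Slot sharing aware: each group needs as many slots as its most parallel vertex. */
    static int withSlotSharing(List<Vertex> vertices) {
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (Vertex v : vertices) {
            maxPerGroup.merge(v.slotSharingGroup(), v.parallelism(), Integer::max);
        }
        int total = 0;
        for (int max : maxPerGroup.values()) {
            total += max;
        }
        return total;
    }

    /** Bare-bones: ignore slot sharing groups and give every subtask its own slot. */
    static int withoutSlotSharing(List<Vertex> vertices) {
        int total = 0;
        for (Vertex v : vertices) {
            total += v.parallelism();
        }
        return total;
    }
}
```

For a job whose source (parallelism 4) and map (parallelism 2) share the group "default" while the sink (parallelism 3) sits in its own group, the slot sharing aware variant requires 4 + 3 = 7 slots, whereas the bare-bones variant requires 4 + 2 + 3 = 9.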

determineParallelism accepts a JobInformation and a collection of free slots, and attempts to find the optimal parallelism for every operator given the available set of slots.

If no such mapping can be found, an empty Optional is returned to signal to the scheduler that this topology cannot be scheduled at this time.

This method may be called by the scheduler irrespective of whether it has received the desired slots.

If not enough slots have been provided, the allocator may decide to downscale the job accordingly. Conversely, if more slots are available than necessary, it may also upscale the job.
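A minimal sketch of how an allocator might derive per-vertex parallelism from the number of free slots. It assumes, purely for illustration, that all slots are identical, that free slots are split evenly across slot sharing groups, and that each vertex has an upper bound (its max parallelism); `VertexSpec` and `ParallelismSketch` are hypothetical names. It returns an empty Optional when there are fewer slots than slot sharing groups, since some group would then receive no slot at all.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical vertex description: id, slot sharing group and upper parallelism bound. */
record VertexSpec(String id, String slotSharingGroup, int maxParallelism) {}

final class ParallelismSketch {
    private ParallelismSketch() {}

    /**
     * Splits freeSlots evenly across slot sharing groups and caps each vertex at its max
     * parallelism. Returns an empty Optional if some group would receive no slot.
     */
    static Optional<Map<String, Integer>> determineParallelism(
            List<VertexSpec> vertices, int freeSlots) {
        // Group vertices by slot sharing group, keeping encounter order.
        Map<String, List<VertexSpec>> groups = new LinkedHashMap<>();
        for (VertexSpec v : vertices) {
            groups.computeIfAbsent(v.slotSharingGroup(), g -> new ArrayList<>()).add(v);
        }
        if (groups.isEmpty()) {
            return Optional.of(new LinkedHashMap<>()); // nothing to schedule
        }
        if (freeSlots < groups.size()) {
            return Optional.empty(); // the job cannot run with these slots
        }
        int slotsPerGroup = freeSlots / groups.size();
        Map<String, Integer> parallelism = new LinkedHashMap<>();
        for (List<VertexSpec> group : groups.values()) {
            for (VertexSpec v : group) {
                // Downscales when slots are scarce, upscales up to maxParallelism otherwise.
                parallelism.put(v.id(), Math.min(slotsPerGroup, v.maxParallelism()));
            }
        }
        return Optional.of(parallelism);
    }
}
```

Slots left over by the integer division simply stay unused here; a real allocator could distribute them, and would also consult the ScalingPolicy before accepting a new parallelism.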

The exact scaling behavior will be handled by the ScalingPolicy. A policy may impose restrictions, e.g., that the parallelism must not fall below X or rise above Y, that scaling must be done proportionally for all vertices in the topology, that the parallelism may only be doubled or halved, or that scaling may only be done after a waiting period of N.
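The design leaves the ScalingPolicy interface open. As a purely hypothetical sketch, a policy could be modeled as a function that takes the current parallelism and the parallelism the allocator could support, and returns the parallelism that is actually allowed. The method `restrict` and the classes `BoundedPolicy` and `DoubleOrHalvePolicy` are illustrative names only. The sketch covers the bounds and doubling/halving restrictions; proportional and time-based restrictions would need more context than a single parallelism value.

```java
/**
 * Hypothetical ScalingPolicy shape (not defined by this design): given the current parallelism
 * and the parallelism the SlotAllocator could support, return the parallelism actually allowed.
 */
interface ScalingPolicy {
    int restrict(int current, int proposed);

    /** Applies this policy first, then feeds its result into the next policy. */
    default ScalingPolicy andThen(ScalingPolicy next) {
        return (current, proposed) -> next.restrict(current, restrict(current, proposed));
    }
}

/** Parallelism must not fall below min or rise above max. */
record BoundedPolicy(int min, int max) implements ScalingPolicy {
    BoundedPolicy {
        if (min < 1 || min > max) {
            throw new IllegalArgumentException("need 1 <= min <= max");
        }
    }

    public int restrict(int current, int proposed) {
        return Math.max(min, Math.min(max, proposed));
    }
}

/** Parallelism may only change by repeatedly doubling or halving the current value. */
final class DoubleOrHalvePolicy implements ScalingPolicy {
    public int restrict(int current, int proposed) {
        int result = current;
        // Downscale: halve until the parallelism fits into what is available.
        while (result > proposed && result > 1) {
            result /= 2;
        }
        // Upscale: double only while the doubled value is still supported (overflow-safe).
        while (result <= proposed / 2) {
            result *= 2;
        }
        return result;
    }
}
```

Composing policies with `andThen` keeps each restriction small and testable: for example, `new BoundedPolicy(1, 16).andThen(new DoubleOrHalvePolicy())` first clamps the proposal to 16 and then only moves in powers-of-two steps from the current parallelism.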

assignResources assigns the available resources to the ExecutionVertices, based on the assignment previously returned by determineParallelism, and reserves the slots.
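The slot sharing idea behind assignResources can be illustrated with a simplified sketch. It assumes slots are plain string ids and identifies an execution vertex by a hypothetical `SubtaskId` (job vertex id plus subtask index) instead of Flink's ExecutionVertexID and LogicalSlot. Within each slot sharing group, subtask i of every vertex is placed into the group's i-th slot. A real allocator may choose differently, for instance by taking the utilization information in SlotInfoWithUtilization into account.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for ExecutionVertexID: a job vertex id plus a subtask index. */
record SubtaskId(String vertexId, int subtaskIndex) {}

/** Hypothetical vertex description whose parallelism has already been decided. */
record ScheduledVertex(String id, String slotSharingGroup, int parallelism) {}

final class AssignmentSketch {
    private AssignmentSketch() {}

    /**
     * Assigns subtask i of every vertex in a slot sharing group to that group's i-th slot, so
     * each group occupies as many slots as its most parallel vertex.
     */
    static Map<SubtaskId, String> assignResources(
            List<ScheduledVertex> vertices, List<String> freeSlotIds) {
        // Group vertices by slot sharing group, keeping encounter order.
        Map<String, List<ScheduledVertex>> groups = new LinkedHashMap<>();
        for (ScheduledVertex v : vertices) {
            groups.computeIfAbsent(v.slotSharingGroup(), g -> new ArrayList<>()).add(v);
        }
        Iterator<String> slots = freeSlotIds.iterator();
        Map<SubtaskId, String> assignment = new LinkedHashMap<>();
        for (List<ScheduledVertex> group : groups.values()) {
            int slotsNeeded =
                    group.stream().mapToInt(ScheduledVertex::parallelism).max().orElse(0);
            List<String> groupSlots = new ArrayList<>();
            for (int i = 0; i < slotsNeeded; i++) {
                if (!slots.hasNext()) {
                    throw new IllegalStateException("Not enough free slots");
                }
                groupSlots.add(slots.next()); // reserve this slot for the group
            }
            for (ScheduledVertex v : group) {
                for (int i = 0; i < v.parallelism(); i++) {
                    assignment.put(new SubtaskId(v.id(), i), groupSlots.get(i));
                }
            }
        }
        return assignment;
    }
}
```

With a source (parallelism 2) and a map (parallelism 1) in group "a", a sink (parallelism 1) in group "b", and free slots s1, s2 and s3, both source subtasks take s1 and s2, the map subtask shares s1, and the sink subtask gets s3.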

ExecutionFailureHandler

ScalingPolicy

...