...

Since we have a couple of asynchronous operations (the resource timeout in the "Waiting for resources" state, the restart delay in Restarting) that must only take effect if no other state change has happened in the meantime, we need to introduce a state version which can be used to filter out outdated operations, as sketched below.
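
A minimal sketch of how such a version guard could look, assuming all state changes happen on a single scheduler thread; the names (stateVersion, guardAgainstStateChange) are illustrative, not the actual implementation:

Code Block
// Hypothetical sketch of the state versioning idea described above.
private long stateVersion = 0;

/** Every state transition bumps the version, invalidating pending operations. */
private void onStateTransition() {
    stateVersion++;
}

/** Wraps an asynchronous action so it is ignored if the state has changed since. */
private Runnable guardAgainstStateChange(Runnable action) {
    final long expectedVersion = stateVersion;
    return () -> {
        if (stateVersion == expectedVersion) {
            action.run();
        }
    };
}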

Stable set of resources

The "Waiting for resources" state has the purpose to wait for the required resources. Since the cluster might not be able to provide all of the declared resources, the system needs to handle this situation as well. Hence, this state waits until either all required resources have arrived or until the set of available resources has stabilised. A set of resources has stabilised if the system expects that it won't change anymore. There are different ways to achieve this and one solution approach is to set an upper limit for the waiting time. This approach will also be implemented in the first version of the scheduler. Consequently, whenever the scheduler enters the "Waiting for resources" state, it registers a timeout after which it will try to go into the Executing state.

In the future we might take a look at Kafka's consumer protocol and how consumer changes are handled there.

Automatic scaling

To support automatic scaling, whenever new slots arrive while the scheduler is in state Executing, we ask a ScalingPolicy whether the job can be scaled up. If so, the scheduler transitions into the Restarting state, which triggers a global failover and a restart that makes use of the newly available resources.
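
As a rough illustration, a ScalingPolicy could be as simple as the following; the method name and parameters are assumptions made for this sketch:

Code Block
/** Hypothetical minimal ScalingPolicy; the real interface may differ. */
interface ScalingPolicy {
    /**
     * Decides whether restarting the job to reach the new cumulative
     * parallelism is worth the cost of a global failover.
     */
    boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism);
}

A trivial policy could, for instance, only return true if the parallelism increase exceeds some configured threshold, so that a single additional slot does not trigger a full restart.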

...

Code Block
/** Assignment of slots to execution vertices. */
public final class ParallelismAndResourceAssignments {
    /** The slot assigned to each execution vertex. */
    private final Map<ExecutionVertexID, ? extends LogicalSlot> assignedSlots;

    /** The parallelism determined for each job vertex. */
    private final Map<JobVertexID, Integer> parallelismPerJobVertex;
}


calculateDesiredSlots returns a ResourceCounter that describes the ideal amount of resources for the job. The first implementation, which also supports slot sharing, will iterate over all slot sharing groups and sum up the max vertex parallelism in each group. A bare-bones implementation would ignore slot sharing groups altogether and just add up the parallelism of each vertex.
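
The slot-sharing variant could be sketched as follows; JobInformation, SlotSharingGroup, and the accessor names are assumptions made for this example, not the actual implementation:

Code Block
// Illustrative only: sums up the max vertex parallelism per slot sharing group.
private ResourceCounter calculateDesiredSlots(JobInformation jobInformation) {
    int desiredSlots = 0;
    for (SlotSharingGroup group : jobInformation.getSlotSharingGroups()) {
        // All vertices of a group can share slots, so the group needs as many
        // slots as its most parallel vertex.
        int maxParallelism = 0;
        for (JobVertexID vertexId : group.getJobVertexIds()) {
            maxParallelism = Math.max(maxParallelism, jobInformation.getParallelism(vertexId));
        }
        desiredSlots += maxParallelism;
    }
    return ResourceCounter.withResource(ResourceProfile.UNKNOWN, desiredSlots);
}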

determineParallelism accepts a JobInformation and a collection of free slots, and attempts to find the optimal parallelism for every operator given the available set of slots. If no such mapping can be found, an empty Optional is returned to signal the Scheduler that this topology cannot be scheduled at this time. This method may be called by the scheduler irrespective of whether it has received the desired slots. If not enough slots have been provided, the allocator may decide to downscale the job accordingly. On the flip side, if more slots are available than necessary, it may also upscale the job. The decision whether to scale up a job will be handled by the ScalingPolicy. Such policies may impose restrictions, e.g., that the parallelism must not fall below X / rise above Y, scaling must be done proportionally for all vertices in the topology, parallelism may only be doubled/halved, or scaling must only be done after N time.
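
A possible signature for this method, reusing the class from the code block above; the parameter types are assumptions for this sketch:

Code Block
/**
 * Hypothetical signature: returns an empty Optional if no feasible
 * parallelism can be found with the given free slots.
 */
Optional<ParallelismAndResourceAssignments> determineParallelism(
        JobInformation jobInformation, Collection<? extends SlotInfo> freeSlots);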

assignResources assigns the available resources to the ExecutionVertices and reserves the slots.
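
A hedged sketch of what assignResources might do; the slot pool reservation call and the constructor are stand-ins, not an actual API:

Code Block
// Illustrative only: reserve each assigned slot, then package the assignment
// together with the chosen parallelism for the ExecutionGraph.
public ParallelismAndResourceAssignments assignResources(
        Map<ExecutionVertexID, ? extends LogicalSlot> slotsPerVertex,
        Map<JobVertexID, Integer> parallelismPerJobVertex) {
    for (LogicalSlot slot : slotsPerVertex.values()) {
        slotPool.reserveFreeSlot(slot.getAllocationId()); // hypothetical call
    }
    return new ParallelismAndResourceAssignments(slotsPerVertex, parallelismPerJobVertex);
}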

...