...

The "Waiting for resources" state waits for the required resources to arrive. Since the cluster might not be able to provide all of the declared resources, the system needs to handle this situation as well. Hence, this state waits until either all required resources have arrived or the set of available resources has stabilised. A set of resources has stabilised if the system expects that it will no longer change. There are different ways to detect this; one approach is to set an upper limit on the waiting time. The first version of the scheduler will implement this approach. Consequently, whenever the scheduler enters the "Waiting for resources" state, it registers a timeout after which it will try to transition into the "Executing" state. If the job cannot be executed with the available resources, the scheduler will fail it.
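
The timeout-based behaviour described above can be sketched roughly as follows. This is a minimal illustration, not the scheduler's actual code: the class WaitingForResourcesSketch, its Outcome enum, the minimumSlots parameter and the timer callback are all hypothetical names introduced here, and "can be executed with the available resources" is simplified to a plain slot-count comparison.

```java
import java.time.Duration;
import java.util.function.BiConsumer;

/** Hypothetical, simplified model of the "Waiting for resources" state. */
final class WaitingForResourcesSketch {

    enum Outcome { WAITING, EXECUTING, FAILED }

    private final int desiredSlots; // number of slots the job declared
    private final int minimumSlots; // assumption: fewest slots the job can run with
    private int availableSlots;
    private Outcome outcome = Outcome.WAITING;

    /**
     * @param timer hypothetical hook that runs the given action after the given delay
     */
    WaitingForResourcesSketch(
            int desiredSlots,
            int minimumSlots,
            Duration timeout,
            BiConsumer<Duration, Runnable> timer) {
        this.desiredSlots = desiredSlots;
        this.minimumSlots = minimumSlots;
        // Entering the state registers the timeout.
        timer.accept(timeout, this::onTimeout);
    }

    /** Called whenever additional slots become available. */
    void onSlotsAvailable(int newSlots) {
        if (outcome != Outcome.WAITING) {
            return; // the state has already been left
        }
        availableSlots += newSlots;
        if (availableSlots >= desiredSlots) {
            // All required resources have arrived: no need to wait for the timeout.
            outcome = Outcome.EXECUTING;
        }
    }

    /** Called when the registered timeout fires. */
    void onTimeout() {
        if (outcome != Outcome.WAITING) {
            return; // the state has already been left
        }
        // Treat the resources as stabilised: run with what is there, or fail the job.
        outcome = availableSlots >= minimumSlots ? Outcome.EXECUTING : Outcome.FAILED;
    }

    Outcome outcome() {
        return outcome;
    }
}
```

Passing the timer in as a callback keeps the sketch deterministic: a caller can capture the Runnable and trigger it by hand instead of waiting for real time to elapse.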

In the future we might take a look at Kafka's consumer protocol and how consumer changes are handled there.

...

Code Block
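/** Assignment of slots to execution vertices, together with the parallelism chosen for each job vertex. */
public final class ParallelismAndResourceAssignments {
    /** The slot assigned to each execution vertex. */
    private final Map<ExecutionVertexID, ? extends LogicalSlot> assignedSlots;

    /** The parallelism determined for each job vertex. */
    private final Map<JobVertexID, Integer> parallelismPerJobVertex;
}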


  • calculateDesiredSlots returns a ResourceCounter that describes the ideal amount of resources for the job.

...

  • determineParallelism accepts a JobInformation and a collection of free slots, and attempts to find the optimal parallelism for every operator given the available set of slots. If no such mapping can be found, an empty Optional is returned to signal to the scheduler that this topology cannot be scheduled at this time. The scheduler may call this method irrespective of whether it has received the desired slots. If not enough slots have been provided, the allocator may decide to downscale the job accordingly. Conversely, if more slots are available than necessary, it may also upscale the job. The decision whether to scale up a job or not will be handled by ScalingPolicy.
  • assignResources assigns the available resources to the ExecutionVertices and reserves the slots according to the provided assignments parameter.
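
To make the downscaling behaviour of determineParallelism more concrete, here is a minimal sketch under strong simplifying assumptions. It is not the real SlotAllocator: the class SlotSharingParallelismSketch is hypothetical, job vertices are identified by plain strings, free slots are reduced to a count, and all vertices are assumed to share a single slot sharing group so that one slot can host one subtask of every vertex. Upscaling beyond the desired parallelism, which the text leaves to ScalingPolicy, is not modelled.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified stand-in for the determineParallelism step. */
final class SlotSharingParallelismSketch {

    private SlotSharingParallelismSketch() {}

    /**
     * @param desiredParallelism desired parallelism per job vertex (vertex name -> parallelism)
     * @param freeSlots number of currently free slots
     * @return the parallelism per vertex, or an empty Optional if the job cannot be scheduled
     */
    static Optional<Map<String, Integer>> determineParallelism(
            Map<String, Integer> desiredParallelism, int freeSlots) {
        if (freeSlots < 1) {
            // Without a single slot no subtask can run: signal "cannot be scheduled now".
            return Optional.empty();
        }
        Map<String, Integer> parallelismPerVertex = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> vertex : desiredParallelism.entrySet()) {
            // With one shared slot sharing group, each vertex needs one slot per subtask,
            // so its parallelism is capped by the number of free slots (downscaling).
            parallelismPerVertex.put(vertex.getKey(), Math.min(vertex.getValue(), freeSlots));
        }
        return Optional.of(parallelismPerVertex);
    }
}
```

For example, with a desired parallelism of 4 for a "source" vertex and 2 for a "sink" vertex, three free slots yield a parallelism of 3 for the source and 2 for the sink, while zero free slots yield an empty Optional.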

The first implementation of the SlotAllocator interface will support slot sharing without respecting previous allocations and input preferences.

FailureHandler

In order to handle failures, the declarative scheduler will support the same RestartBackoffTimeStrategy as used by the pipelined region scheduler. Hence, all currently available RestartBackoffTimeStrategy implementations will be supported. The failure handling procedure is as follows:

...