...

PlantUML
@startuml

package "Declarative Scheduler" {
  [SlotAllocator]
  [FailureHandler]
  [ScaleUpController]
}

@enduml

SlotAllocator

The SlotAllocator is the component responsible for determining the resource requirements and mapping a JobGraph and its contained vertices to slots.

...

  • calculateDesiredSlots returns a ResourceCounter that describes the ideal amount of resources for the job.
  • determineParallelism accepts a JobInformation and a collection of free slots, and attempts to find the optimal parallelism for every operator given the available set of slots. If no such mapping can be found, an empty Optional is returned to signal to the scheduler that this topology cannot be scheduled at this time. The scheduler may call this method irrespective of whether it has received the desired slots. If not enough slots have been provided, the allocator may decide to downscale the job accordingly. Conversely, if more slots are available than necessary, it may also upscale the job. The decision whether to scale up a job is made by the ScaleUpController.
  • assignResources assigns the available resources to the ExecutionVertices and reserves the slots according to the provided assignments parameter.
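The parallelism decision described above can be illustrated with a minimal sketch. It is not the SlotAllocator interface itself: the class name ParallelismSketch, the use of plain maps in place of JobInformation, and the slot count in place of a slot collection are all hypothetical simplifications. It also assumes that all vertices share one slot sharing group, so each slot can host one subtask of every vertex and the achievable parallelism per vertex is capped by the number of free slots.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified stand-in for the parallelism decision; not Flink's SlotAllocator. */
final class ParallelismSketch {

    /**
     * @param desiredParallelism desired parallelism per vertex id (assumed input shape)
     * @param freeSlots number of currently free slots (assumed: one shared slot sharing group)
     * @return the parallelism per vertex, or empty if the job cannot be scheduled right now
     */
    static Optional<Map<String, Integer>> determineParallelism(
            Map<String, Integer> desiredParallelism, int freeSlots) {
        // Without a single slot, no vertex can run: signal "not schedulable at this time".
        if (freeSlots < 1) {
            return Optional.empty();
        }
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : desiredParallelism.entrySet()) {
            // Downscale to the available slots, but never exceed the desired parallelism.
            result.put(entry.getKey(), Math.min(entry.getValue(), freeSlots));
        }
        return Optional.of(result);
    }
}
```

Under these assumptions, calling the method with fewer slots than desired yields a downscaled mapping, and calling it again once more slots have arrived yields a larger one, which is the situation in which a scale-up decision is needed.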

...

Whenever the scheduler is in the Executing state and receives new slots, it checks whether the job can be run with an increased parallelism. If this is the case, the scheduler passes the current and the potential new cumulative parallelism of all operators to the ScaleUpController and asks whether it should scale up.

Code Block
/**
 * Simple controller for the scale-up behavior of the {@link
 * org.apache.flink.runtime.scheduler.declarative.DeclarativeScheduler}.
 */
public interface ScaleUpController {

    /**
     * This method gets called whenever new resources are available to the scheduler to scale up.
     *
     * @param currentCumulativeParallelism Cumulative parallelism of the currently running job
     *     graph.
     * @param newCumulativeParallelism Potential new cumulative parallelism with the additional
     *     resources.
     * @return true if the policy decided to scale up based on the provided information.
     */
    boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism);
}
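
As an illustration of how such a policy might look, the following sketch implements the interface with a simple threshold: scale up only if the cumulative parallelism would grow by at least a configured amount. The class names MinIncreaseScaleUpController and CumulativeParallelism and the minIncrease parameter are hypothetical and chosen for this example. The interface is repeated (without the public modifier) only to keep the block self-contained.

```java
import java.util.Collection;

/** Repeated from the interface above so that this sketch compiles on its own. */
interface ScaleUpController {
    boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism);
}

/** Hypothetical policy: scale up only if the gain reaches a minimum threshold. */
final class MinIncreaseScaleUpController implements ScaleUpController {

    private final int minIncrease;

    MinIncreaseScaleUpController(int minIncrease) {
        if (minIncrease < 1) {
            throw new IllegalArgumentException("minIncrease must be at least 1");
        }
        this.minIncrease = minIncrease;
    }

    @Override
    public boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism) {
        // Avoid restarts for marginal gains: require a minimum increase in total parallelism.
        return newCumulativeParallelism - currentCumulativeParallelism >= minIncrease;
    }
}

/** Hypothetical helper: the cumulative parallelism is the sum over all operators. */
final class CumulativeParallelism {

    static int of(Collection<Integer> perOperatorParallelism) {
        int sum = 0;
        for (int parallelism : perOperatorParallelism) {
            sum += parallelism;
        }
        return sum;
    }
}
```

A threshold of this kind trades responsiveness for stability: a larger minIncrease means fewer restarts when slots trickle in one at a time, at the cost of leaving some newly available resources unused for longer.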

...