Status

Current state: Under Discussion

Discussion thread:_

JIRA: _

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

To work better under constrained resources (e.g. when Flink cannot obtain all the requested resources), the JobMaster cannot expect all of its slot requests to be fulfilled. Currently, JobMasters ask for every slot individually and fail the job if a slot cannot be obtained. Instead of deciding on the required set of slots before asking the ResourceManager, it would be more flexible if the JobMaster first declared what it needs to run the job. Based on what the ResourceManager actually assigns, the JobMaster can then decide to run the job with an adjusted parallelism. That way the JobMaster is able to react if it gets fewer slots than it declared.

...

Instead of asking for every slot individually by calling ResourceManagerGateway.requestSlot(JobMasterId, SlotRequest), the JobMaster should declare the collection of required slots via a new RPC ResourceManagerGateway.declareRequiredResources(JobMasterId jobMasterId, ResourceRequirements resourceRequirements).


Code Block: ResourceRequirements
public class ResourceRequirements implements Serializable {

...

    private final JobID jobId;

...

    private final String targetAddress;

...

    private final Collection<ResourceRequirement> resourceRequirements;

...

}


Code Block: ResourceRequirement
public class ResourceRequirement implements Serializable {

...

    private final ResourceProfile resourceProfile;

...

    private final int numberOfRequiredSlots;

...

}


The jobMasterId is used to filter out messages coming from an old Job leader.

...

A simple strategy for unfulfilled requirements is to satisfy them in a first-come, first-served fashion. Jobs that register their requirements first take precedence over other jobs, even if their requirements change at runtime. This approach is straightforward and prevents situations where resources are distributed in such a way that no job has enough.
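The first-come, first-served strategy can be sketched as follows. This is only an illustration under simplifying assumptions: the class and method names are hypothetical, jobs are identified by plain strings, and all slots are assumed to share a single resource profile. It is not the SlotManager implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of first-come, first-served slot assignment; not Flink code. */
class FirstComeFirstServedSketch {

    /**
     * Distributes free slots to jobs in registration order.
     *
     * @param requiredSlotsByJob required slot count per job; iteration order is registration order
     * @param freeSlots number of free slots (a single resource profile is assumed)
     * @return number of slots assigned to each job, in registration order
     */
    static Map<String, Integer> assign(LinkedHashMap<String, Integer> requiredSlotsByJob, int freeSlots) {
        Map<String, Integer> assigned = new LinkedHashMap<>();
        int remaining = freeSlots;
        for (Map.Entry<String, Integer> entry : requiredSlotsByJob.entrySet()) {
            // Earlier jobs are served first; later jobs only get what is left.
            int granted = Math.min(entry.getValue(), remaining);
            assigned.put(entry.getKey(), granted);
            remaining -= granted;
        }
        return assigned;
    }
}
```

Because an insertion-ordered LinkedHashMap keeps a key's original position when its value is updated, a job that changes its requirements keeps its place in the order, which mirrors the precedence rule described above.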

For the time being (and in order to keep the protocol simpler), the ResourceManager won’t revoke resources/slots which are assigned to a different job. This means that the ResourceManager will only assign free resources in order to fulfill the resource requirements. In a future version, we might think about letting the ResourceManager balance resources across jobs. This would entail that the ResourceManager may ask the JobMaster to release slots.

...

Currently, the SlotManager supports failing unfulfillable slot requests by calling ResourceActions.notifyAllocationFailure. A slot request is unfulfillable if the SlotManager neither has allocated slots nor can allocate a slot from the ResourceManager. This works because we have individual slot requests which are identified by the AllocationID. With declarative resource management, we cannot fail individual slot requests. However, we can let the JobMaster know if we cannot fulfill the minimum resource requirement for a job after resourcemanager.standalone.start-up-time has passed. In order to send this notification, we have to introduce a new RPC JobMasterGateway.notifyNotEnoughResourcesAvailable(Collection<ResourceRequirement> acquiredResources). acquiredResources is the collection of resources acquired for the job.

This signal is sent whenever the SlotManager tried to fulfill the requirements for a job but failed to do so.

...

The SlotManager will no longer time out slot requests. Instead, it will keep trying to acquire enough resources to fulfill the requirements until the resource requirements are changed. Failing the job execution if the job cannot acquire enough resources after a defined period of time is then the responsibility of the JobMaster and the SlotPool.

Interface changes

In order to enable the SlotManager to process resource declarations, we need to extend the interface with an additional method:


Code Block: SlotManager interface extension
interface SlotManager {
    /**
     * Process the given resource requirements. The resource requirements define the
     * required resources for the specified job. The SlotManager will try to fulfill
     * these requirements.
     *
     * @param resourceRequirements the resource requirements for a job
     */
    void processResourceRequirements(ResourceRequirements resourceRequirements);
}


In order to enable the SlotManager to notify the JobMaster about not enough resources, we need to extend the JobMasterGateway with an additional method:

Code Block: JobMasterGateway interface extension
interface JobMasterGateway {
    /**
     * Notifies that not enough resources are available to fulfill the resource requirements of a job.
     *
     * @param acquiredResources the resources that have been acquired for the job
     */
    void notifyNotEnoughResourcesAvailable(Collection<ResourceRequirement> acquiredResources);
}


Accepting resources

On the JobMaster side, the SlotPool is responsible for accepting offered slots, and matching these against the requirements of the job. It has to follow the same logic for matching slots as the SlotManager.

...

If the SlotPool is provided with more slots than are currently required, it will return these slots after the idle slot timeout has passed. This serves as a grace period, potentially allowing us to make use of excess slots later on without another round-trip to the ResourceManager.
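As a rough illustration of the grace period, the check for which excess slots to hand back could look like the sketch below. All names are hypothetical, slots are identified by plain strings, and the map is assumed to contain only slots that are not currently required. It is not the SlotPool implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of returning excess slots after an idle timeout; not Flink code. */
class IdleSlotReturnSketch {

    /**
     * Selects the excess slots whose idle time has reached the timeout.
     *
     * @param idleSinceMillisBySlot timestamp at which each excess slot became idle
     * @param nowMillis current time in milliseconds
     * @param idleSlotTimeoutMillis idle slot timeout in milliseconds
     * @return ids of the slots that should be returned
     */
    static List<String> selectSlotsToReturn(
            Map<String, Long> idleSinceMillisBySlot, long nowMillis, long idleSlotTimeoutMillis) {
        List<String> slotsToReturn = new ArrayList<>();
        for (Map.Entry<String, Long> entry : idleSinceMillisBySlot.entrySet()) {
            long idleForMillis = nowMillis - entry.getValue();
            // Slots still within the grace period are kept for possible reuse.
            if (idleForMillis >= idleSlotTimeoutMillis) {
                slotsToReturn.add(entry.getKey());
            }
        }
        return slotsToReturn;
    }
}
```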


Note: Depending on the scheduling requirements it might make sense to reuse slots which have been freed on the JobMaster because it reduces latency or to return them and to ask for properly sized slots because it improves resource utilization (assuming different resource requirements). At the moment, we assume that reusing slots is possible. In the future we might have to make this behaviour configurable.

Releasing resources

Resources/slots are released by the JobMaster by calling TaskExecutorGateway.freeSlot() and by updating the required resources by calling ResourceManagerGateway.declareRequiredResources with the updated resource requirements.

...

In the first version, the SlotPool will aggregate individual slot requests that are issued by the Scheduler into a ResourceRequirements and announce them to the ResourceManager. Once a matching slot is returned, the corresponding request future can be completed.
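The aggregation step amounts to counting individual requests per resource profile. The sketch below illustrates this under simplifying assumptions: resource profiles are represented by plain strings, and the class and method names are hypothetical. The resulting counts correspond to the (resourceProfile, numberOfRequiredSlots) pairs of ResourceRequirement.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of aggregating slot requests per resource profile; not Flink code. */
class RequirementAggregationSketch {

    /**
     * Counts how many slots are requested for each resource profile.
     *
     * @param requestedProfiles one entry per individual slot request, naming its profile
     * @return required slot count per profile, sorted by profile name
     */
    static Map<String, Integer> aggregate(List<String> requestedProfiles) {
        Map<String, Integer> requiredSlotsByProfile = new TreeMap<>();
        for (String profile : requestedProfiles) {
            // Each individual request adds one required slot for its profile.
            requiredSlotsByProfile.merge(profile, 1, Integer::sum);
        }
        return requiredSlotsByProfile;
    }
}
```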

The new SlotManager will internally compute slot requests based on the difference between the declared resource requirements and the resources already acquired, and then follow code paths similar to those of the current version.
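One way to picture this computation is shown below. The sketch assumes that the difference is taken between the declared requirements and the slots already acquired for the job, which is an interpretation rather than something the proposal spells out. Resource profiles are plain strings and all names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of deriving missing slots from declared requirements; not Flink code. */
class MissingSlotsSketch {

    /**
     * Computes how many additional slots are needed per resource profile.
     *
     * @param declared required slot count per profile, as declared by the job
     * @param acquired slot count per profile already acquired for the job (assumed input)
     * @return number of slots still missing per profile; fulfilled profiles are omitted
     */
    static Map<String, Integer> missingSlots(Map<String, Integer> declared, Map<String, Integer> acquired) {
        Map<String, Integer> missing = new TreeMap<>();
        for (Map.Entry<String, Integer> entry : declared.entrySet()) {
            int alreadyAcquired = acquired.getOrDefault(entry.getKey(), 0);
            int difference = entry.getValue() - alreadyAcquired;
            // Only profiles with a positive difference need internal slot requests.
            if (difference > 0) {
                missing.put(entry.getKey(), difference);
            }
        }
        return missing;
    }
}
```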

Lazy ExecutionGraph construction

...

The slotmanager.request-timeout option will no longer have an effect.

Follow ups

Removing the AllocationID

Once the old SlotPool implementations are removed, it might be possible to remove the AllocationID and to identify slots via the SlotID.