Status

Current state: Under Discussion

Discussion thread:_

JIRA: _

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Instead of asking for every slot individually by calling ResourceManagerGateway.requestSlot(JobMasterId, SlotRequest), the JobMaster should declare the collection of required slots via a new RPC ResourceManagerGateway.declareRequiredResources(JobMasterId jobMasterId, ResourceRequirements resourceRequirements).


Code Block: ResourceRequirements
public class ResourceRequirements implements Serializable {

...


    private final JobID jobId;

...



    private final String targetAddress;

...



    private final Collection<ResourceRequirement> resourceRequirements;

...


}


Code Block: ResourceRequirement
public class ResourceRequirement implements Serializable {

...


    private final ResourceProfile resourceProfile;

...


    private final int numberOfRequiredSlots;

...


}


The jobMasterId is used to filter out messages coming from an old Job leader.
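The filtering by jobMasterId can be illustrated with a minimal sketch. Everything in it is a hypothetical stand-in rather than Flink code: ids are plain Strings, a requirement is reduced to a single slot count, and LeaderFilteringResourceManager, registerJobLeader and requiredSlots are names invented for the example. It also assumes that a declaration replaces the previous one for the same job instead of adding to it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the ResourceManager side; not the actual Flink class.
final class LeaderFilteringResourceManager {
    // Current leader (jobMasterId) per job, as known to the ResourceManager.
    private final Map<String, String> currentLeaderByJob = new HashMap<>();
    // Last accepted declaration per job, reduced to a total slot count.
    private final Map<String, Integer> requiredSlotsByJob = new HashMap<>();

    void registerJobLeader(String jobId, String jobMasterId) {
        currentLeaderByJob.put(jobId, jobMasterId);
    }

    /** Returns true if the declaration was accepted, false if it was filtered out. */
    boolean declareRequiredResources(String jobMasterId, String jobId, int numberOfRequiredSlots) {
        if (!jobMasterId.equals(currentLeaderByJob.get(jobId))) {
            // Message from an old (or unknown) job leader: ignore it.
            return false;
        }
        // Assumption: a new declaration replaces the previous one for this job.
        requiredSlotsByJob.put(jobId, numberOfRequiredSlots);
        return true;
    }

    int requiredSlots(String jobId) {
        return requiredSlotsByJob.getOrDefault(jobId, 0);
    }
}
```

In this sketch, a declaration sent by a former leader after a leader change leaves the stored requirements untouched.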

...

Currently, the SlotManager supports failing unfulfillable slot requests by calling ResourceActions.notifyAllocationFailure. A slot request is unfulfillable if the SlotManager neither has allocated slots nor can allocate a slot from the ResourceManager. This works because individual slot requests are identified by their AllocationID. With declarative resource management, we cannot fail individual slot requests. However, we can let the JobMaster know if we cannot fulfill the resource requirements for a job after resourcemanager.standalone.start-up-time has passed. In order to send this notification, we have to introduce a new RPC JobMasterGateway.notifyNotEnoughResourcesAvailable(Collection<ResourceRequirement> acquiredResources). acquiredResources is the collection of acquired resources for the job.

This signal is sent whenever the SlotManager tried to fulfill the requirements for a job but failed to do so.

...

The SlotManager will no longer time out slot requests. Instead, it will try to acquire enough resources to fulfill the required resources until the resource requirements are changed. Failing the job execution if the job cannot acquire enough resources after a defined period of time is then the responsibility of the JobMaster and the SlotPool.

Interface changes

In order to enable the SlotManager to process resource declarations, we need to extend the interface with an additional method:


Code Block: SlotManager interface extension
interface SlotManager {
    /**
     * Process the given resource requirements. The resource requirements define the
     * required resources for the specified job. The SlotManager will try to fulfill
     * these requirements.
     *
     * @param resourceRequirements the resource requirements for a job
     */
    void processResourceRequirements(ResourceRequirements resourceRequirements);
}


In order to enable the SlotManager to notify the JobMaster about not enough resources, we need to extend the JobMasterGateway with an additional method:

Code Block (java): JobMasterGateway interface extension
interface JobMasterGateway {
    /**
     * Notifies that not enough resources are available to fulfill the resource requirements of a job.
     *
     * @param acquiredResources the resources that have been acquired for the job
     */
    void notifyNotEnoughResourcesAvailable(Collection<ResourceRequirement> acquiredResources);
}
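How the two interface extensions could interact is sketched below, under heavy simplifications that are assumptions of this example and not part of the proposal: there is a single job and a single resource profile, slots are only counted, and the check against resourcemanager.standalone.start-up-time is left out. CountingSlotManager and NotEnoughResourcesListener are hypothetical names; the listener stands in for the JobMasterGateway.

```java
// Hypothetical stand-in for the JobMasterGateway; acquired resources are reduced to a count.
interface NotEnoughResourcesListener {
    void notifyNotEnoughResourcesAvailable(int acquiredSlots);
}

// Hypothetical, simplified SlotManager for one job and one resource profile.
final class CountingSlotManager {
    private int freeSlots;
    private int acquiredSlots;
    private final NotEnoughResourcesListener jobMaster;

    CountingSlotManager(int freeSlots, NotEnoughResourcesListener jobMaster) {
        this.freeSlots = freeSlots;
        this.jobMaster = jobMaster;
    }

    /** Tries to fulfill the requirement and returns the slots acquired for the job so far. */
    int processResourceRequirements(int numberOfRequiredSlots) {
        int missing = Math.max(0, numberOfRequiredSlots - acquiredSlots);
        int newlyAcquired = Math.min(freeSlots, missing);
        freeSlots -= newlyAcquired;
        acquiredSlots += newlyAcquired;
        if (acquiredSlots < numberOfRequiredSlots) {
            // No individual request is failed; the job is only told what it currently has.
            jobMaster.notifyNotEnoughResourcesAvailable(acquiredSlots);
        }
        // Lowered requirements are not handled here: excess slots simply stay acquired.
        return acquiredSlots;
    }
}
```

The point of the sketch is that the notification carries what has been acquired so far, which leaves the decision to fail or to continue with fewer slots to the JobMaster.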


Accepting resources

On the JobMaster side, the SlotPool is responsible for accepting offered slots, and matching these against the requirements of the job. It has to follow the same logic for matching slots as the SlotManager.

...

If the SlotPool is provided with more slots than are currently required, then it will return these slots after the idle slot timeout has passed. This serves as a grace period, potentially allowing us to make use of excess slots later on without another round-trip to the ResourceManager.
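The grace period for excess slots can be sketched as a small tracker. ExcessSlotTracker and its methods are hypothetical names invented for this example, slot ids are plain Strings, and time is passed in explicitly as milliseconds so that the behaviour is easy to follow; none of this is taken from the actual SlotPool.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: tracks slots that are currently not required by the job.
final class ExcessSlotTracker {
    private final long idleSlotTimeoutMillis;
    // Slot id -> time at which the slot became idle.
    private final Map<String, Long> idleSince = new LinkedHashMap<>();

    ExcessSlotTracker(long idleSlotTimeoutMillis) {
        this.idleSlotTimeoutMillis = idleSlotTimeoutMillis;
    }

    void markIdle(String slotId, long nowMillis) {
        idleSince.put(slotId, nowMillis);
    }

    // A slot that is reused within the grace period is no longer a candidate for return.
    void markUsed(String slotId) {
        idleSince.remove(slotId);
    }

    /** Removes and returns the ids of all slots that have been idle for at least the timeout. */
    List<String> releaseExpired(long nowMillis) {
        List<String> released = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = idleSince.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= idleSlotTimeoutMillis) {
                released.add(entry.getKey());
                it.remove();
            }
        }
        return released;
    }
}
```

A slot that is picked up again before the timeout expires never shows up in the released list, which is the saved round-trip described above.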


Note: Depending on the scheduling requirements, it might make sense either to reuse slots that have been freed on the JobMaster, because this reduces latency, or to return them and ask for properly sized slots, because this improves resource utilization (assuming different resource requirements). At the moment, we assume that reusing slots is possible. In the future, we might have to make this behaviour configurable.

Releasing resources

Resources/Slots are released by the JobMaster by calling TaskExecutorGateway.freeSlot() and by updating the required resources by calling ResourceManagerGateway.declareRequiredResources with the updated resource requirements.
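The two steps of a release can be sketched as follows. ReleasingJobMaster and its nested interfaces are hypothetical stand-ins for the JobMaster, the TaskExecutorGateway and the ResourceManagerGateway, with requirements reduced to a slot count. The order of the two calls is an assumption of this sketch; the text above only states that both happen.

```java
// Hypothetical stand-in for the JobMaster side of a slot release.
final class ReleasingJobMaster {
    // Simplified stand-in for TaskExecutorGateway.freeSlot().
    interface TaskExecutor {
        void freeSlot(String allocationId);
    }

    // Simplified stand-in for ResourceManagerGateway.declareRequiredResources.
    interface ResourceManager {
        void declareRequiredResources(int numberOfRequiredSlots);
    }

    private int requiredSlots;
    private final TaskExecutor taskExecutor;
    private final ResourceManager resourceManager;

    ReleasingJobMaster(int requiredSlots, TaskExecutor taskExecutor, ResourceManager resourceManager) {
        this.requiredSlots = requiredSlots;
        this.taskExecutor = taskExecutor;
        this.resourceManager = resourceManager;
    }

    void releaseSlot(String allocationId) {
        if (requiredSlots == 0) {
            throw new IllegalStateException("No required slots left to release");
        }
        // Step 1: hand the slot back to the TaskExecutor.
        taskExecutor.freeSlot(allocationId);
        // Step 2: declare the lowered requirements.
        requiredSlots--;
        resourceManager.declareRequiredResources(requiredSlots);
    }
}
```

Without the second step, the declared requirements would still include the freed slot, so the ResourceManager side would presumably try to fill it again.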

...

The slotmanager.request-timeout option will no longer have an effect.

Follow ups

Removing the AllocationID

Once the old SlotPool implementations are removed, it might be possible to remove the AllocationID and to identify slots via the SlotID.