...
Instead of asking for every slot individually by calling ResourceManagerGateway.requestSlot(JobMasterId, SlotRequest), the JobMaster should declare the collection of required slots via a new RPC, ResourceManagerGateway.declareRequiredResources(JobMasterId jobMasterId, ResourceRequirements resourceRequirements).
```java
public class ResourceRequirements implements Serializable {
    ...
    private final JobID jobId;
    ...
    private final String targetAddress;
    ...
    private final Collection<ResourceRequirement> resourceRequirements;
    ...
}
```
```java
public class ResourceRequirement implements Serializable {
    ...
    private final ResourceProfile resourceProfile;
    ...
    private final int numberOfRequiredSlots;
    ...
}
```
The jobMasterId is used to filter out messages coming from an old job leader.
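To illustrate the shift from per-slot requests to a single declaration, the following is a minimal sketch and not Flink code. It collapses a list with one entry per needed slot into one entry per resource profile with a count, mirroring the shape of ResourceRequirement. The names RequirementSketch, DeclarationSketch, aggregate and isFromCurrentLeader are hypothetical, resource profiles and leader ids are modelled as plain strings, and the fencing check assumes the receiver knows the id of the current job leader.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ResourceRequirement; the profile is a plain string here.
final class RequirementSketch {
    final String resourceProfile;
    final int numberOfRequiredSlots;

    RequirementSketch(String resourceProfile, int numberOfRequiredSlots) {
        this.resourceProfile = resourceProfile;
        this.numberOfRequiredSlots = numberOfRequiredSlots;
    }
}

final class DeclarationSketch {

    // Collapses "one entry per needed slot" into "one entry per profile plus a count".
    static List<RequirementSketch> aggregate(Collection<String> profilePerSlot) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String profile : profilePerSlot) {
            counts.merge(profile, 1, Integer::sum);
        }
        List<RequirementSketch> result = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            result.add(new RequirementSketch(entry.getKey(), entry.getValue()));
        }
        return result;
    }

    // Assumed fencing rule: a declaration is processed only if the sender's id
    // equals the id of the job leader the receiver currently knows about.
    static boolean isFromCurrentLeader(String currentJobMasterId, String senderJobMasterId) {
        return currentJobMasterId.equals(senderJobMasterId);
    }
}
```

With this shape, a job that needs three slots of two different profiles sends one declaration with two entries instead of three separate slot requests.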
...
The SlotManager will no longer time out slot requests. Instead, it will keep trying to acquire enough resources to fulfill the declared requirements until those requirements change. Failing the job execution when the job cannot acquire enough resources within a defined period of time then becomes the responsibility of the JobMaster and the SlotPool.
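One way the JobMaster side could implement such a timeout is sketched below. This is an illustration under assumptions, not the proposed implementation: the class ResourceTimeoutSketch and its method shouldFailJob are hypothetical, slots are reduced to simple counts, and the caller passes the current time explicitly so the logic stays deterministic.

```java
import java.time.Duration;

// Hypothetical helper: tracks how long the job has been short of slots.
final class ResourceTimeoutSketch {
    private final long timeoutMillis;
    // -1 means the requirements are currently fulfilled.
    private long unfulfilledSinceMillis = -1;

    ResourceTimeoutSketch(Duration timeout) {
        this.timeoutMillis = timeout.toMillis();
    }

    // Returns true once the job has been short of slots for at least the timeout.
    boolean shouldFailJob(int requiredSlots, int acquiredSlots, long nowMillis) {
        if (acquiredSlots >= requiredSlots) {
            // Requirements are met, so any running timer is reset.
            unfulfilledSinceMillis = -1;
            return false;
        }
        if (unfulfilledSinceMillis < 0) {
            // First observation of a shortfall starts the timer.
            unfulfilledSinceMillis = nowMillis;
        }
        return nowMillis - unfulfilledSinceMillis >= timeoutMillis;
    }
}
```

The timer restarts whenever the requirements are fully met, so only a continuous shortfall lasting the full period fails the job in this sketch.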
Interface changes
In order to enable the SlotManager to process resource declarations, we need to extend the interface with an additional method:
```java
interface SlotManager {

    /**
     * Process the given resource requirements. The resource requirements define the
     * required resources for the specified job. The SlotManager will try to fulfill
     * these requirements.
     *
     * @param resourceRequirements resourceRequirements defines the resource requirements for a job
     */
    void processResourceRequirements(ResourceRequirements resourceRequirements);
}
```
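As a rough illustration of what processing a declaration could involve, the sketch below stores the most recent requirements per job and reports how many slots are still missing. It is a simplification under assumptions: SlotManagerSketch and missingSlots are hypothetical names, job ids and profiles are plain strings, and it assumes that a new declaration for a job replaces the previous one rather than adding to it.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified bookkeeping; not the real SlotManager.
final class SlotManagerSketch {
    // jobId -> (resource profile -> number of required slots)
    private final Map<String, Map<String, Integer>> requiredByJob = new HashMap<>();

    // Assumption: the latest declaration for a job replaces the earlier one.
    void processResourceRequirements(String jobId, Map<String, Integer> requiredSlotsPerProfile) {
        requiredByJob.put(jobId, new HashMap<>(requiredSlotsPerProfile));
    }

    // Number of slots of the given profile that still have to be acquired for the job.
    int missingSlots(String jobId, String profile, int alreadyAllocated) {
        int required = requiredByJob
                .getOrDefault(jobId, Collections.<String, Integer>emptyMap())
                .getOrDefault(profile, 0);
        return Math.max(0, required - alreadyAllocated);
    }
}
```

Because the requirements are stored rather than consumed, the same declaration can be re-evaluated whenever slots are gained or lost, which is what lets the SlotManager keep trying without a per-request timeout.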
Accepting resources
On the JobMaster side, the SlotPool is responsible for accepting offered slots, and matching these against the requirements of the job. It has to follow the same logic for matching slots as the SlotManager.
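The acceptance step can be sketched as follows. This is only an illustration: SlotPoolSketch and offerSlot are hypothetical names, and the matching rule used here (exact equality of profile names) is an assumption standing in for whatever rule the SlotManager applies, which the SlotPool would have to mirror.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified acceptance logic; not the real SlotPool.
final class SlotPoolSketch {
    // resource profile -> number of slots still needed
    private final Map<String, Integer> outstanding = new HashMap<>();

    SlotPoolSketch(Map<String, Integer> requiredSlotsPerProfile) {
        outstanding.putAll(requiredSlotsPerProfile);
    }

    // Accepts the offer only if a requirement for the same profile is still unfulfilled.
    // Assumption: profiles match when they are equal.
    boolean offerSlot(String offeredProfile) {
        int remaining = outstanding.getOrDefault(offeredProfile, 0);
        if (remaining <= 0) {
            return false;
        }
        outstanding.put(offeredProfile, remaining - 1);
        return true;
    }
}
```

If the two sides used different matching rules, the SlotManager could consider a requirement fulfilled by a slot that the SlotPool then rejects, which is why the rule has to be shared.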
...