...

Note that this works because we assume all requested slots to be of the same size, which is true at the moment because all ResourceSpecs are UNKNOWN. If we later want to enable this feature for jobs with different slot sizes, extra effort will be needed.

Bulk Slot Allocation

The SlotPool needs to know all the slot allocations of a pipelined region since they must be fulfilled at the same time. Otherwise it cannot tell whether the cluster is able to offer enough slots for a pipelined region to run. 

We would like to introduce the concept of bulk slot allocation to tell SlotPool about slot allocations that must be fulfilled at the same time. It enables SlotPool to check whether a bulk of slot allocations is fulfillable. A slot allocation bulk is fulfillable if its resource requirements can be satisfied using all reusable slots in the SlotPool. Slots occupied by bounded tasks are considered ‘reusable’ because they will eventually be released once the task completes. Unbounded tasks will occupy their slots indefinitely.
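The fulfillability check can be sketched as follows. This is a minimal illustration, not the SlotPool implementation: `SlotInfo` and `BulkFulfillabilityCheck` are hypothetical names, free slots are assumed to count as reusable, and all slots are assumed to be of the same size (as noted above), so counting is sufficient.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical view of a slot in the SlotPool; not Flink's actual slot class. */
final class SlotInfo {
    final boolean occupied;
    final boolean willBeOccupiedIndefinitely;

    SlotInfo(boolean occupied, boolean willBeOccupiedIndefinitely) {
        this.occupied = occupied;
        this.willBeOccupiedIndefinitely = willBeOccupiedIndefinitely;
    }

    /** Assumption: a slot is reusable if it is free or held by a bounded task. */
    boolean isReusable() {
        return !occupied || !willBeOccupiedIndefinitely;
    }
}

final class BulkFulfillabilityCheck {
    /** Assumes equally sized slots, so comparing counts is enough. */
    static boolean isFulfillable(int requestedSlots, List<SlotInfo> slotsInPool) {
        long reusable = slotsInPool.stream().filter(SlotInfo::isReusable).count();
        return reusable >= requestedSlots;
    }

    public static void main(String[] args) {
        List<SlotInfo> pool = Arrays.asList(
            new SlotInfo(false, false),  // free slot
            new SlotInfo(true, false),   // held by a bounded task: reusable later
            new SlotInfo(true, true));   // held by an unbounded task: never reusable
        System.out.println(isFulfillable(2, pool)); // true
        System.out.println(isFulfillable(3, pool)); // false
    }
}
```

With differently sized slots, the count comparison would have to be replaced by a matching of resource profiles against requests.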

The SlotProvider interface should be extended with a bulk slot allocation method that accepts a bulk of slot requests as one of its parameters. See the Extended SlotProvider Interface section below for more details.

...

```java
CompletableFuture<Collection<LogicalSlotRequestResult>> allocateSlots(
  Collection<LogicalSlotRequest> slotRequests,
  Time allocationTimeout,
  Time unfulfillableResourceTimeout);

class LogicalSlotRequest {
  SlotRequestId slotRequestId;
  ScheduledUnit scheduledUnit;
  SlotProfile slotProfile;
  boolean slotWillBeOccupiedIndefinitely;
}

class LogicalSlotRequestResult {
  SlotRequestId slotRequestId;
  LogicalSlot slot;
}
```

Its implementation (SchedulerImpl) should complete the result future only once all requests are fulfilled, or fail it if any failure happens, such as an allocation timeout or slot loss.
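The all-or-nothing completion semantics can be sketched with plain `CompletableFuture`s. This is only an illustration under assumptions: `BulkFutureSketch` and `allOrFail` are hypothetical names, and the per-request futures stand in for whatever SchedulerImpl tracks internally. Note that `CompletableFuture.allOf` alone waits for every input to finish, so the sketch adds an explicit fail-fast hook.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class BulkFutureSketch {
    /**
     * Returns a future that completes with all results once every request
     * is fulfilled, and fails as soon as any single request fails.
     */
    static <T> CompletableFuture<Collection<T>> allOrFail(List<CompletableFuture<T>> slotFutures) {
        CompletableFuture<Collection<T>> bulk = new CompletableFuture<>();
        // Fail fast: propagate the first failure without waiting for the rest.
        for (CompletableFuture<T> f : slotFutures) {
            f.whenComplete((slot, failure) -> {
                if (failure != null) {
                    bulk.completeExceptionally(failure);
                }
            });
        }
        // Success path: runs only if every request completed normally.
        CompletableFuture.allOf(slotFutures.toArray(new CompletableFuture<?>[0]))
            .thenRun(() -> {
                List<T> results = new ArrayList<>();
                for (CompletableFuture<T> f : slotFutures) {
                    results.add(f.join());
                }
                bulk.complete(results);
            });
        return bulk;
    }

    public static void main(String[] args) {
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        CompletableFuture<Collection<String>> bulk = allOrFail(Arrays.asList(first, second));
        first.complete("slot-1");
        System.out.println(bulk.isDone()); // false: one request is still pending
        second.complete("slot-2");
        System.out.println(bulk.join());   // [slot-1, slot-2]
    }
}
```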

The allocationTimeout controls how long we wait at most for all slot requests to complete. The additional parameter unfulfillableResourceTimeout controls how long we wait until we deem a slot allocation bulk to be unfulfillable. Using a background task, we continuously monitor the state of the SlotPool; if its state changes (e.g., TaskManager loss, new TaskManagers started), we will reset the unfulfillableResourceTimeout. If the unfulfillableResourceTimeout expires, the result future will be failed.

For streaming jobs, the allocationTimeout and the unfulfillableResourceTimeout are set to the same value, e.g., 5 minutes. For batch jobs, the allocationTimeout will be set to infinity, while unfulfillableResourceTimeout should be set to a reasonably low value, e.g., 5 minutes. The finite timeout values can be configured via slot.request.timeout.

When SchedulerImpl receives slot requests that are not fulfillable at the time of the request, it waits for the timeout to expire before failing the requests. The timeout will be canceled once the corresponding requests become fulfillable. However, when slots are lost and the pending requests become unfulfillable again, the timeout will be restarted.
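The start, cancel, and restart behavior of the timeout can be sketched as a small state tracker. This is a hedged illustration only: `UnfulfillableTimeoutTracker` is a hypothetical name, the clock is passed in explicitly to keep the sketch deterministic, and the real implementation would presumably rely on a scheduled background task rather than polling.

```java
import java.util.OptionalLong;

/** Hypothetical helper; tracks when an unfulfillable bulk should be failed. */
final class UnfulfillableTimeoutTracker {
    private final long timeoutMillis;
    private OptionalLong deadline = OptionalLong.empty();

    UnfulfillableTimeoutTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Call after each fulfillability check, e.g. when the SlotPool state changes. */
    void onFulfillabilityChecked(boolean fulfillable, long nowMillis) {
        if (fulfillable) {
            // Requests became fulfillable: cancel the running timeout.
            deadline = OptionalLong.empty();
        } else if (!deadline.isPresent()) {
            // Requests are (again) unfulfillable: start a fresh timeout.
            deadline = OptionalLong.of(nowMillis + timeoutMillis);
        }
    }

    /** True once the bulk has stayed unfulfillable for the full timeout. */
    boolean isExpired(long nowMillis) {
        return deadline.isPresent() && nowMillis >= deadline.getAsLong();
    }

    public static void main(String[] args) {
        UnfulfillableTimeoutTracker tracker = new UnfulfillableTimeoutTracker(300_000L);
        tracker.onFulfillabilityChecked(false, 0L);       // timeout starts
        tracker.onFulfillabilityChecked(true, 100_000L);  // timeout canceled
        tracker.onFulfillabilityChecked(false, 200_000L); // slots lost: timeout restarts
        System.out.println(tracker.isExpired(300_000L));  // false
        System.out.println(tracker.isExpired(500_000L));  // true
    }
}
```

Passing the current time as an argument is a design choice made for this sketch so that the behavior can be exercised without waiting for real time to elapse.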

The field slotWillBeOccupiedIndefinitely in LogicalSlotRequest denotes whether a task is bounded or not. It will be used to tag the allocated slots and helps with the slot requests fulfillability check. For the first implementation we will simply assume all tasks in batch jobs to be bounded and all tasks in streaming jobs to be unbounded.

Make Pipelined Region a Common Component

...