...

For the Blink planner, (imperfect) heuristics are implemented to avoid slot request timeouts as long as the job has a chance to finish (see SlotPool#requestNewAllocatedBatchSlot()). The heuristics assume that if at least one slot is present, the job will be able to finish. However, if the job requires more than one slot at the same time, the heuristics fail.

...

The Pipelined Region Scheduler can also be considered a proposal to unify scheduling of batch and streaming jobs. The scheduler makes a distinction between these two types of jobs by having two implementations of the SchedulingStrategy interface. An obvious downside of having multiple scheduling strategies is added code complexity. Furthermore, the scheduling strategies cannot be arbitrarily interchanged with each other. For example, the batch scheduling strategy cannot be used with streaming jobs.

...

The existing config option table.exec.shuffle-mode will be extended with new values, and the old values will still be respected. More details can be found in section Proposed Changes → Global Data Exchange Mode.

...

In FLINK-10429 we have reworked the scheduler. SchedulingStrategy is now responsible for determining when to schedule and which tasks to schedule. We can leverage this feature by implementing a PipelinedRegionSchedulingStrategy which respects rules #1 and #2 of pipelined region scheduling. 

The PipelinedRegionSchedulingStrategy submits one pipelined region at a time to the DefaultScheduler. The DefaultScheduler treats the tasks in one submission as a bulk: it allocates slots for them and deploys them only once all of them have slots assigned.

The PipelinedRegionSchedulingStrategy must be aware of the inputs of each pipelined region. It should schedule a region if and only if all the inputs of that region become consumable.

...

There are already corresponding callbacks in SchedulingStrategy that PipelinedRegionSchedulingStrategy can implement as below:

  1. startScheduling(): schedule all source regions one by one.
  2. onPartitionConsumable(partition): check all consumer regions of the notified partition; if all inputs of a region have become consumable, schedule that region.
  3. restartTasks(tasksToRestart): find all regions that contain the tasks to restart, and reschedule those whose inputs are all consumable.
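
The three callbacks above can be sketched in a few lines. This is only an illustrative model, not the proposed implementation: the Region class, the string ids for tasks and partitions, and the scheduled list are all hypothetical stand-ins for Flink's real topology and scheduler types, and the sketch ignores that a restart may make previously produced partitions unconsumable again.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model; these are not Flink's real classes.
class RegionSchedulingSketch {

    /** A pipelined region: its tasks and the result partitions it consumes. */
    static final class Region {
        final String name;
        final Set<String> tasks;
        final Set<String> inputPartitions;

        Region(String name, Set<String> tasks, Set<String> inputPartitions) {
            this.name = name;
            this.tasks = tasks;
            this.inputPartitions = inputPartitions;
        }
    }

    private final List<Region> regions;
    private final Set<String> consumablePartitions = new HashSet<>();
    private final Set<String> activeRegions = new HashSet<>();

    /** Records every region submission, in order (stand-in for the scheduler). */
    final List<String> scheduled = new ArrayList<>();

    RegionSchedulingSketch(List<Region> regions) {
        this.regions = regions;
    }

    /** Callback 1: schedule all source regions (no inputs) one by one. */
    void startScheduling() {
        for (Region region : regions) {
            if (region.inputPartitions.isEmpty()) {
                schedule(region);
            }
        }
    }

    /** Callback 2: schedule consumer regions whose inputs are now all consumable. */
    void onPartitionConsumable(String partition) {
        consumablePartitions.add(partition);
        for (Region region : regions) {
            if (region.inputPartitions.contains(partition)
                    && consumablePartitions.containsAll(region.inputPartitions)) {
                schedule(region);
            }
        }
    }

    /** Callback 3: reschedule affected regions whose inputs are all consumable. */
    void restartTasks(Set<String> tasksToRestart) {
        for (Region region : regions) {
            if (!Collections.disjoint(region.tasks, tasksToRestart)) {
                activeRegions.remove(region.name);
                if (consumablePartitions.containsAll(region.inputPartitions)) {
                    schedule(region);
                }
            }
        }
    }

    /** Submits a region as a whole; a region that is already active is skipped. */
    private void schedule(Region region) {
        if (activeRegions.add(region.name)) {
            scheduled.add(region.name);
        }
    }
}
```

In this model a region with two inputs is submitted only after the second onPartitionConsumable notification arrives, which mirrors the "if and only if all inputs are consumable" rule.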

...

To achieve this goal, we propose adding a FIFO physical slot assignment mechanism to SlotPool. With it enabled, the SlotPool tries to fulfill the oldest pending slot request whenever it receives an available slot, regardless of whether the slot was returned by a terminated task or newly offered by a task manager. (Currently, a newly offered slot tends to fulfill the slot request that requested it from the ResourceManager.) FIFO assignment naturally ensures that slot requests of an earlier scheduled region are fulfilled before requests of a later scheduled region.

Note that the fields pendingRequests and waitingForResourceManager store the pending requests in LinkedHashMaps. Therefore, tryFulfillSlotRequestOrMakeAvailable(...) already fulfills the pending requests in insertion order, so we only need to change the slot assignment logic on slot offers.

When a new slot is offered via SlotPoolImpl#offerSlot(...), we should use it to fulfill the oldest fulfillable slot request via tryFulfillSlotRequestOrMakeAvailable(...). If a pending request (say R1) is registered under the AllocationID of the offered slot and differs from the request being fulfilled (say R2), we should update pendingRequests so that R1 takes over the AllocationID of R2. This ensures that failAllocation(...) can still fail slot allocation requests to trigger restarting tasks and re-allocating slots.
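
The AllocationID swap is easier to follow with a small model. The sketch below is a hypothetical simplification, not SlotPoolImpl: requests and allocations are plain strings, resource profiles are ignored (so the oldest request is always treated as fulfillable), and the class and method names other than offerSlot are made up for illustration.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of FIFO assignment on slot offers; not Flink's SlotPoolImpl.
class FifoSlotPoolSketch {

    // requestId -> allocationId the request is waiting on.
    // LinkedHashMap keeps insertion order, so the first entry is the oldest request.
    private final Map<String, String> pendingRequests = new LinkedHashMap<>();

    void addPendingRequest(String requestId, String allocationId) {
        pendingRequests.put(requestId, allocationId);
    }

    /** Returns the id of the fulfilled request, or null if nothing is pending. */
    String offerSlot(String offeredAllocationId) {
        Iterator<Map.Entry<String, String>> it = pendingRequests.entrySet().iterator();
        if (!it.hasNext()) {
            return null; // a real pool would make the slot available instead
        }
        // R2: the oldest pending request is fulfilled by the offered slot.
        Map.Entry<String, String> oldest = it.next();
        String fulfilledRequestId = oldest.getKey();
        String allocationIdOfR2 = oldest.getValue();
        it.remove();

        // R1: the request that originally asked for the offered slot (if it is
        // not R2 itself) now waits on the allocation R2 had been waiting on.
        for (Map.Entry<String, String> entry : pendingRequests.entrySet()) {
            if (entry.getValue().equals(offeredAllocationId)) {
                entry.setValue(allocationIdOfR2);
                break;
            }
        }
        return fulfilledRequestId;
    }

    String allocationIdOf(String requestId) {
        return pendingRequests.get(requestId);
    }
}
```

After the swap, every remaining pending request still maps to an allocation that is actually outstanding, which is what allows a failed allocation to be traced back to a pending request.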

...

We would like to introduce the concept of bulk slot allocation to tell SlotPool about slot allocations that must be fulfilled at the same time. It enables SlotPool to check whether a bulk of slot allocations is fulfillable. A slot allocation bulk is fulfillable if its resource requirements can be satisfied using all resources in the SlotPool. Note that we do not require all resources in the SlotPool to be available immediately, i.e., some of the resources can be already occupied by other tasks but are still considered because they may be freed in the future.
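
As a rough illustration of the fulfillability check, the sketch below assumes that each slot and each request is described by a single integer size and that one slot serves exactly one request. Both are simplifying assumptions made here; real resource profiles are multi-dimensional and the class name is hypothetical. Under these assumptions, matching the largest requests to the largest slots is sufficient.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical one-dimensional model of the bulk fulfillability check.
class BulkFulfillabilitySketch {

    /**
     * @param requestedSizes sizes required by the requests of one bulk
     * @param allSlotSizes   sizes of ALL slots in the pool, free or occupied
     */
    static boolean isFulfillable(List<Integer> requestedSizes, List<Integer> allSlotSizes) {
        if (requestedSizes.size() > allSlotSizes.size()) {
            return false; // not enough slots, even if all of them were freed
        }
        List<Integer> requests = new ArrayList<>(requestedSizes);
        List<Integer> slots = new ArrayList<>(allSlotSizes);
        requests.sort(Collections.reverseOrder());
        slots.sort(Collections.reverseOrder());
        // Pair the i-th largest request with the i-th largest slot.
        for (int i = 0; i < requests.size(); i++) {
            if (slots.get(i) < requests.get(i)) {
                return false;
            }
        }
        return true;
    }
}
```

Occupied slots are deliberately included in allSlotSizes, since the check asks whether the bulk could ever be satisfied, not whether it can be satisfied right now.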

The SlotProvider interface should be extended with a bulk slot allocation method which accepts a bulk of slot requests as one of the parameters. See the Extended SlotProvider Interface section below for more details.

The SlotProviderStrategy and DefaultExecutionSlotAllocator should be updated accordingly to invoke the bulk slot allocation methods.

...

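```java
// Proposed bulk allocation method on SlotProvider: the returned future
// completes only when all requests are fulfilled, or fails on any failure.
CompletableFuture<Collection<LogicalSlotRequestResult>> allocateSlots(
  Collection<LogicalSlotRequest> slotRequests,
  Time allocationTimeout,
  Time unfulfillableResourceTimeout);

// One logical slot request within a bulk.
class LogicalSlotRequest {
  SlotRequestId slotRequestId;
  ScheduledUnit scheduledUnit;
  SlotProfile slotProfile;
}

// Pairs a request id with the logical slot assigned to it.
class LogicalSlotRequestResult {
  SlotRequestId slotRequestId;
  LogicalSlot slot;
}
```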

Its implementation (SchedulerImpl) should complete the result future only if all requests are fulfilled, or if any failure happens, like allocation timeout, slot loss, etc. The allocationTimeout controls how long we wait at most for all slot requests to complete. 

Note that we introduced unfulfillableResourceTimeout as an additional parameter which controls how long we wait until we deem a slot allocation bulk to be unfulfillable. Using a background task, we continuously monitor the state of the SlotPool; if its state changes (e.g., TaskManager loss, new TaskManagers started), we will reset the unfulfillableResourceTimeout. If the unfulfillableResourceTimeout expires, the result future will be failed.

For streaming jobs, the allocationTimeout and the unfulfillableResourceTimeout are set to the same value, e.g., 5 minutes. For batch jobs, the allocationTimeout will be set to infinity, while unfulfillableResourceTimeout should be set to a reasonably low value, e.g., 5 minutes. The finite timeout values can be configured via slot.request.timeout.
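
The interplay of the two timeouts might look roughly like the sketch below. It is one possible reading rather than the proposed implementation: the class, the JobType enum, and the explicit millisecond clock are hypothetical, Long.MAX_VALUE stands in for "infinity", and the sketch assumes that the unfulfillable timeout only matters while the bulk is currently unfulfillable.

```java
// Hypothetical sketch of how the two bulk timeouts could be chosen and tracked.
class BulkTimeoutsSketch {

    enum JobType { STREAMING, BATCH }

    final long allocationTimeoutMs;            // Long.MAX_VALUE models "infinity"
    final long unfulfillableResourceTimeoutMs;
    private long unfulfillableClockStartMs;

    private BulkTimeoutsSketch(long allocationTimeoutMs, long unfulfillableMs, long nowMs) {
        this.allocationTimeoutMs = allocationTimeoutMs;
        this.unfulfillableResourceTimeoutMs = unfulfillableMs;
        this.unfulfillableClockStartMs = nowMs;
    }

    /** slotRequestTimeoutMs plays the role of the slot.request.timeout setting. */
    static BulkTimeoutsSketch forJob(JobType type, long slotRequestTimeoutMs, long nowMs) {
        long allocationTimeout =
                type == JobType.BATCH ? Long.MAX_VALUE : slotRequestTimeoutMs;
        return new BulkTimeoutsSketch(allocationTimeout, slotRequestTimeoutMs, nowMs);
    }

    /** Called when the pool changes, e.g. TaskManager loss or a new TaskManager. */
    void onSlotPoolStateChanged(long nowMs) {
        unfulfillableClockStartMs = nowMs; // reset the unfulfillable timeout
    }

    /** Assumption: the timeout only fires while the bulk cannot be fulfilled. */
    boolean unfulfillableTimeoutExpired(long nowMs, boolean bulkFulfillable) {
        return !bulkFulfillable
                && nowMs - unfulfillableClockStartMs >= unfulfillableResourceTimeoutMs;
    }
}
```

With this split, a batch bulk that is fulfillable in principle can wait indefinitely for occupied slots to be freed, while a bulk that can never be satisfied still fails after the shorter timeout.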

...

Pipelined regions are only used by PipelinedRegionFailoverStrategy and RegionPartitionReleaseStrategy at the moment. The regions are built and stored separately if these strategies are used.

With this FLIP, PipelinedRegionSchedulingStrategy will also need pipelined regions to make decisions. To avoid duplicating the costs of building and storing pipelined regions, we propose to make PipelinedRegion a common component.

The Topology interface should be extended with the ability to get regions, and its implementations should be responsible for building and storing pipelined regions.

...

To better utilize pipelined region scheduling, we propose to introduce a GlobalDataExchangeMode. It is a job-wide mode which helps to automatically set data exchange types for job edges. It therefore controls how jobs are divided into pipelined regions. There are 4 modes to fit different scenarios:

  • ALL_EDGES_BLOCKING: The most conservative setting. Should only be used with special consideration.
  • FORWARD_EDGES_PIPELINED: With this mode, each pipelined region needs one and only one slot to run. Can be used in resource-limited scenarios or if it should be guaranteed that the job can successfully run with only 1 slot.
  • POINTWISE_EDGES_PIPELINED: Pointwise distribution pattern includes FORWARD and RESCALE. With this mode, RESCALE edges can be pipelined, at the cost of larger regions that may need more slots at the same time. However, in most cases, the number of required slots is much smaller than the parallelism. 
  • ALL_EDGES_PIPELINED: This requires at least as many slots as the parallelism. It saves time on scheduling tasks and can be used for interactive queries (see FLINK-16543).


StreamGraph will be extended with a new field to host the GlobalDataExchangeMode. In the JobGraph generation stage, this mode will be used to determine the data exchange type of each job edge.
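
How a mode could translate into a per-edge data exchange type during JobGraph generation is sketched below. The mode names come from the list above; everything else is a hypothetical simplification: the DistributionPattern and ExchangeType enums, the ALL_TO_ALL category for non-pointwise edges, and the exchangeTypeFor method are assumptions made for illustration.

```java
// Hypothetical sketch of mapping a job-wide mode to per-edge exchange types.
class DataExchangeModeSketch {

    /** Simplified edge categories; ALL_TO_ALL stands for any non-pointwise edge. */
    enum DistributionPattern { FORWARD, RESCALE, ALL_TO_ALL }

    enum ExchangeType { PIPELINED, BLOCKING }

    enum GlobalDataExchangeMode {
        ALL_EDGES_BLOCKING,
        FORWARD_EDGES_PIPELINED,
        POINTWISE_EDGES_PIPELINED,
        ALL_EDGES_PIPELINED;

        /** Decides the exchange type of one job edge under this mode. */
        ExchangeType exchangeTypeFor(DistributionPattern pattern) {
            boolean pipelined;
            switch (this) {
                case ALL_EDGES_PIPELINED:
                    pipelined = true;
                    break;
                case POINTWISE_EDGES_PIPELINED:
                    // Pointwise covers FORWARD and RESCALE.
                    pipelined = pattern != DistributionPattern.ALL_TO_ALL;
                    break;
                case FORWARD_EDGES_PIPELINED:
                    pipelined = pattern == DistributionPattern.FORWARD;
                    break;
                default: // ALL_EDGES_BLOCKING
                    pipelined = false;
            }
            return pipelined ? ExchangeType.PIPELINED : ExchangeType.BLOCKING;
        }
    }
}
```

The more edges a mode keeps pipelined, the larger the resulting pipelined regions become, and the more slots a region may need at the same time.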

Note that these modes are for Blink planner batch jobs only. DataSet jobs should continue to use ExecutionConfig#ExecutionMode to keep compatibility. We will extend the Blink planner config option table.exec.shuffle-mode to select the GlobalDataExchangeMode for a job:

...

Since the Pipelined Region Scheduler will be implemented as another SchedulingStrategy, we can easily roll the scheduler out in multiple stages and keep the previous scheduling strategies available in case users run into issues. We propose the following rollout order:

...