...

Note that the above algorithm works for streaming jobs because all tasks in a streaming job are connected by pipelined data exchanges. If the streaming job employs a shuffle, all tasks land in the same pipelined region, and the Pipelined Region Scheduler trivially schedules all tasks at the same time. For streaming jobs that do not employ a shuffle, special considerations may apply (see Embarrassingly parallel Streaming Jobs).
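To illustrate why a shuffle collapses a streaming job into one region, the sketch below treats a pipelined region as a connected component of the task graph when only pipelined exchanges count as edges, and computes regions with union-find. The Edge record, the integer task ids and the countRegions method are hypothetical illustrations, not Flink's scheduling topology API.

```java
import java.util.List;

/**
 * Minimal sketch: pipelined regions as connected components over pipelined edges.
 * Edge, the int task ids and countRegions are hypothetical, not Flink's API.
 */
public class PipelinedRegions {

    /** A data exchange between a producer task and a consumer task. */
    record Edge(int producer, int consumer, boolean pipelined) {}

    private static int find(int[] parent, int v) {
        while (parent[v] != v) {
            parent[v] = parent[parent[v]]; // path halving keeps trees shallow
            v = parent[v];
        }
        return v;
    }

    /** Returns how many pipelined regions the given task graph splits into. */
    static int countRegions(int numTasks, List<Edge> edges) {
        int[] parent = new int[numTasks];
        for (int i = 0; i < numTasks; i++) {
            parent[i] = i;
        }
        int regions = numTasks;
        for (Edge e : edges) {
            if (!e.pipelined()) {
                continue; // blocking exchanges separate regions
            }
            int a = find(parent, e.producer());
            int b = find(parent, e.consumer());
            if (a != b) {
                parent[a] = b; // merge the two regions
                regions--;
            }
        }
        return regions;
    }

    public static void main(String[] args) {
        // Sources 0 and 1 feed sinks 2 and 3 (parallelism 2).
        List<Edge> shuffle = List.of(
                new Edge(0, 2, true), new Edge(0, 3, true),
                new Edge(1, 2, true), new Edge(1, 3, true));
        List<Edge> forward = List.of(new Edge(0, 2, true), new Edge(1, 3, true));
        System.out.println(countRegions(4, shuffle)); // 1
        System.out.println(countRegions(4, forward)); // 2
    }
}
```

With an all-to-all shuffle every task ends up in one region, while a forward-only (embarrassingly parallel) job yields one region per parallel pipeline, which is why that case may need separate treatment.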

Pipelined Region Scheduling Strategy

In FLINK-10429, we reworked the scheduler so that the SchedulingStrategy is now responsible for deciding when to schedule and which tasks to schedule. We can leverage this by implementing a PipelinedRegionSchedulingStrategy that respects rules #1 and #2 of pipelined region scheduling.

...

  1. startScheduling(): schedule all source regions one by one.
  2. onPartitionConsumable(partition): check all consumer regions of the notified partition; if all inputs of a region have become consumable, schedule that region.
  3. restartTasks(tasksToRestart): find all regions that contain the tasks to restart, and reschedule those whose inputs are all consumable.
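The three callbacks above can be sketched as follows. This is a simplified illustration, not the actual PipelinedRegionSchedulingStrategy: the Region record, string task and partition ids, and the scheduleRegion callback are hypothetical stand-ins for Flink's scheduling topology and deployment types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical sketch of region-based scheduling callbacks; not Flink's real classes. */
public class RegionSchedulingSketch {

    /** A pipelined region: its tasks and the blocking partitions it consumes. */
    record Region(int id, Set<String> tasks, Set<String> inputPartitions) {}

    private final List<Region> regions;
    private final Set<String> consumablePartitions = new HashSet<>();
    private final Consumer<Region> scheduleRegion; // invoked when a region should be deployed

    RegionSchedulingSketch(List<Region> regions, Consumer<Region> scheduleRegion) {
        this.regions = regions;
        this.scheduleRegion = scheduleRegion;
    }

    private boolean allInputsConsumable(Region r) {
        return consumablePartitions.containsAll(r.inputPartitions());
    }

    /** 1. Schedule all source regions (regions without inputs) one by one. */
    void startScheduling() {
        for (Region r : regions) {
            if (r.inputPartitions().isEmpty()) {
                scheduleRegion.accept(r);
            }
        }
    }

    /** 2. Schedule consumer regions of the partition once all of their inputs are consumable. */
    void onPartitionConsumable(String partition) {
        if (!consumablePartitions.add(partition)) {
            return; // already known; avoid scheduling the same regions twice
        }
        for (Region r : regions) {
            if (r.inputPartitions().contains(partition) && allInputsConsumable(r)) {
                scheduleRegion.accept(r);
            }
        }
    }

    /** 3. Reschedule regions containing tasks to restart, if all their inputs are consumable. */
    void restartTasks(Set<String> tasksToRestart) {
        for (Region r : regions) {
            boolean affected = !Collections.disjoint(r.tasks(), tasksToRestart);
            if (affected && allInputsConsumable(r)) {
                scheduleRegion.accept(r);
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> scheduled = new ArrayList<>();
        Region source = new Region(0, Set.of("map"), Set.of());
        Region consumer = new Region(1, Set.of("reduce"), Set.of("p1"));
        var strategy = new RegionSchedulingSketch(List.of(source, consumer), r -> scheduled.add(r.id()));
        strategy.startScheduling();              // schedules region 0
        strategy.onPartitionConsumable("p1");    // schedules region 1
        strategy.restartTasks(Set.of("reduce")); // reschedules region 1
        System.out.println(scheduled);           // [0, 1, 1]
    }
}
```

A region with several inputs is scheduled only when the last of them becomes consumable, which is the behavior step 2 describes.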

FIFO Physical Slot Assignment

Slot allocation competition between pipelined regions must be prevented to avoid resource deadlocks. However, we cannot schedule regions and allocate resources for them strictly one after another, since that would significantly slow down scheduling. Requesting and launching a TaskManager (TM) can take several seconds, so we should parallelize this process across tasks, even if they belong to different regions.

...

Note that this works because we assume all requested slots are the same size, which currently holds because all ResourceSpecs are UNKNOWN. If we later want to enable this feature for jobs with different slot sizes, extra effort will be needed.
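A minimal sketch of FIFO assignment under the same-size assumption follows. It assumes that pending requests are served strictly in the order they were issued, so that a later region cannot take slots away from an earlier one while TM requests for both still proceed in parallel. FifoSlotAssigner, its methods and the string ids are hypothetical; this is not Flink's SlotPool API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical FIFO slot assignment; assumes all slots are interchangeable (same size). */
public class FifoSlotAssigner {

    /** A physical slot assigned to a slot request. */
    record Assignment(String requestId, String slotId) {}

    private final Queue<String> pendingRequests = new ArrayDeque<>();
    private final Queue<String> freeSlots = new ArrayDeque<>();
    private final List<Assignment> assignments = new ArrayList<>();

    /** Requests are queued in the order they are issued. */
    void requestSlot(String requestId) {
        pendingRequests.add(requestId);
        assignPending();
    }

    /** A newly offered slot, e.g. from a freshly launched TaskManager. */
    void offerSlot(String slotId) {
        freeSlots.add(slotId);
        assignPending();
    }

    /** Always serve the oldest pending request first. */
    private void assignPending() {
        while (!pendingRequests.isEmpty() && !freeSlots.isEmpty()) {
            assignments.add(new Assignment(pendingRequests.poll(), freeSlots.poll()));
        }
    }

    List<Assignment> assignments() {
        return assignments;
    }

    public static void main(String[] args) {
        FifoSlotAssigner assigner = new FifoSlotAssigner();
        assigner.requestSlot("regionA-task1");
        assigner.requestSlot("regionA-task2");
        assigner.requestSlot("regionB-task1");
        assigner.offerSlot("slot-1");
        assigner.offerSlot("slot-2");
        // Region A's requests were issued first, so they receive both slots.
        System.out.println(assigner.assignments());
    }
}
```

Because any free slot can satisfy any request, the queue order alone decides who gets a slot; with differently sized slots this simple rule would no longer be sufficient.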

Bulk Slot Allocation

The SlotPool needs to know all slot allocations of a pipelined region, since they must be fulfilled at the same time. Otherwise, it cannot tell whether the cluster is able to offer enough slots for the region to run.
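The all-or-nothing idea can be sketched as below, again assuming same-sized slots. A bulk holds every slot request of one pipelined region and is only fulfilled when enough free slots exist for all of its requests; a bulk larger than the cluster's total slot count can be recognized as never runnable. BulkSlotAllocator, allocateBulk and isFeasible are hypothetical names, not the methods proposed for the SlotPool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical all-or-nothing bulk allocation over same-sized slots. */
public class BulkSlotAllocator {

    private final List<String> freeSlots = new ArrayList<>();

    void addFreeSlot(String slotId) {
        freeSlots.add(slotId);
    }

    /** Returns one slot per request, or empty if the whole bulk cannot be served now. */
    Optional<List<String>> allocateBulk(List<String> requestIds) {
        int n = requestIds.size();
        if (freeSlots.size() < n) {
            return Optional.empty(); // never hand out a partial bulk
        }
        List<String> allocated = new ArrayList<>(freeSlots.subList(0, n));
        freeSlots.subList(0, n).clear(); // remove the allocated slots from the free list
        return Optional.of(allocated);
    }

    /** A bulk can never run if it needs more slots than the cluster can offer in total. */
    static boolean isFeasible(int bulkSize, int maxClusterSlots) {
        return bulkSize <= maxClusterSlots;
    }

    public static void main(String[] args) {
        BulkSlotAllocator allocator = new BulkSlotAllocator();
        allocator.addFreeSlot("slot-1");
        allocator.addFreeSlot("slot-2");
        // A three-task region gets nothing rather than two of its three slots.
        System.out.println(allocator.allocateBulk(List.of("t1", "t2", "t3")).isPresent()); // false
        System.out.println(allocator.allocateBulk(List.of("t1", "t2")).isPresent());       // true
    }
}
```

Refusing partial bulks is what keeps two regions from each holding part of the slots they need and waiting on each other forever.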

...

The SlotProviderStrategy and DefaultExecutionSlotAllocator should be updated accordingly to invoke the bulk slot allocation methods.

Extended SlotProvider Interface

We propose the following change to the SlotProvider interface:

...