Status

Discussion thread
Vote thread
JIRA

Release: 1.12


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

There are currently multiple shortcomings in Flink’s scheduler. In this FLIP, we want to focus on resolving potential batch job deadlocks and unifying the different code paths for batch and streaming jobs. The following sections will give a brief overview of the shortcomings addressed by this FLIP.

Resource Deadlocks

There are different cases in which Flink’s batch scheduling can deadlock due to limited cluster resources. Deadlocks can be completely avoided by exclusively employing blocking data exchanges. However, doing so will likely slow down the execution of the job and incur higher disk I/O.

Deadlocks caused by competing Slot Allocations

We will discuss this case by example using the task topology shown below. Assume that exactly two slots are available in the cluster, that all data exchanges are pipelined, and that all tasks are in the same slot sharing group.

All tasks are scheduled at the same time and compete for the available slots. If, for example, A2 and A4 are assigned slots, all slots of the cluster will be occupied indefinitely. Note that A2 and A4 will not be able to finish without B1 and B2 running. If we had scheduled (A1, B1) and (A2) to the two available slots, the upper region could have actually finished. Note that (A1, B1) only occupies one slot due to slot sharing.

Deadlocks caused by Consumers being scheduled before Producers 

In the example topology A1 and C1 are connected with a blocking data exchange. B1 and C1 are communicating with a pipelined data exchange. Assume A1 to be the only task outside of the default slot sharing group.

If only one slot is available to the cluster, the scheduling/execution of this job is prone to deadlocks. Concretely, if (B1, C1) is scheduled first, the tasks will never be able to finish.

Waiting indefinitely when scheduling with fewer Resources than the specified Parallelism

Batch jobs using only blocking data exchanges could run with a single slot regardless of the parallelism. However, Flink does not offer proper support for this, because we issue slot requests for all tasks with satisfied input dependencies at the same time. If there are not enough resources to run all of these tasks at the same time, some of the slot requests may time out and thus fail the job.

For the Blink planner, an (imperfect) heuristic is implemented to avoid slot request timeouts as long as the job has a chance to finish (see SlotPool#requestNewAllocatedBatchSlot()). The heuristic assumes that the job will be able to finish if at least one slot is present. However, if the job requires more than one slot at the same time, the heuristic fails.

Non-unified Scheduling

The Pipelined Region Scheduler can also be considered a proposal to unify the scheduling of batch and streaming jobs. Currently, Flink distinguishes between these two types of jobs by having two implementations of the SchedulingStrategy interface. An obvious downside of having multiple scheduling strategies is the added code complexity. Furthermore, the scheduling strategies cannot be arbitrarily interchanged with each other: for example, the batch scheduling strategy cannot be used with streaming jobs.

Public Interfaces

The existing config option table.exec.shuffle-mode will be extended with new values, and the old values will still be respected. More details can be found in the section Proposed Changes → Global Data Exchange Mode.

Proposed Changes

Pipelined Region Scheduling

In this section we outline how batch jobs are scheduled via Pipelined Region Scheduling. Then we show that the scheduler will also be able to schedule streaming jobs using the same principles. 

Tasks in a batch job communicate with each other via pipelined and blocking data exchanges. A pipelined region is defined as the set of tasks that are connected via pipelined data exchanges. It follows that the incoming and outgoing edges of a pipelined region are blocking intermediate result partitions.
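For illustration, below is a minimal sketch of how pipelined regions can be derived from a topology: the producer and consumer of every pipelined edge are merged into the same set via union-find. All types here are simplified stand-ins, not Flink’s actual topology classes.

import java.util.*;

// Minimal region-computation sketch: vertices connected by PIPELINED
// edges end up in the same region.
class PipelinedRegionComputationSketch {

  enum ExchangeType { PIPELINED, BLOCKING }

  static class Edge {
    final String producer;
    final String consumer;
    final ExchangeType type;

    Edge(String producer, String consumer, ExchangeType type) {
      this.producer = producer;
      this.consumer = consumer;
      this.type = type;
    }
  }

  private final Map<String, String> parent = new HashMap<>();

  private String find(String vertex) {
    parent.putIfAbsent(vertex, vertex);
    String root = parent.get(vertex);
    if (!root.equals(vertex)) {
      root = find(root);
      parent.put(vertex, root); // path compression
    }
    return root;
  }

  Collection<Set<String>> computeRegions(Collection<String> vertices, Collection<Edge> edges) {
    for (Edge edge : edges) {
      if (edge.type == ExchangeType.PIPELINED) {
        // Producer and consumer of a pipelined edge share a region.
        parent.put(find(edge.producer), find(edge.consumer));
      }
    }
    Map<String, Set<String>> regions = new HashMap<>();
    for (String vertex : vertices) {
      regions.computeIfAbsent(find(vertex), k -> new HashSet<>()).add(vertex);
    }
    return regions.values();
  }
}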

The basic rules of pipelined region scheduling are:

  1. Schedule pipelined connected tasks together, i.e., treat a region as a whole in scheduling.
  2. Schedule a region only if all the result partitions it consumes are ready. This ensures that a started region will be able to finish, and thus can release slots to be used by other regions.
  3. Slot allocation competitions between different pipelined regions must be avoided. This ensures that a scheduled region will always be able to acquire required slots to start work (given that the cluster has enough resources for each single region).

This way, the strategy guarantees that a job can run successfully as long as the cluster is able to fulfill the resource requirements of the largest region.

Note that the above algorithm also works for streaming jobs because all tasks in a streaming job are connected with each other by pipelined data exchanges. If the streaming job employs a shuffle, all tasks end up in the same pipelined region and the Pipelined Region Scheduler will trivially schedule all tasks at the same time. For streaming jobs that do not employ a shuffle, special considerations may apply (see Embarrassingly parallel Streaming Jobs).

Pipelined Region Scheduling Strategy

In FLINK-10429 we have reworked the scheduler. SchedulingStrategy is now responsible for determining when to schedule and which tasks to schedule. We can leverage this feature by implementing a PipelinedRegionSchedulingStrategy which respects rules #1 and #2 of pipelined region scheduling. 

The PipelinedRegionSchedulingStrategy submits one pipelined region to the DefaultScheduler at a time. The DefaultScheduler will treat the bulk of tasks in one submission as a whole, i.e., it allocates slots for them and deploys them only once all of them have slots assigned.

The PipelinedRegionSchedulingStrategy must be aware of the inputs of each pipelined region. It should schedule a region if and only if all the inputs of that region become consumable.

Scheduling of a region can be triggered by one of the following events:

  1. the job has just started
  2. a result partition becomes consumable
  3. a failover happens to recover from a failure

There are already corresponding callbacks in SchedulingStrategy that PipelinedRegionSchedulingStrategy can implement as below:

  1. startScheduling(): schedule all source regions one by one.
  2. onPartitionConsumable(partition): check all consumer regions of the notified partition and schedule each region whose inputs have all become consumable.
  3. restartTasks(tasksToRestart): find all regions containing the tasks to restart and reschedule those whose inputs are all consumable.
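The sketch below illustrates how these three callbacks map to the scheduling rules. The Region class and the helper structures are simplified stand-ins, not Flink’s actual API.

import java.util.*;

class PipelinedRegionSchedulingStrategySketch {

  static class Region {
    final String id;
    final Set<String> consumedResults = new HashSet<>(); // blocking inputs
    Region(String id) { this.id = id; }
  }

  private final List<Region> regions = new ArrayList<>();
  private final Set<String> consumablePartitions = new HashSet<>();
  private final Map<String, List<Region>> consumerRegionsOfPartition = new HashMap<>();
  private final Map<String, Region> regionOfTask = new HashMap<>();

  private boolean allInputsConsumable(Region region) {
    return consumablePartitions.containsAll(region.consumedResults);
  }

  private void scheduleRegion(Region region) {
    // Submit all tasks of the region to the scheduler as one bulk.
    System.out.println("Scheduling region " + region.id);
  }

  // At job start, schedule all regions without blocking inputs.
  void startScheduling() {
    for (Region region : regions) {
      if (region.consumedResults.isEmpty()) {
        scheduleRegion(region);
      }
    }
  }

  // Schedule a consumer region once all of its inputs are consumable.
  void onPartitionConsumable(String partition) {
    consumablePartitions.add(partition);
    for (Region region : consumerRegionsOfPartition.getOrDefault(partition, Collections.emptyList())) {
      if (allInputsConsumable(region)) {
        scheduleRegion(region);
      }
    }
  }

  // On failover, reschedule affected regions whose inputs are all consumable.
  void restartTasks(Collection<String> tasksToRestart) {
    Set<Region> affectedRegions = new LinkedHashSet<>();
    for (String task : tasksToRestart) {
      affectedRegions.add(regionOfTask.get(task));
    }
    for (Region region : affectedRegions) {
      if (allInputsConsumable(region)) {
        scheduleRegion(region);
      }
    }
  }
}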

FIFO Physical Slot Assignment

Slot allocation competition between pipelined regions must be avoided, otherwise resource deadlocks may happen. However, we cannot schedule regions and allocate resources for them strictly sequentially, since that would significantly slow down the scheduling process: requesting and launching a TaskManager can take quite a few seconds. So we should try to parallelize this process for different tasks, even if they are not from the same region.

To achieve this goal, we propose a FIFO physical slot assignment mechanism for the SlotPool. With it enabled, the SlotPool tries to fulfill the oldest pending slot request whenever it receives an available slot. A slot can become available when a terminated task returns it or when a task manager newly offers it. This naturally ensures that slot requests of an earlier scheduled region are fulfilled before requests of a later scheduled region. This differs from the current slot assignment mechanism, in which a newly offered slot tends to fulfill the slot request that originally requested it from the ResourceManager.

Note that the fields pendingRequests and waitingForResourceManager store the pending requests in LinkedHashMaps. Therefore, tryFulfillSlotRequestOrMakeAvailable(...) will naturally fulfill the pending requests in insertion order, so we only need to change the slot assignment logic on slot offers.

When a new slot is offered via SlotPoolImpl#offerSlot(...), we should use it to fulfill the oldest fulfillable slot request via tryFulfillSlotRequestOrMakeAvailable(...). If a pending request (say R1) exists with the AllocationID of the offered slot, and it is different from the request that gets fulfilled (say R2), we should update the pending requests so that R1 takes over the AllocationID of R2. This ensures that failAllocation(...) can still fail slot allocation requests to trigger restarting tasks and re-allocating slots.

Note that this works because we assume all requested slots to be of the same size, which is true at the moment because all ResourceSpecs are UNKNOWN. If we later want to enable this feature for jobs with different slot sizes, extra effort will be needed.
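A minimal sketch of the FIFO assignment on slot offers, including the AllocationID takeover described above. PendingRequest and the surrounding structure are simplified stand-ins for the SlotPoolImpl internals.

import java.util.*;

// All slots are assumed to have the same (UNKNOWN) resource profile,
// so no profile matching is needed here.
class FifoSlotAssignmentSketch {

  static class PendingRequest {
    final String slotRequestId;
    String allocationId; // the allocation this request issued to the RM
    PendingRequest(String slotRequestId, String allocationId) {
      this.slotRequestId = slotRequestId;
      this.allocationId = allocationId;
    }
  }

  // A LinkedHashMap keeps the pending requests in insertion order.
  private final LinkedHashMap<String, PendingRequest> pendingRequests = new LinkedHashMap<>();

  void offerSlot(String offeredAllocationId) {
    Iterator<PendingRequest> iterator = pendingRequests.values().iterator();
    if (!iterator.hasNext()) {
      return; // no pending request: just make the slot available
    }
    // FIFO: always fulfill the oldest pending request (R2).
    PendingRequest fulfilled = iterator.next();
    iterator.remove();

    // If another pending request (R1) had issued the offered allocation,
    // let it take over the AllocationID of the fulfilled request, so that
    // a later failAllocation(...) still maps to a pending request.
    for (PendingRequest request : pendingRequests.values()) {
      if (request.allocationId.equals(offeredAllocationId)) {
        request.allocationId = fulfilled.allocationId;
        break;
      }
    }
    // ... assign the offered slot to `fulfilled` and complete its future
  }
}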

Bulk Slot Allocation

The SlotPool needs to know all the slot allocations of a pipelined region since they must be fulfilled at the same time. Otherwise, it cannot tell whether the cluster is able to offer enough slots for a pipelined region to run.

We would like to introduce the concept of bulk slot allocation to tell the SlotPool about slot allocations that must be fulfilled at the same time. It enables the SlotPool to check whether a bulk of slot allocations is fulfillable. A slot allocation bulk is fulfillable if its resource requirements can be satisfied using all reusable slots in the SlotPool. Note that we do not require all resources in the SlotPool to be available immediately: slots occupied by bounded tasks are considered reusable because they will eventually be released once the task completes, whereas unbounded tasks occupy their slots indefinitely.
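Since all requested slots are assumed to be of the same size (see the previous section), the fulfillability check essentially reduces to counting, as the following sketch illustrates:

// Sketch of the fulfillability check under the same-slot-size assumption.
boolean isBulkFulfillable(int bulkSize, int availableSlots, int slotsOccupiedByBoundedTasks) {
  // Slots held by bounded tasks will eventually be released and are thus
  // reusable; slots held by unbounded tasks are occupied indefinitely
  // and are not counted.
  int reusableSlots = availableSlots + slotsOccupiedByBoundedTasks;
  return bulkSize <= reusableSlots;
}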

The SlotProvider interface should be extended with a bulk slot allocation method which accepts a bulk of slot requests as one of its parameters. See the Extended SlotProvider Interface section below for more details.

The SlotProviderStrategy and DefaultExecutionSlotAllocator should be updated accordingly to invoke the bulk slot allocation methods.

Extended SlotProvider Interface

We propose the following change to the SlotProvider interface:

CompletableFuture<Collection<LogicalSlotRequestResult>> allocateSlots(
  Collection<LogicalSlotRequest> slotRequests,
  Time allocationTimeout);

class LogicalSlotRequest {
  SlotRequestId slotRequestId;
  ScheduledUnit scheduledUnit;
  SlotProfile slotProfile;
  boolean slotWillBeOccupiedIndefinitely;
}

class LogicalSlotRequestResult {
  SlotRequestId slotRequestId;
  LogicalSlot slot;
}

Its implementation (SchedulerImpl) should complete the result future only once all requests are fulfilled, or complete it exceptionally if any failure happens, such as an allocation timeout or slot loss.
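A hypothetical caller-side usage could look as follows; buildRequestsForRegion(...), deploy(...), and handleRegionAllocationFailure(...) are illustrative stand-ins, not part of the proposal:

void allocateSlotsForRegion(SlotProvider slotProvider, Region region, Time allocationTimeout) {
  Collection<LogicalSlotRequest> requests = buildRequestsForRegion(region);

  slotProvider.allocateSlots(requests, allocationTimeout)
    .whenComplete((results, failure) -> {
      if (failure != null) {
        // Any allocation timeout or slot loss fails the whole bulk, so
        // the region is failed over and rescheduled as a unit.
        handleRegionAllocationFailure(region, failure);
      } else {
        // All requests are fulfilled: the region can be deployed safely.
        for (LogicalSlotRequestResult result : results) {
          deploy(result.slotRequestId, result.slot);
        }
      }
    });
}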

The allocationTimeout controls how long we wait at most for all slot requests to become fulfillable. When the SchedulerImpl receives slot requests that are not fulfillable at that time, it waits until the allocationTimeout expires before failing the requests. The timeout is canceled once the corresponding requests become fulfillable. However, when slots are lost and the pending requests become unfulfillable again, the timeout is restarted.
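This timeout handling can be sketched as follows; the fulfillability notifications are assumed to come from the SlotPool, and the class below is illustrative rather than part of the proposed API:

import java.util.concurrent.*;

class BulkAllocationTimeoutSketch {

  private final ScheduledExecutorService timer =
    Executors.newSingleThreadScheduledExecutor();
  private ScheduledFuture<?> pendingTimeout;

  // Called when the bulk is (or becomes again) unfulfillable: (re)arm the timeout.
  void onBulkUnfulfillable(CompletableFuture<?> bulkResult, long timeoutMillis) {
    pendingTimeout = timer.schedule(
      () -> bulkResult.completeExceptionally(
        new TimeoutException("Slot allocation bulk is not fulfillable")),
      timeoutMillis,
      TimeUnit.MILLISECONDS);
  }

  // Called once all requests of the bulk become fulfillable: cancel the timeout.
  void onBulkFulfillable() {
    if (pendingTimeout != null) {
      pendingTimeout.cancel(false);
    }
  }
}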

The field slotWillBeOccupiedIndefinitely in LogicalSlotRequest denotes whether a task is bounded. It will be used to tag the allocated slots and helps with the fulfillability check of slot requests. For the first implementation, we will simply assume all tasks in batch jobs to be bounded and all tasks in streaming jobs to be unbounded.

Make Pipelined Region a Common Component

Pipelined regions are only used by PipelinedRegionFailoverStrategy and RegionPartitionReleaseStrategy at the moment. The regions are built and stored separately if these strategies are used.

With this FLIP, PipelinedRegionSchedulingStrategy will also need pipelined regions to make decisions. To avoid duplicating the costs of building and storing pipelined regions, we propose to make PipelinedRegion a common component.

The Topology interface should be extended with the ability to get regions, and its implementations should take on the role of building and storing pipelined regions.

interface Topology {
  //...
  Iterable<PipelinedRegion> getAllPipelinedRegions();

  PipelinedRegion getPipelinedRegionOfVertex(VID vertexId);
  //...
}

interface PipelinedRegion {
  Iterable<V> getVertices();

  V getVertex(VID vertexId);

  Iterable<R> getConsumedResults();
}
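With this extension, PipelinedRegionSchedulingStrategy could, for example, precompute the mapping from each blocking result partition to its consumer regions, which it needs for onPartitionConsumable. A sketch, using the consumed result type R as in the interface above:

// Precompute which regions consume each (blocking) result partition.
Map<R, Set<PipelinedRegion>> consumerRegionsByPartition = new HashMap<>();
for (PipelinedRegion region : topology.getAllPipelinedRegions()) {
  for (R consumedResult : region.getConsumedResults()) {
    consumerRegionsByPartition
      .computeIfAbsent(consumedResult, k -> new HashSet<>())
      .add(region);
  }
}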

Global Data Exchange Mode

To better utilize pipelined region scheduling, we propose to introduce a GlobalDataExchangeMode. It is a job-wide mode which helps to automatically set data exchange types for job edges, and thereby controls how jobs are divided into pipelined regions. There are 4 modes to fit different scenarios:

  • ALL_EDGES_BLOCKING: The most conservative setting. Should only be used with special consideration.
  • FORWARD_EDGES_PIPELINED: With this mode, each pipelined region needs one and only one slot to run. Can be used in resource-limited scenarios or if it must be guaranteed that the job can successfully run with only 1 slot.
  • POINTWISE_EDGES_PIPELINED: The pointwise distribution pattern includes FORWARD and RESCALE edges. With this mode, RESCALE edges are also pipelined, at the cost of larger regions that may need more slots at the same time. However, in most cases the number of required slots is much smaller than the parallelism.
  • ALL_EDGES_PIPELINED: This requires no fewer slots than the parallelism. It saves time on scheduling tasks and can be used for interactive queries (see FLINK-16543).


StreamGraph will be extended with a new field to host the GlobalDataExchangeMode. In the JobGraph generation stage, this mode will be used to determine the data exchange type of each job edge.

Note that these modes are for Blink planner batch jobs only. DataSet jobs should continue to use ExecutionConfig#ExecutionMode to keep compatibility. We will extend the Blink planner config option table.exec.shuffle-mode to select the GlobalDataExchangeMode for a job:

  • all-blocking
  • forward-pipelined-only
  • pointwise-pipelined-only
  • all-pipelined

The previously supported values are still accepted for compatibility:

  • ‘pipelined’ will be treated as ‘all-pipelined’
  • ‘batch’ will be treated as ‘all-blocking’

The Blink planner needs to set the GlobalDataExchangeMode on the StreamGraph according to the config value.
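For example, selecting a mode from the Table API could then look as follows (assuming a Blink planner batch TableEnvironment):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(
  EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

// Pipeline forward edges only; other exchanges become blocking.
tEnv.getConfig().getConfiguration()
  .setString("table.exec.shuffle-mode", "forward-pipelined-only");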

Data Exchange Mode Examples

The data exchange modes as proposed above have different resource requirements. We will explain the different modes by example using the job below.

  • ALL_EDGES_BLOCKING
    • Pipelined regions: 12
      • {A1}, {A2}
      • {B1}, {B2}
      • {C1}, {C2}, {C3}, {C4}
      • {D1}, {D2}, {D3}, {D4}
    • Blocking logical edges: 3
    • Minimum slots required: 1
  • FORWARD_EDGES_PIPELINED
    • Pipelined regions: 10
      • {A1, B1}
      • {A2, B2}
      • {C1}, {C2}, {C3}, {C4}
      • {D1}, {D2}, {D3}, {D4}
    • Blocking logical edges: 2
    • Minimum slots required: 1
  • POINTWISE_EDGES_PIPELINED
    • Pipelined regions: 6
      • {A1, B1, C1, C2}
      • {A2, B2, C3, C4}
      • {D1}, {D2}, {D3}, {D4}
    • Blocking logical edges: 1
    • Minimum slots required: 2
  • ALL_EDGES_PIPELINED
    • Pipelined regions: 1
      • {A1, A2, B1, B2, C1, C2, C3, C4, D1, D2, D3, D4}
    • Blocking logical edges: 0
    • Minimum slots required: 4

Compatibility, Deprecation, and Migration Plan

Since the Pipelined Region Scheduler will be implemented as another SchedulingStrategy, we can easily roll the scheduler out in multiple stages and also keep the previous scheduling strategies available in case users run into issues. We propose the following roll-out order:

  1. Enable new scheduler for Blink planner
  2. Enable new scheduler for DataSet jobs
  3. Enable new scheduler for DataStream jobs

Each stage can be done independently of the others.

Limitations & Downsides

Embarrassingly parallel Streaming Jobs

A streaming job that is embarrassingly parallel like the one below has multiple distinct regions that will be scheduled separately by the Pipelined Region Scheduler.

If not enough resources are available, this can lead to only parts of the job being in a running state. We do not consider this a real limitation, as partially running jobs can already occur since Flink 1.9 when using region failover. Moreover, users should be able to detect partially running jobs by monitoring the relevant metrics.

Consumers are scheduled earlier

Because we will schedule a pipelined region as a unit, it can happen that consumer tasks are scheduled too early and hog cluster resources unnecessarily. This is the case if a producing task takes a long time to emit its first record because it is computationally expensive, e.g., due to aggregations or sorting.

Resource Deadlocks could still happen in certain Cases

Pipelined region scheduling does not solve all resource deadlock cases. Resource deadlocks can still happen in the following cases:

  1. There are slots of different sizes. This cannot happen in 1.11, but can happen in the future when we allow users to configure ResourceSpecs for operators.
  2. There are multiple jobs in a Flink session cluster and their slot allocations compete with each other. 

These cases do not block this design since case #1 will not happen in 1.11 and case #2 is a legacy problem. Below are more details of these resource deadlock cases.

Resource deadlocks when allocated slots are too small to fulfill the oldest request

Below is an example demonstrating that no slot allocation bulk can be completely fulfilled, even though the cluster has enough resources to fulfill each individual bulk.

Possible solutions:

  • Option 1: SlotPool releases unused slots to RM and waits for the pending requests in RM to be fulfilled. Slot requests related to the released slots should also be re-sent to RM.
  • Option 2: Force FIFO slot allocation in SlotManager. We can do this after the SlotManager is pluggable (FLINK-14106).

Resource deadlocks when slots of different sizes are improperly assigned to slot requests

Below is an example to demonstrate this issue. Note that the 3 requests are in the same bulk.

Possible solutions:

  • Option 1: SlotPool can detect this issue and trigger a re-assignment of these slots.
  • Option 2: Force strict matching of slots and slot requests. It should work if the issue “Resource deadlocks when allocated slots are too small to fulfill the oldest request” is already solved.

Resource deadlocks when slot allocation competition happens between multiple jobs in a session cluster

With pipelined region scheduling, we can avoid slot allocation competition between regions of one job. However, the slot allocation competition can also happen between multiple jobs if they are in the same Flink session. 

To solve this, one possible solution is to enable bulk slot allocation in the ResourceManager. The ResourceManager should try to fulfill the oldest bulk of requests first and only then the next bulk. We may also need to introduce to the ResourceManager a slot allocation bulk confirmation interface, which the JobMaster invokes when it sees that the bulk of requests is fully fulfilled. This would help in the case that a TaskManager is lost after the RM requested slots from it but before the slots are offered to the JM.

Test Plan

We will write additional unit and integration tests. Furthermore, the changes are already covered by E2E tests.

Rejected Alternatives

None