...

  • How to respect condition2:
    • Start assigning slots to TMs only after the number of available slots exceeds or equals the number of slot requests.
  • How to respect condition1:
    • The root cause why condition1 may not be met is that slot requests are sent from the JobMaster to the SlotPool one by one instead of as one whole batch.
    • Condition1 is met in most scenarios, because requesting slots is faster than registering slots to the slot pool. (Registering slots to the slot pool requires requesting and starting a TM.)
    • But there are two special cases: 
      • This is not true if a job is re-scheduled after a task failure. (Slots are ready in advance)
      • It's also not true for batch jobs. (Single task can run)
    • Solution1: Introduce an additional waiting mechanism: `slot.request.max-interval`
      • It is the maximum time interval between the JobMaster's slot requests to the SlotPool.
      • We consider all slot requests to have been sent from the JobMaster to the SlotPool when `current time > arrival time of the last slotRequest + slot.request.max-interval`.
      • The `slot.request.max-interval` can be very small, e.g. 20ms, because the call is a plain Java method call; it is neither cross-process nor an RPC.
    • Solution2: Change or extend the interface from the Scheduler to the SlotPool.
      • 2.1 Change the interface from a single slot request to a batch slot request. This doesn't work well with PipelinedRegionSchedulingStrategy.
      • 2.2 Add an interface that lets the slot pool know all requests have arrived. (If so, the slot pool doesn't need any timeout or waiting mechanism.)
    • We choose solution1 because solution2 may complicate the scheduling process a lot (from this discussion).
  • Based on this information:
    • When `current time > arrival time of the last slotRequest + slot.request.max-interval`, the SlotPool declares its resource requirements to the RM.
    • When `the number of available slots cached exceeds or equals the pending slots`, the slots selection and allocation can be carried out.
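The two conditions above can be sketched as a small, self-contained Java class (the class and method names are hypothetical, not the actual Flink SlotPool API); the clock is injected so the interval check is testable:

```java
import java.util.function.LongSupplier;

/**
 * Minimal sketch of the proposed waiting mechanism. The batch of slot
 * requests is considered complete once no new request has arrived for
 * slot.request.max-interval (condition1); allocation may start once the
 * cached slots cover all pending requests (condition2).
 */
final class SlotRequestBatch {
    private final long maxIntervalMillis;   // slot.request.max-interval
    private final LongSupplier clock;       // injectable clock for testing
    private long lastRequestArrival = Long.MIN_VALUE;
    private int pendingRequests;
    private int availableSlots;

    SlotRequestBatch(long maxIntervalMillis, LongSupplier clock) {
        this.maxIntervalMillis = maxIntervalMillis;
        this.clock = clock;
    }

    void onSlotRequest() {
        pendingRequests++;
        lastRequestArrival = clock.getAsLong();
    }

    void onSlotAvailable() {
        availableSlots++;
    }

    /** Condition1: all requests are assumed to have arrived (interval elapsed). */
    boolean shouldDeclareResourceRequirements() {
        return lastRequestArrival != Long.MIN_VALUE
                && clock.getAsLong() > lastRequestArrival + maxIntervalMillis;
    }

    /** Condition2: enough cached slots to satisfy all pending requests. */
    boolean canStartAllocation() {
        return pendingRequests > 0 && availableSlots >= pendingRequests;
    }
}
```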


Limitation: The waiting mechanism does not take effect for batch jobs, so batch jobs do not have a global view for balancing the number of tasks when allocating slots to TMs. There are several main reasons:

  1. Batch jobs can run normally even with only one TM.
  2. Batch jobs run in stages, so there may be only a small number of tasks in a slot at the same time.
  3. Batch jobs can use the cluster.evenly-spread-out-slots=true capability to spread slots across as many TMs as possible, especially in OLAP scenarios.
    • That option is replaced by taskmanager.load-balance.mode=Slots; for details, see Section 3.1.

2.2.3 Allocation strategy for slot to TM

When the available slots can support job deployment, the waiting ends. After that, sort the ExecutionSlotSharingGroups (slot requests) in descending order of slot load, then allocate slots round-robin, preferring the TaskManager with the most free slots and the smallest task load.
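The sorting and round-robin selection described above can be sketched as follows (simplified stand-in classes, not Flink's actual ExecutionSlotSharingGroup or TaskManager types):

```java
import java.util.*;

/** Stand-in for an ExecutionSlotSharingGroup: its load is its task count. */
record Essg(String name, int tasks) {}

/** Stand-in for a TaskManager with a fixed number of slots. */
final class Tm {
    final String id;
    int freeSlots;
    int taskLoad;
    Tm(String id, int freeSlots) { this.id = id; this.freeSlots = freeSlots; }
}

final class BalancedSlotAssigner {
    /** Assign each ESSG (heaviest first) to the TM with the most free
     *  slots, breaking ties by the smallest accumulated task load. */
    static Map<String, String> assign(List<Essg> groups, List<Tm> tms) {
        List<Essg> sorted = new ArrayList<>(groups);
        sorted.sort(Comparator.comparingInt(Essg::tasks).reversed());
        Map<String, String> assignment = new LinkedHashMap<>();
        for (Essg g : sorted) {
            Tm best = tms.stream()
                    .filter(tm -> tm.freeSlots > 0)
                    .max(Comparator.<Tm>comparingInt(tm -> tm.freeSlots)
                            .thenComparing(tm -> -tm.taskLoad))
                    .orElseThrow(() -> new IllegalStateException("no free slots"));
            best.freeSlots--;
            best.taskLoad += g.tasks();
            assignment.put(g.name(), best.id);
        }
        return assignment;
    }
}
```

With two 2-slot TMs and ESSG loads 4, 4, 2, 1, this yields per-TM task loads of 6 and 5, i.e. a near-even split.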


Example1:

  • Assume there are 6 ESSGs.

ESSG0: the number of tasks is 4

ESSG1: the number of tasks is 4

...

  • Preferred conclusion:

We prefer to reuse and enhance `cluster.evenly-spread-out-slots` (option 2); the rest of the document is designed around reusing that option.

...

Introduce the following configuration items:

    • taskmanager.load-balance.mode
      • This option controls 2 phases during scheduling:
        • Task → Slot has 2 strategies: LOCAL_INPUT_PREFERRED and TASK_BALANCED_PREFERRED.
          • LOCAL_INPUT_PREFERRED: represents the tasks’ local-input-preferred scheduling.
          • TASK_BALANCED_PREFERRED: represents the new tasks’ balanced scheduling.
        • Slot → TM has 2 strategies: cluster.evenly-spread-out-slots is true or false.
      • Value:
        • None: LOCAL_INPUT_PREFERRED + cluster.evenly-spread-out-slots=false (the default behavior of old Flink versions)
        • Slots: LOCAL_INPUT_PREFERRED + cluster.evenly-spread-out-slots=true (used to be compatible with the old cluster.evenly-spread-out-slots option)
        • Tasks: TASK_BALANCED_PREFERRED + cluster.evenly-spread-out-slots=true (the new tasks’ balanced scheduling)
      • type: TaskManagerLoadBalanceMode
      • default: None
    • slot.request.max-interval
      • type: Duration
      • default: 20ms
      • description: The maximum interval duration between the JobMaster requesting slots from the SlotPool.
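A minimal sketch of the mapping described above (the enum name comes from this document; the helper methods are illustrative, not Flink's actual API):

```java
/**
 * Each taskmanager.load-balance.mode value implies one Task->Slot strategy
 * and one legacy cluster.evenly-spread-out-slots value.
 */
enum TaskManagerLoadBalanceMode {
    NONE, SLOTS, TASKS;

    /** Task -> Slot strategy implied by this mode. */
    String taskToSlotStrategy() {
        return this == TASKS ? "TASK_BALANCED_PREFERRED" : "LOCAL_INPUT_PREFERRED";
    }

    /** Legacy cluster.evenly-spread-out-slots value implied by this mode. */
    boolean evenlySpreadOutSlots() {
        return this != NONE;
    }
}
```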

3.2 Non-Public Change

    • Deprecate cluster.evenly-spread-out-slots
      • It can be replaced with the new taskmanager.load-balance.mode option.


3.3.2.1 Add the loading abstraction to various levels of slots 

...

      • Introduce the logic to process the parameter `slot.request.max-interval`, the maximum time interval between the JobMaster's slot requests to the SlotPool.
      • When `current time > arrival time of the last slotRequest + slot.request.max-interval`, the SlotPool declares its resource requirements to the RM.
      • When `the number of available slots cached exceeds or equals the pending slots`, the slots selection and allocation can be carried out.
  • Enhance the old logic related to cluster.evenly-spread-out-slots
    • Change the findMatchingSlot method signature of the SlotMatchingStrategy interface as follows:

```java
// SlotMatchingStrategy
<T extends TaskManagerSlotInformation> Optional<T> findMatchingSlot(
        ResourceProfile requestedProfile,
        Collection<T> freeSlots,
        Function<InstanceID, Integer> numberRegisteredSlotsLookup,
        // the function to get the tasks loading for a TM
        Function<InstanceID, Long> slotTasksLoadingLookup);
```
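As an illustration of how the added slotTasksLoadingLookup parameter could be used, here is a least-loaded matching sketch with simplified stand-ins for Flink's types (InstanceID, SlotInfo, and the strategy class below are local stand-ins, not Flink's actual classes):

```java
import java.util.*;
import java.util.function.Function;

/** Stand-in for Flink's InstanceID (identifies a TaskManager). */
record InstanceID(String id) {}

/** Stand-in for TaskManagerSlotInformation: a free slot on some TM. */
record SlotInfo(InstanceID instanceId, String slotId) {}

final class LeastLoadedMatchingStrategy {
    /** Among the free slots, prefer the one whose TaskManager currently
     *  carries the smallest task load, as reported by the lookup. */
    static Optional<SlotInfo> findMatchingSlot(
            Collection<SlotInfo> freeSlots,
            Function<InstanceID, Long> slotTasksLoadingLookup) {
        return freeSlots.stream()
                .min(Comparator.comparingLong(
                        slot -> slotTasksLoadingLookup.apply(slot.instanceId())));
    }
}
```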


...

  • We will focus on performance testing in three deployment modes:
    • Focus on two dimensions:
      • Within the job scope, does the number of tasks in each slot meet expectations?
      • Within the TM scope, does the number of tasks in each TM meet expectations?
    • 1 Standalone session
      • Single job
      • Multiple jobs
    • 2 Application mode
    • 3 Session mode
      • Single job
      • Multiple jobs

Rejected Alternatives

  • Expose two options to the user
  • Change the strategy of slot.request.max-interval
    • The old strategy is: when `current time > arrival time of the last slotRequest + slot.request.max-interval` and `the number of available slots cached exceeds or equals the pending slots`, the slots selection and allocation can be carried out.
    • The latest strategy is:
      • When `current time > arrival time of the last slotRequest + slot.request.max-interval`, then the SlotPool declareResourceRequirement to the RM.
      • When `the number of available slots cached exceeds or equals the pending slots`, the slots selection and allocation can be carried out.
    • This new strategy not only implements the functionality of the old strategy, but also reduces the number of RPCs from the SlotPool to the ResourceManager.

