...
...
- If the JV of the SEV doesn't contain a co-location constraint:
  - If the parallelism of the JV equals numberOfSlots:
    - Traverse all SEVs of the JV and assign SEVs[subtask_index] to ESSGs[subtask_index]. This strategy ensures that SEVs with the same index are assigned to the same ESSG.
    - It also ensures that co-located subtasks will be in one ESSG, given that co-located job vertices share the same parallelism and will always be in the same SSG.
    - In the case of forward edges, remote data exchange of such JVs can be reduced: all subtasks connected by a forward shuffle stay in the same slot, so they use local data exchange.
  - If the parallelism of the JV is less than the numberOfSlots in the current SSG:
    - Get ESSGs[++eSSGIndex] as the target ESSG.
    - Add the SEV into the target ESSG.
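The assignment rules above can be sketched as follows. This is a simplified model for illustration only: the `EssgAssignmentSketch` class, the string encoding of subtasks, and the `int[]`-based representation of job vertices are illustrative stand-ins, not Flink's actual classes.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the SEV -> ESSG assignment rules described above.
public class EssgAssignmentSketch {

    // Each ESSG is modeled as the list of "JV<i>#<subtask>" entries it holds.
    static List<List<String>> assign(int numberOfSlots, int[] jvParallelisms) {
        List<List<String>> essgs = new ArrayList<>();
        for (int i = 0; i < numberOfSlots; i++) {
            essgs.add(new ArrayList<>());
        }
        // Round-robin cursor used for JVs whose parallelism < numberOfSlots.
        int eSSGIndex = -1;
        for (int jv = 0; jv < jvParallelisms.length; jv++) {
            int parallelism = jvParallelisms[jv];
            if (parallelism == numberOfSlots) {
                // SEVs[subtask_index] goes to ESSGs[subtask_index], so SEVs
                // with the same index (and co-located subtasks) share an ESSG.
                for (int subtask = 0; subtask < parallelism; subtask++) {
                    essgs.get(subtask).add("JV" + jv + "#" + subtask);
                }
            } else {
                // Smaller parallelism: spread SEVs round-robin over the ESSGs.
                for (int subtask = 0; subtask < parallelism; subtask++) {
                    eSSGIndex = (eSSGIndex + 1) % numberOfSlots;
                    essgs.get(eSSGIndex).add("JV" + jv + "#" + subtask);
                }
            }
        }
        return essgs;
    }
}
```

For example, with 3 slots and two JVs of parallelism 3 and 2, the full-parallelism JV occupies one SEV per ESSG, and the smaller JV is spread over the first two ESSGs.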
...
- How to respect condition2:
  - Start assigning slots to TMs only after the number of available slots exceeds or equals the number of slot requests.
- How to respect condition1:
  - The root cause why condition1 cannot be met is that slot requests are sent from the JobMaster to the SlotPool one by one instead of as one whole batch.
  - Condition1 is met in most scenarios, because requesting slots is faster than registering slots to the slot pool. (Registering slots to the slot pool needs to request and start TMs.)
  - But there are two special cases:
    - This is not true if a job is re-scheduled after a task failure. (Slots are ready in advance.)
    - It's also not true for batch jobs. (A single task can run.)
- Solution1: Introduce an additional waiting mechanism: `slot.request.max-interval`
  - It is the maximum time interval between the JobMaster requesting multiple slots from the SlotPool.
  - We consider all slot requests to have been sent from the JobMaster to the SlotPool once `current time > arrival time of the last slotRequest + slot.request.max-interval`.
  - The `slot.request.max-interval` can be very small, e.g. 20ms, because the call stack is a Java method call; it does not cross processes and is not an RPC.
- Solution2: Change or add the interface from the Scheduler to the SlotPool.
  - 2.1 Change the interface from a single slot request to a batch slot request. It doesn't work well with PipelinedRegionSchedulingStrategy.
  - 2.2 Add an interface that lets the slot pool know all requests have arrived. (If so, the slot pool doesn't need any timeout or waiting mechanism.)
  - We choose solution1 because solution2 may complicate the scheduling process a lot. (From this discussion.)
- Based on this information:
  - When `current time > arrival time of the last slotRequest + slot.request.max-interval`, the SlotPool declares the resource requirements to the RM. (For all jobs, both batch and streaming.)
  - When `the number of available slots cached exceeds or equals the pending slots`, the slot selection and allocation can be carried out.
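The two conditions above can be sketched as a small batching helper. This is a minimal sketch under simplified assumptions: the `SlotRequestBatcher` class and its method names are illustrative, not Flink's actual SlotPool API.

```java
// Minimal sketch of the `slot.request.max-interval` batching check.
// Class and method names are illustrative, not Flink's actual SlotPool API.
public class SlotRequestBatcher {

    private final long maxIntervalMillis; // slot.request.max-interval
    private long lastRequestArrivalMillis = Long.MIN_VALUE;

    public SlotRequestBatcher(long maxIntervalMillis) {
        this.maxIntervalMillis = maxIntervalMillis;
    }

    // Called whenever the JobMaster sends one more slot request.
    public void onSlotRequest(long nowMillis) {
        lastRequestArrivalMillis = nowMillis;
    }

    // All requests are considered sent once the max interval has elapsed
    // since the last request arrived; only then declare resources to the RM.
    public boolean isBatchComplete(long nowMillis) {
        return lastRequestArrivalMillis != Long.MIN_VALUE
                && nowMillis > lastRequestArrivalMillis + maxIntervalMillis;
    }

    // Slot selection/allocation additionally waits until enough slots arrived.
    public static boolean canAllocate(int availableSlots, int pendingRequests) {
        return availableSlots >= pendingRequests;
    }
}
```

Note that the interval check only gates declaring requirements to the RM, while `canAllocate` gates the actual slot selection, matching the two bullets above.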
- (Just for streaming job)
Limitation: The waiting mechanism (`the number of available slots cached exceeds or equals the pending slots`) does not take effect for batch jobs, so batch jobs lack a global perspective for balancing the number of tasks when allocating slots to TMs. There are several main reasons:
- Batch jobs can run normally even with only one TM
- Batch jobs are run in stages, and there may be only a small number of tasks in a slot at the same time.
- Batch jobs can use the cluster.evenly-spread-out-slots=true capability to spread slots across as many TMs as possible, especially in OLAP scenarios.
  - This option is replaced by taskmanager.load-balance.mode=Slots; for details, see Section 3.1.
2.2.3 Allocation strategy for slot to TM
Once the available slots can support deploying the job, the waiting ends. After that, sort the ExecutionSlotSharingGroups (slot requests) in descending order of their task load, and allocate them round-robin, each to a slot whose TaskManager has the most free slots and the smallest task load.
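A minimal sketch of this sorting and round-robin selection follows. The `SlotToTmSketch` class and the plain-`int` model of TMs and task counts are simplified stand-ins for Flink's actual classes; it assumes there are enough free slots for all groups.

```java
import java.util.Arrays;

// Sketch of the slot -> TM allocation phase: sort ESSGs by task count in
// descending order, then place each on the TM with the most free slots,
// breaking ties by the smallest accumulated task load.
// Assumes numTms * slotsPerTm >= number of ESSGs.
public class SlotToTmSketch {

    // essgTaskCounts: tasks per ExecutionSlotSharingGroup.
    // Returns the total task load per TM after allocation.
    static int[] allocate(int[] essgTaskCounts, int numTms, int slotsPerTm) {
        int[] sorted = essgTaskCounts.clone();
        Arrays.sort(sorted); // ascending; iterate from the end for descending
        int[] tmLoad = new int[numTms];
        int[] tmFreeSlots = new int[numTms];
        Arrays.fill(tmFreeSlots, slotsPerTm);
        for (int i = sorted.length - 1; i >= 0; i--) {
            int best = -1;
            for (int tm = 0; tm < numTms; tm++) {
                if (tmFreeSlots[tm] == 0) {
                    continue; // no free slot left on this TM
                }
                if (best < 0
                        || tmFreeSlots[tm] > tmFreeSlots[best]
                        || (tmFreeSlots[tm] == tmFreeSlots[best]
                                && tmLoad[tm] < tmLoad[best])) {
                    best = tm;
                }
            }
            tmFreeSlots[best]--;
            tmLoad[best] += sorted[i];
        }
        return tmLoad;
    }
}
```

Running this sketch on six ESSGs with task counts 4, 4, 3, 3, 3, 3 and two TMs of 3 slots each yields a load of 10 tasks on each TM, matching Example1 below.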
Example1:
- Assume there are 6 ESSGs:
  - ESSG0: 4 tasks
  - ESSG1: 4 tasks
  - ESSG2: 3 tasks
  - ESSG3: 3 tasks
  - ESSG4: 3 tasks
  - ESSG5: 3 tasks

When the taskmanager.numberOfTaskSlots is 3, the assigned result:

| TM1 | TM1 task number (10) | TM2 | TM2 task number (10) |
|---|---|---|---|
| ESSG0 | 4 | ESSG1 | 4 |
| ESSG2 | 3 | ESSG3 | 3 |
| ESSG4 | 3 | ESSG5 | 3 |
When the taskmanager.numberOfTaskSlots is 2, the assigned result:
...
- Preferred conclusion:
We prefer to reuse and enhance `cluster.evenly-spread-out-slots`; the rest of the document is designed around reusing this option.
...
Introduce the following configuration items:
- taskmanager.load-balance.mode
  - This option controls 2 phases during scheduling:
    - Task → Slot has 2 strategies:
      - LOCAL_INPUT_PREFERRED: It represents the tasks’ local-input-preferred scheduling.
      - TASK_BALANCED_PREFERRED: It represents the new tasks’ balanced scheduling.
    - Slot → TM has 2 strategies: cluster.evenly-spread-out-slots is true or false.
  - type: TaskManagerLoadBalanceMode
  - default: None
  - Value:
    - None: LOCAL_INPUT_PREFERRED + cluster.evenly-spread-out-slots=false (the default behavior of old Flink versions)
    - Slots: LOCAL_INPUT_PREFERRED + cluster.evenly-spread-out-slots=true (compatible with the old cluster.evenly-spread-out-slots option)
    - Tasks: TASK_BALANCED_PREFERRED + cluster.evenly-spread-out-slots=true (the new tasks’ balanced scheduling)
- slot.request.max-interval
  - type: Duration
  - default: 20ms
  - description: The maximum interval duration between the JobMaster requesting slots from the SlotPool.
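The mapping from the unified option to the two per-phase strategies could be expressed as an enum like the following sketch; the field names and the string encoding of the Task → Slot strategy are illustrative, not Flink's actual implementation.

```java
// Sketch of how the unified taskmanager.load-balance.mode option maps onto
// the two scheduling phases described above. Field names are illustrative.
public enum TaskManagerLoadBalanceMode {
    // value  (Task -> Slot strategy,     Slot -> TM evenly spread out?)
    NONE("LOCAL_INPUT_PREFERRED", false),   // old default behavior
    SLOTS("LOCAL_INPUT_PREFERRED", true),   // old cluster.evenly-spread-out-slots=true
    TASKS("TASK_BALANCED_PREFERRED", true); // new tasks' balanced scheduling

    public final String taskToSlotStrategy;
    public final boolean evenlySpreadOutSlots;

    TaskManagerLoadBalanceMode(String taskToSlotStrategy, boolean evenlySpreadOutSlots) {
        this.taskToSlotStrategy = taskToSlotStrategy;
        this.evenlySpreadOutSlots = evenlySpreadOutSlots;
    }
}
```

One enum keeps the two phases consistent: a user cannot, for example, ask for balanced task placement without also spreading slots across TMs.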
3.2 Non-Public Change
- Deprecate cluster.evenly-spread-out-slots
- It can be replaced with the new taskmanager.load-balance.mode option.
3.3.2.1 Add the loading abstraction to various levels of slots
...
- Introduce the logic to process the parameter `slot.request.max-interval`, the maximum time interval between the JobMaster requesting multiple slots from the SlotPool.
  - When `current time > arrival time of the last slotRequest + slot.request.max-interval`, the SlotPool declares the resource requirements to the RM.
  - When `the number of available slots cached exceeds or equals the pending slots`, the slot selection and allocation can be carried out.
- Enhance the old logic related to cluster.evenly-spread-out-slots
- Change the findMatchingSlot method signature of the SlotMatchingStrategy interface as follows:

```java
<T extends TaskManagerSlotInformation> Optional<T> findMatchingSlot(
        ResourceProfile requestedProfile,
        Collection<T> freeSlots,
        Function<InstanceID, Integer> numberRegisteredSlotsLookup,
        // the function to get tasks loading for TM.
        Function<InstanceID, Long> slotTasksLoadingLookup);
```
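A hypothetical strategy built on the extended signature might pick, among the free slots with sufficient capacity, the one whose TaskManager carries the smallest task load. In this sketch, `InstanceID`, `Slot`, and the integer `capacity` check are simplified stand-ins for Flink's `InstanceID`, `TaskManagerSlotInformation`, and `ResourceProfile`.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical least-loaded matching built on the extended signature above.
public class LeastLoadedMatching {

    // Simplified stand-ins for Flink's InstanceID / TaskManagerSlotInformation.
    record InstanceID(String id) {}

    record Slot(InstanceID owner, int capacity) {}

    // Pick a free slot with sufficient capacity whose TaskManager carries the
    // smallest task load, mirroring the slotTasksLoadingLookup parameter.
    static Optional<Slot> findMatchingSlot(
            int requestedCapacity,
            Collection<Slot> freeSlots,
            Function<InstanceID, Long> slotTasksLoadingLookup) {
        return freeSlots.stream()
                .filter(s -> s.capacity() >= requestedCapacity)
                .min(Comparator.comparingLong(
                        s -> slotTasksLoadingLookup.apply(s.owner())));
    }
}
```

Passing the load lookup as a `Function` keeps the strategy decoupled from how the resource manager tracks per-TM task loads.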
...
- We will focus on performance testing in three deployment modes:
  - 1 Standalone session
    - Single job
    - Multiple jobs
  - 2 Application mode
  - 3 Session mode
    - Single job
    - Multiple jobs
- Focus on two dimensions:
  - Within the job scope: does the number of tasks in each slot meet expectations?
  - Within the TM scope: does the number of tasks in each TM meet expectations?
Rejected Alternatives
- Expose two options to the user
- slot.sharing-strategy and cluster.evenly-spread-out-slots
  - After discussing on the mailing list, we unified these 2 options into one option: taskmanager.load-balance.mode.
  - Related discussions:
- Change the strategy of slot.request.max-interval
- The old strategy is: when `current time > arrival time of the last slotRequest + slot.request.max-interval` and `the number of available slots cached exceeds or equals the pending slots`, the slots selection and allocation can be carried out.
- The latest strategy is:
    - For DeclarativeSlotPool#increaseResourceRequirementsBy, when `current time > arrival time of the last slotRequest + slot.request.max-interval`, the DeclarativeSlotPool declares the resource requirements to the RM.
    - For DeclarativeSlotPool#decreaseResourceRequirementsBy, when `current time > arrival time of the last slotRequest + slot.request.max-interval`, the DeclarativeSlotPool declares the resource requirements to the RM.
    - For DeclarativeSlotPoolBridge, when `the number of available slots cached exceeds or equals the pending slots`, the slot selection and allocation can be carried out.
  - This new strategy not only implements the functionality of the old strategy, but also reduces the number of RPCs from the SlotPool to the ResourceManager.
References
[1] https://docs.google.com/document/d/14WhrSNGBdcsRl3IK7CZO-RaZ5KXU2X1dWqxPEFr3iS8