Motivation
Flink's current task deployment strategy sometimes assigns more tasks to some TMs than to others. The TMs carrying more tasks suffer excessive resource utilization and become a bottleneck for the entire job. Developing strategies that balance the task load across TMs, and thereby reduce job bottlenecks, is very meaningful.
Public Interfaces
Introduce the following configuration items:
- slot.sharing-strategy
  - type: TaskSlotSharingStrategy
  - default: LOCAL_INPUT_PREFERRED
  - Value:
    - LOCAL_INPUT_PREFERRED: It represents the tasks' local input preferred scheduling.
    - TASK_BALANCED_PREFERRED: It represents the new tasks' balanced scheduling.
  - Introduce an Enum named TaskSlotSharingStrategy.
- slot.request.max-interval
  - type: Duration
  - default: 50ms
  - description: The max interval duration for requesting a slot from the SlotPool.
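Assuming the configuration keys proposed above, enabling the balanced scheduling could look like the following `flink-conf.yaml` fragment (a sketch based on this proposal's key names and defaults):

```yaml
# Use the new balanced task-to-slot mapping instead of the default
# LOCAL_INPUT_PREFERRED strategy.
slot.sharing-strategy: TASK_BALANCED_PREFERRED

# Wait up to 50ms after the last slot request before matching slots,
# so that requests can be batched and balanced together.
slot.request.max-interval: 50ms
```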
Proposed Changes
1 Add the loading abstraction to various levels of slots
- Introduce the abstractions and the interfaces
  - LoadingWeight

    ```java
    public interface LoadingWeight {
        float getLoading();

        LoadingWeight merge(LoadingWeight other);
    }
    ```

  - WeightLoadable

    ```java
    public interface WeightLoadable {
        LoadingWeight getLoading();

        void setLoading(@Nonnull LoadingWeight loadingWeight);
    }
    ```
- Classes that need to implement or extend WeightLoadable
- ExecutionSlotSharingGroup
- PhysicalSlotRequest
- PendingRequest
- SlotInfo
- SlotContext
- PhysicalSlot
- AllocatedSlot
- TaskManagerSlotInformation
- DeclarativeTaskManagerSlot
- FineGrainedTaskManagerSlot
- Other classes that need to add LoadingWeights
- ResourceRequirement
List<LoadingWeight> loadingWeights;
- ResourceCounter
Map<ResourceProfile, List<LoadingWeight>> loadingWeights;
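As a minimal, self-contained sketch of the abstraction above: the `TasksLoadingWeight` class and its task-count-based loading are illustrative assumptions, not part of the proposal's actual code.

```java
// Proposed LoadingWeight abstraction (reproduced for self-containment).
interface LoadingWeight {
    float getLoading();

    LoadingWeight merge(LoadingWeight other);
}

// Hypothetical implementation: loading is measured by the number of tasks.
final class TasksLoadingWeight implements LoadingWeight {
    private final float tasks;

    TasksLoadingWeight(float tasks) {
        this.tasks = tasks;
    }

    @Override
    public float getLoading() {
        return tasks;
    }

    @Override
    public LoadingWeight merge(LoadingWeight other) {
        // Merging two weights sums their task counts.
        return new TasksLoadingWeight(tasks + other.getLoading());
    }
}
```

With such an implementation, the loading of a slot sharing group would simply be the merged loading of the tasks placed into it.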
2 Implement task-to-slot balancing strategy for Default Scheduler
- Introduce BalancedPreferredSlotSharingStrategy
- BalancedPreferredSlotSharingStrategy
- implements SlotSharingStrategy, SchedulingTopologyListener
- responsibility: Supply the balanced Task -> Slot (ESSG) mapping
- Other Classes
- BalancedPreferredSlotSharingStrategy.ExecutionSlotSharingGroupBuilder
- BalancedPreferredSlotSharingStrategy.Factory
- implements SlotSharingStrategy.Factory
- BalancedSlotSharingExecutionSlotAllocatorFactory
- implements ExecutionSlotAllocatorFactory
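The core of the balanced mapping can be sketched as a least-loaded choice: each task is placed into the execution slot sharing group that currently carries the fewest tasks. The data structures below are simplified stand-ins, not the actual BalancedPreferredSlotSharingStrategy internals.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified sketch: assign each task to the least-loaded group, so that
// group sizes differ by at most one across all groups.
final class BalancedGroupAssigner {
    static List<List<String>> assign(List<String> tasks, int numGroups) {
        List<List<String>> groups = new ArrayList<>();
        for (int i = 0; i < numGroups; i++) {
            groups.add(new ArrayList<>());
        }
        for (String task : tasks) {
            // Pick the group with the smallest current loading (task count).
            groups.stream()
                    .min(Comparator.comparingInt(List::size))
                    .ifPresent(g -> g.add(task));
        }
        return groups;
    }
}
```

For 5 tasks and 2 groups this yields sizes 3 and 2, instead of the skewed distributions the LOCAL_INPUT_PREFERRED strategy can produce.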
3 Implement slots to TMs balancing for DefaultScheduler:
- Waiting mechanism
- Adapt the implemented classes of RequestSlotMatchingStrategy
Use the original information and the LoadingWeight to match target PendingRequests.
- SimpleRequestSlotMatchingStrategy
- PreferredAllocationRequestSlotMatchingStrategy
- Introduce a new implementation of SlotPool
  - Introduce an availableSlots map of type HashMap<AllocationID, PhysicalSlot>.
  - When calling newSlotsAreAvailable, cache the slots into availableSlots.
    - If the size of availableSlots is greater than or equal to the size of pendingRequests and the availableSlots can be matched with all pendingRequests: start matching all required slots to fulfill the pending slot requests and update availableSlots.
    - If not: do nothing.
- Introduce the logic to process the parameter `slot.request.max-interval`, the maximum time interval between JM slot requests to the SlotPool. When `current time > arrival time of the last slot request + slot.request.max-interval` and the number of cached available slots is greater than or equal to the number of pending requests, the slot selection and allocation can be carried out.
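The waiting condition described above can be sketched as follows; the class, field, and method names are illustrative, not the actual SlotPool implementation.

```java
import java.time.Duration;
import java.time.Instant;

// Sketch of the batch-matching trigger: only match slots to requests once
// no new request has arrived for slot.request.max-interval AND enough
// slots are cached to cover all pending requests.
final class BatchSlotMatcher {
    private final Duration maxInterval; // slot.request.max-interval
    private Instant lastRequestArrival = Instant.EPOCH;

    BatchSlotMatcher(Duration maxInterval) {
        this.maxInterval = maxInterval;
    }

    void onSlotRequest(Instant now) {
        lastRequestArrival = now;
    }

    boolean shouldMatch(Instant now, int availableSlots, int pendingRequests) {
        boolean intervalElapsed =
                now.isAfter(lastRequestArrival.plus(maxInterval));
        return intervalElapsed && availableSlots >= pendingRequests;
    }
}
```

Batching requests this way lets the pool see the whole demand at once, which is what makes a globally balanced slot-to-TM assignment possible.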
- Enhance cluster.evenly-spread-out-slots
- Change the findMatchingSlot method signature of interface SlotMatchingStrategy as follows:
  - SlotMatchingStrategy

    ```java
    <T extends TaskManagerSlotInformation> Optional<T> findMatchingSlot(
            ResourceProfile requestedProfile,
            Collection<T> freeSlots,
            Function<InstanceID, Integer> numberRegisteredSlotsLookup,
            // The function to get the tasks loading for a TM.
            Function<InstanceID, Long> slotTasksLoadingLookup);
    ```
- Override findMatchingSlot of LeastUtilizationSlotMatchingStrategy, splitting the logic into two stages:
  - The first stage: keep the original logic to find the set of slots whose TM slot utilization is the smallest.
  - The second stage: find the best slot from that set based on the smallest tasks loading of the TM.
- Override selectWithoutLocationPreference (implemented from SlotSelectionStrategy) of EvenlySpreadOutLocationPreferenceSlotSelectionStrategy in the same two-stage manner as LeastUtilizationSlotMatchingStrategy#findMatchingSlot.
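The two-stage selection can be sketched as: first keep only the slots whose TM has the lowest slot utilization, then among those pick the slot whose TM has the smallest tasks loading. The record type and precomputed per-TM values below are simplified assumptions.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Simplified sketch of the two-stage matching: (1) minimal TM slot
// utilization, (2) minimal TM tasks loading as the tie-breaker.
final class TwoStageSlotMatcher {
    record Slot(String tm, double tmUtilization, long tmTasksLoading) {}

    static Optional<Slot> findMatchingSlot(List<Slot> freeSlots) {
        // Stage 1: smallest TM slot utilization among all free slots.
        double minUtil = freeSlots.stream()
                .mapToDouble(Slot::tmUtilization)
                .min()
                .orElse(Double.NaN);
        // Stage 2: among those candidates, smallest tasks loading.
        return freeSlots.stream()
                .filter(s -> s.tmUtilization() == minUtil)
                .min(Comparator.comparingLong(Slot::tmTasksLoading));
    }
}
```

Keeping stage 1 unchanged preserves the existing `cluster.evenly-spread-out-slots` behavior; stage 2 only refines ties using the new loading information.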
4 Implement task-to-slot balancing strategy for Adaptive Scheduler
- Currently, AdaptiveScheduler uses DefaultSlotAssigner to determine the mapping of tasks and slots.
- DefaultSlotAssigner is similar to LOCAL_INPUT_PREFERRED, and we can implement TaskBalancedSlotAssigner as well.
- When recovering from the checkpoint, AdaptiveScheduler uses the StateLocalitySlotAssigner
- StateLocalitySlotAssigner may have some conflicts with task balance.
- StateLocalitySlotAssigner tries to ensure that tasks are scheduled to the last running TM as much as possible.
- The task balance ensures that the number of tasks on the TM is as close as possible.
- Therefore, we want to schedule tasks to their previous TMs as much as possible, on the premise that the number of tasks per TM remains balanced.
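One way to reconcile the two goals is to treat balance as a hard constraint and state locality as a tie-breaker: cap each TM at a balanced task quota, and prefer a task's previous TM only while that TM still has quota left. The class and parameter names below are hypothetical, not the actual StateLocalitySlotAssigner code.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: balance first (cap tasks per TM at a quota), locality second
// (prefer the task's previous TM while it still has quota left).
final class BalancedLocalityAssigner {
    static Map<String, String> assign(
            Map<String, String> taskToPrevTm, List<String> tms, int quota) {
        Map<String, Integer> load = new HashMap<>();
        Map<String, String> assignment = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : taskToPrevTm.entrySet()) {
            String prev = e.getValue();
            String target =
                    (prev != null
                                    && tms.contains(prev)
                                    && load.getOrDefault(prev, 0) < quota)
                            // Locality preserved within the balance quota.
                            ? prev
                            // Otherwise fall back to the least-loaded TM.
                            : tms.stream()
                                    .min(Comparator.comparingInt(
                                            t -> load.getOrDefault(t, 0)))
                                    .orElseThrow();
            load.merge(target, 1, Integer::sum);
            assignment.put(e.getKey(), target);
        }
        return assignment;
    }
}
```

With this ordering, recovery keeps as much state locality as the balance constraint allows, instead of letting locality reintroduce skew.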
5 Implement slot-to-TM balancing strategy for Adaptive Scheduler
- The current Adaptive Scheduler doesn't support the `cluster.evenly-spread-out-slots` strategy. If we want to reuse the `cluster.evenly-spread-out-slots` parameter, we first need to support the existing strategy in the Adaptive Scheduler.
- Then enhance the `cluster.evenly-spread-out-slots` strategy for the Adaptive Scheduler to support slot loading (balanced task numbers).
- Waiting mechanism
- The Adaptive Scheduler doesn't need the additional waiting mechanism, because it starts scheduling only after the resources are ready.
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
By default, this feature is disabled and has no impact on users unless they actively enable it. Enabling it implies that the user clearly understands the meaning of the parameter and the effect they want to achieve with it.
- If we are changing behavior how will we phase out the older behavior?
We will not eliminate the original strategy; the new strategy is introduced at the same level as the original one.
- If we need special migration tools, describe them here.
N.A.
- When will we remove the existing behavior?
N.A.
Test Plan
- We will focus on performance testing in three deployment modes:
- Focus on two dimensions:
  - At the job scope, does the number of tasks within each slot meet expectations?
  - At the TM scope, does the number of tasks on each TM meet expectations?
- 1 Standalone session
- Single job
- Multiple jobs
- 2 Application mode
- 3 Session mode
- Single job
- Multiple jobs
Rejected Alternatives
- If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.