Discussion thread: https://lists.apache.org/thread/mx3ot0fmk6zr02ccdby0s8oqxqm2pn1x
Vote thread: https://lists.apache.org/thread/l4g4jnggp677hb5mxxk5rb8jc1tth2sc
JIRA


Release: <Flink Version>

1. Background and motivation

Flink's current task deployment strategy sometimes assigns more tasks to some TMs than to others. The TMs that receive more tasks suffer from excessive resource utilization and become a bottleneck for the entire job. Developing a strategy that balances the task load across TMs, and thereby reduces job bottlenecks, is therefore very valuable.

The raw design doc with discussion is recorded in [1].

1.1 Current strategy

1.1.1 Rules of the slot sharing strategy when generating the task-to-slot mapping

  • Different SchedulingExecutionVertices (SEV) of the same JobVertex (JV) cannot be placed in the same ExecutionSlotSharingGroup (ESSG).
  • The maximum parallelism among all JVs within an SSG determines the number of slots for that SSG.
  • A CoLocationGroup refers to a list of JobVertex instances, where the i-th subtask of one vertex has to be executed on the same TaskManager as the i-th subtask of all other JobVertex instances in the same group.

1.1.2 Scheduling task to TM

Flink tasks go through two stages when being deployed from the JobGraph to TMs.

  • The first stage is the allocation of tasks to slots.

The existing strategy is LocalInputPreferredSlotSharingStrategy. This strategy tries to reduce remote data exchanges. Unfortunately, it can cause the SlotSharingStrategy to assign many tasks to the same execution slot sharing group, which leads to an unbalanced task load at the slot level.

  • The second stage is the scheduling of slots to TMs.

Slots containing different numbers of tasks are deployed to TMs more or less randomly. As a consequence, this causes an unbalanced task load at the TM level.

1.2 General idea

We can therefore reduce the task-count skew at the TM level by balancing task scheduling in two dimensions: the slot level and the TM level. Balancing the task load at the slot level is essential for reducing the deviation of the final task load across TaskManagers.

How:

  1. Slot level: ensure that every slot carries as equal a number of tasks as possible, using a round-robin strategy within the corresponding slot sharing group, while still respecting the rules mentioned in 1.1.
  2. TM level: building on the balanced slot load from (1), keep the number of tasks per TM as close as possible, and allocate slots only after all required slots are ready for the current job, based on the task load of each TaskManager.

2. Design

2.1 Slot level tasks scheduling balance

Abbreviation of terms:

  • SlotSharingGroup: SSG
  • ExecutionSlotSharingGroup: ESSG
  • JobVertex: JV
  • Parallelism: p
  • SchedulingExecutionVertex: SEV

Note: In the following, "max parallelism" always refers to the maximum parallelism among the tasks in a SlotSharingGroup, not to the state's max parallelism.


In an SSG:

       In the new strategy, when assigning the SchedulingExecutionVertices of the JobVertex with the maximum parallelism (in the current SSG) to slots, the task count of every slot increases by exactly 1, so the balance across slots is not affected. In addition, the forward edges of the vertex with the maximum parallelism remain local shuffles.

       Tasks of JobVertices whose parallelism is not the maximum are assigned to the slots using a round-robin strategy.

2.1.1 In a SlotSharingGroup 

As mentioned in section 1.1, the rules (co-location etc.) are enforced first, and then the following round-robin-based strategy is applied to approach a maximally balanced load.

  • Traverse the SEVs in topological (JV-creation) order.
  • eSSGIndex = -1
  • numberOfSlots is the max parallelism of the current SSG.
  • The current SchedulingExecutionVertex is denoted SEV.
  • Create numberOfSlots new ESSGs and set them into all positions of the ESSGs array to initialize it:
    • new ESSGs[numberOfSlots];


Core logic:

  • If the JV of the SEV has no co-location constraint:
    • If the parallelism of the JV equals numberOfSlots:
      • Traverse all SEVs of the JV and assign SEVs[subtask_index] to ESSGs[subtask_index]. This ensures that SEVs with the same index are assigned to the same ESSG.
      • It also ensures that co-located subtasks end up in one ESSG, given that co-located job vertices share the same parallelism and are always in the same SSG.
      • In the case of forward edges, all subtasks connected by a forward shuffle stay in the same slot, so the data exchange remains local.
    • If the parallelism of the JV is less than numberOfSlots in the current SSG:
      • Get ESSGs[++eSSGIndex % numberOfSlots] as the target ESSG.
      • Add the SEV into the target ESSG.

In short, allocating such tasks with the round-robin strategy equalizes the number of tasks in each slot (see the sketch after this list).

  • If the JV of the SEV has a co-location constraint:
    • First, try to allocate the SEV to the ESSG that already holds the same co-location group and subtask index.
    • If no such ESSG is found in the first step, allocate the SEV to the ESSG with the fewest tasks.
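
To make the round-robin assignment concrete, the following is a minimal Java sketch of the non-co-located case. It is illustrative only: the names TaskToSlotBalancingSketch, assignVertices, and the simplified vertex/group types are assumptions, not Flink classes.

    // Simplified sketch of the task-to-slot (ESSG) balancing described above.
    // Assumptions: no co-location constraints; jobVertices is given in topological order,
    // each inner list holding the SEVs of one JobVertex in subtask order.
    import java.util.ArrayList;
    import java.util.List;

    public class TaskToSlotBalancingSketch {

        // Hypothetical stand-ins for Flink's internal classes.
        record SchedulingExecutionVertex(String jobVertexId, int subtaskIndex) {}

        static class ExecutionSlotSharingGroup {
            final List<SchedulingExecutionVertex> vertices = new ArrayList<>();
        }

        static List<ExecutionSlotSharingGroup> assignVertices(
                List<List<SchedulingExecutionVertex>> jobVertices, int numberOfSlots) {
            List<ExecutionSlotSharingGroup> essgs = new ArrayList<>();
            for (int i = 0; i < numberOfSlots; i++) {
                essgs.add(new ExecutionSlotSharingGroup());
            }
            int essgIndex = -1; // round-robin pointer shared by all non-max-parallelism JVs
            for (List<SchedulingExecutionVertex> sevs : jobVertices) {
                if (sevs.size() == numberOfSlots) {
                    // Max-parallelism JV: subtask i goes into ESSG i, keeping forward edges
                    // local and adding exactly one task to every slot.
                    for (int i = 0; i < sevs.size(); i++) {
                        essgs.get(i).vertices.add(sevs.get(i));
                    }
                } else {
                    // Lower-parallelism JV: spread its subtasks round-robin over all ESSGs.
                    for (SchedulingExecutionVertex sev : sevs) {
                        essgIndex = (essgIndex + 1) % numberOfSlots;
                        essgs.get(essgIndex).vertices.add(sev);
                    }
                }
            }
            return essgs;
        }
    }

Feeding the example below (JV0..JV4) into this sketch yields the per-slot task counts listed after the table.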


Example: 

Assume 

JV0: parallelism=5
JV1: parallelism=3
JV2: parallelism=5
JV3: parallelism=2
JV4: parallelism=1

Slot(ESSG) 0-4.

JV  | SEV index | JV parallelism | Parallelism == numberOfSlots? | eSSGIndex (default -1)      | Operation
JV0 | SEV0-SEV4 | 5              | Y                             | N.A.                        | Loop: add SEV(i) into ESSG(i)/Slot(i), i=0,1,2,3,4
JV1 | SEV0      | 3              | N                             | ++eSSGIndex%numberOfSlots=0 | Add SEV0 into ESSG(0)/Slot(0)
JV1 | SEV1      | 3              | N                             | ++eSSGIndex%numberOfSlots=1 | Add SEV1 into ESSG(1)/Slot(1)
JV1 | SEV2      | 3              | N                             | ++eSSGIndex%numberOfSlots=2 | Add SEV2 into ESSG(2)/Slot(2)
JV2 | SEV0-SEV4 | 5              | Y                             | N.A.                        | Loop: add SEV(i) into ESSG(i)/Slot(i), i=0,1,2,3,4
JV3 | SEV0      | 2              | N                             | ++eSSGIndex%numberOfSlots=3 | Add SEV0 into ESSG(3)/Slot(3)
JV3 | SEV1      | 2              | N                             | ++eSSGIndex%numberOfSlots=4 | Add SEV1 into ESSG(4)/Slot(4)
JV4 | SEV0      | 1              | N                             | ++eSSGIndex%numberOfSlots=0 | Add SEV0 into ESSG(0)/Slot(0)


The final number of tasks per slot in this example (16 tasks in total spread over 5 slots, so the counts differ by at most 1):

  • ESSG(0)/Slot(0): 4
  • ESSG(1)/Slot(1): 3
  • ESSG(2)/Slot(2): 3
  • ESSG(3)/Slot(3): 3
  • ESSG(4)/Slot(4): 3

2.1.2 In multi-SSGs

  • Use the strategy described in  2.1.1 for each SSG.

2.2 TM level tasks scheduling balance

2.2.1 Slot loading description

  • Introduce the slot loading: the load carried by each slot.
  • Motivation: why is it needed?

It is used to avoid scheduling slots with a high load onto a TM that already has a high task load.

  • How:

Introduce an abstraction for the task load of a slot. Currently, the slot loading simply represents the number of tasks in that slot.

First, the load is tracked at the slot level, and by default the load is the number of tasks. The load could be extended to other load metrics in the future.

2.2.2 Waiting mechanism

Introduce a waiting mechanism when applying for slots, to delay the allocation of slots to tasks.

Why is the waiting mechanism needed?

  • In order to achieve a globally optimal mapping from slots to TMs, two conditions must hold before slot requests are scheduled onto the physical slots of TMs:
    • Condition 1: All slot requests have been sent from the JobMaster to the SlotPool.
    • Condition 2: All requested slots have been offered from the TaskManagers to the SlotPool.
  • Why is condition 1 needed?
    • Assume we start assigning slots to TMs after only a part of the slot requests have been sent from the JobMaster to the SlotPool.
    • If the number of tasks in the first batch of slots is small and the number of tasks in the second batch is larger,
    • all slots of the first batch will be assigned to the first batch of TMs, and all slots of the second batch will be assigned to the second batch of TMs.
    • The first batch of TMs then runs few tasks, while the second batch of TMs runs many tasks.
    • So it is better to start assigning slots to TMs only after all slot requests have been sent from the JobMaster to the SlotPool.
  • Why is condition 2 needed?
    • If slots are assigned as soon as a TM arrives, all slots of that TM are allocated immediately, which is a greedy strategy.
    • Without a global view, it is difficult to achieve the globally optimal mapping from slots to TMs.
    • Also, a streaming job cannot run well if resources are insufficient, so we can start scheduling only after all requested slots have been offered from the TaskManagers to the SlotPool:
      • 1. For a streaming job with shuffles, if resources are insufficient, the job cannot run even if we schedule it.

      • 2. For a streaming job without shuffles, if resources are insufficient, some tasks (regions) can run. However, checkpoints cannot be triggered, and the job will fail after `slot.request.timeout`.

Where: The waiting mechanism is introduced in the SlotPool.

  • The mapping between slots and TMs is done by the SlotPool, which must wait for all slots to arrive before it has the global view needed to process them.
  • The final result of scheduling is that at most one TM has unused slots.

How

  • How to respect condition 2:
    • Start assigning slots to TMs once the number of available slots is greater than or equal to the number of slot requests.
  • How to respect condition 1:
    • The root cause why condition 1 may not hold is that slot requests are sent from the JobMaster to the SlotPool one by one instead of as one whole batch.
    • Condition 1 is met in most scenarios, because requesting slots is faster than registering slots in the SlotPool (registering slots requires requesting and starting TMs).
    • But there are two special cases:
      • It does not hold if a job is re-scheduled after a task failure (slots are already available in advance).
      • It also does not hold for batch jobs (a single task can run on its own).
    • Solution 1: Introduce an additional waiting mechanism: `slot.request.max-interval`
      • It is the maximum time interval between slot requests from the JM to the SlotPool.
      • We consider all slot requests to have been sent from the JobMaster to the SlotPool when `current time > arrival time of the last slotRequest + slot.request.max-interval`.
      • The `slot.request.max-interval` can be very small, e.g. 20ms, because the call stack consists of Java method calls; it does not cross processes and does not involve RPC.
    • Solution 2: Change or add the interface from the Scheduler to the SlotPool.
      • 2.1 Change the interface from a single slot request to a batch slot request. This does not work well with PipelinedRegionSchedulingStrategy.
      • 2.2 Add an interface that lets the SlotPool know that all requests have arrived. (If so, the SlotPool does not need any timeout or waiting mechanism.)
    • We choose solution 1 because solution 2 may complicate the scheduling process a lot (from the discussion thread).
  • Based on this information (see the sketch after this list):
    • When `current time > arrival time of the last slotRequest + slot.request.max-interval`, the SlotPool declares its resource requirements to the RM (for all jobs, batch and streaming).
    • When `the number of cached available slots is greater than or equal to the number of pending slot requests`, slot selection and allocation can be carried out (only for streaming jobs).
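
A minimal sketch of these two checks follows, assuming a hypothetical helper inside the SlotPool. The names BatchingSlotRequestTracker, shouldDeclareRequirements, and canStartAssigning are illustrative, not Flink APIs.

    // Minimal sketch of the two waiting-mechanism checks described above.
    import java.time.Duration;

    public class BatchingSlotRequestTracker {

        private final Duration slotRequestMaxInterval; // e.g. 20 ms
        private long lastSlotRequestTimestampMs = Long.MIN_VALUE;
        private int pendingSlotRequests;
        private int cachedAvailableSlots;

        public BatchingSlotRequestTracker(Duration slotRequestMaxInterval) {
            this.slotRequestMaxInterval = slotRequestMaxInterval;
        }

        /** Called whenever a new slot request arrives from the JobMaster. */
        public void onSlotRequest(long nowMs) {
            lastSlotRequestTimestampMs = nowMs;
            pendingSlotRequests++;
        }

        /** Called whenever a TaskManager offers slots to the SlotPool. */
        public void onSlotsOffered(int count) {
            cachedAvailableSlots += count;
        }

        /** Condition 1: no slot request arrived within the last slot.request.max-interval. */
        public boolean shouldDeclareRequirements(long nowMs) {
            return nowMs > lastSlotRequestTimestampMs + slotRequestMaxInterval.toMillis();
        }

        /** Condition 2 (streaming only): enough slots are cached for all pending requests. */
        public boolean canStartAssigning() {
            return cachedAvailableSlots >= pendingSlotRequests;
        }
    }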

Limitation: The waiting mechanism (`the number of cached available slots is greater than or equal to the number of pending slot requests`) does not take effect for batch jobs, so batch jobs do not have a global view for balancing the number of tasks when allocating slots to TMs. The main reasons are:

  1. Batch jobs can run normally even with only one TM.
  2. Batch jobs run in stages, so there may be only a small number of tasks in a slot at any point in time.
  3. Batch jobs can use cluster.evenly-spread-out-slots=true to spread slots across as many TMs as possible, especially in OLAP scenarios.
    • This option is replaced by taskmanager.load-balance.mode=Slots; for details, see section 3.1.

2.2.3 Allocation strategy for slot to TM

When the available slots are sufficient for deploying the job, the waiting ends. After that, the ExecutionSlotSharingGroups (slot requests) are sorted in descending order by slot load, and each slot request is assigned, round-robin, to the TaskManager that has the most free slots and the smallest task load. A sketch of this selection follows this paragraph.
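
Below is a minimal, self-contained sketch of this greedy selection, assuming the load of a slot request is its task count and picking, for each request, the TM with the smallest current task load (ties broken by the most free slots). The names SlotToTmBalancingSketch, TaskManagerCandidate, and assign are illustrative, not Flink classes. Under the assumption that only the requested number of slots is offered, this rule should reproduce the example assignments below.

    // Sketch of the slot-to-TM assignment: sort slot requests by task load (descending)
    // and greedily place each one on the TM with the smallest current task load,
    // breaking ties by the largest number of free slots.
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class SlotToTmBalancingSketch {

        record SlotRequest(String essgId, int taskCount) {}

        static class TaskManagerCandidate {
            final String id;
            int freeSlots;
            int taskLoad; // number of tasks already assigned to this TM

            TaskManagerCandidate(String id, int freeSlots) {
                this.id = id;
                this.freeSlots = freeSlots;
            }
        }

        static void assign(List<SlotRequest> requests, List<TaskManagerCandidate> tms) {
            // Heaviest slot requests are placed first.
            List<SlotRequest> sorted = new ArrayList<>(requests);
            sorted.sort(Comparator.comparingInt(SlotRequest::taskCount).reversed());

            for (SlotRequest request : sorted) {
                TaskManagerCandidate target = tms.stream()
                        .filter(tm -> tm.freeSlots > 0)
                        // Smallest task load first; on ties, prefer the TM with more free slots.
                        .min(Comparator.comparingInt((TaskManagerCandidate tm) -> tm.taskLoad)
                                .thenComparingInt(tm -> -tm.freeSlots))
                        .orElseThrow(() -> new IllegalStateException("Not enough free slots"));
                target.freeSlots--;
                target.taskLoad += request.taskCount();
            }
        }
    }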


Example1:

  • Assume there are 6 ESSGs.

ESSG0: 4 tasks
ESSG1: 4 tasks
ESSG2: 3 tasks
ESSG3: 3 tasks
ESSG4: 3 tasks
ESSG5: 3 tasks

When taskmanager.numberOfTaskSlots is 3, the assignment result is:

TM1   | TM1 task number (10) | TM2   | TM2 task number (10)
ESSG0 | 4                    | ESSG1 | 4
ESSG2 | 3                    | ESSG3 | 3
ESSG4 | 3                    | ESSG5 | 3



When taskmanager.numberOfTaskSlots is 2, the assignment result is:

TM1   | TM1 task number (7) | TM2   | TM2 task number (7) | TM3   | TM3 task number (6)
ESSG0 | 4                   | ESSG1 | 4                   | ESSG2 | 3
ESSG3 | 3                   | ESSG4 | 3                   | ESSG5 | 3



Example2:

  • Assume there are 7 ESSGs.

ESSG0: 4 tasks
ESSG1: 4 tasks
ESSG2: 4 tasks
ESSG3: 3 tasks
ESSG4: 3 tasks
ESSG5: 3 tasks
ESSG6: 3 tasks


When taskmanager.numberOfTaskSlots is 3, the assignment result is:

TM1   | TM1 task number (10) | TM2   | TM2 task number (10) | TM3       | TM3 task number (4)
ESSG0 | 4                    | ESSG1 | 4                    | ESSG2     | 4
ESSG3 | 3                    | ESSG4 | 3                    | Free slot |
ESSG5 | 3                    | ESSG6 | 3                    | Free slot |




When taskmanager.numberOfTaskSlots is 2, the assignment result is:

TM1   | TM1 task number (7) | TM2   | TM2 task number (7) | TM3   | TM3 task number (7) | TM4       | TM4 task number (3)
ESSG0 | 4                   | ESSG1 | 4                   | ESSG2 | 4                   | ESSG3     | 3
ESSG4 | 3                   | ESSG5 | 3                   | ESSG6 | 3                   | Free slot |



Note:

  • All free slots end up on the last TM, because the ResourceManager does not have the waiting mechanism and simply requests 7 slots for this JobMaster.
  • Why is this acceptable?
    • If we only add the waiting mechanism to the JobMaster and not to the ResourceManager, all free slots will be on the last TM; all slots of the other TMs are offered to the JM.
    • That is, at most one TM may have fewer tasks than the others, and the task-count difference between the other TMs is at most 1. So when p >> slotsPerTM, the problem can be ignored.
    • We can also suggest that users, in cases where p is small, configure slotsPerTM to 1 or choose p such that p % slotsPerTM == 0.


From a global perspective, we use the waiting mechanism to achieve the effect of requesting slots in batches, compute the optimal deployment order, and then complete the deployment.


When choosing the best physical slot from the set of available slots, we considered two candidate strategies:

  • Option 1: A selection strategy based on the task load of each TM.

Per selection, pick a slot from the TM with the smallest task load.

  • Option 2: Reuse the existing cluster.evenly-spread-out-slots strategy and enhance it.
    • Solution: Prioritize scheduling to the TM with the lowest slot usage, and select the TM with fewer tasks when the slot usage is the same.
    • Because the task-to-slot allocation stage already achieves a relative balance, enhancing this strategy is enough to achieve balanced results.


Comparison:

  • Option 2 benefit: fewer parameters for users, more user-friendly.
    • No functional conflict: we still get slot balancing.
  • Option 1:
    • We would need to introduce a parameter at the same level as 'cluster.evenly-spread-out-slots'.
    • We would need to handle conflicts when both parameters are enabled at the same time.
    • If we introduce a new configuration type, we need to stay compatible with the obsolete 'cluster.evenly-spread-out-slots' parameter.
    • We would have to consider the new parameters carefully; additional parameters might be required.
    • For the cases above, we would need to change the type of 'cluster.evenly-spread-out-slots' and the new parameter to an enum type and ensure compatibility.
  • Preferred conclusion:

We prefer option 2; the rest of the document is designed on the basis of reusing and enhancing that option.

2.2.4 Note

We have come to a preliminary conclusion: if we cannot obtain the global resource information at once, it is difficult to guarantee the most balanced global result on the TM side. That is why we introduce a waiting mechanism that starts assigning physical slots only after all slots are ready.

With this global perspective, we can follow the approach described in 2.2.2.

It is worth mentioning that when declarative resource management is used in application mode, the number of slots per TM is fixed, and under this premise it is also easier to obtain a nearly globally optimal solution.

3 Proposed change

3.1 Public Interfaces

Introduce the following configuration items:

    • taskmanager.load-balance.mode
      • This option controls 2 phases during scheduling
        • Task → Slot  has 2 strategies: LOCAL_INPUT_PREFERRED and TASK_BALANCED_PREFERRED
        • Slot → TM has 2 strategies: cluster.evenly-spread-out-slots is true or false
      • Value:
        • None: LOCAL_INPUT_PREFERRED + cluster.evenly-spread-out-slots=false (the default behavior of previous Flink versions)
        • Slots: LOCAL_INPUT_PREFERRED + cluster.evenly-spread-out-slots=true (kept for compatibility with the old cluster.evenly-spread-out-slots option)
        • Tasks: TASK_BALANCED_PREFERRED + cluster.evenly-spread-out-slots=true (the new balanced task scheduling)
      • type: TaskManagerLoadBalanceMode
      • default: None
    • slot.request.max-interval
      • type: Duration
      • default: 20ms
      • description: The maximum time interval between two slot requests from the JobMaster to the SlotPool.
    • Deprecate cluster.evenly-spread-out-slots
      • It can be replaced with the new taskmanager.load-balance.mode option.
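
As an illustration, the proposed options could be declared with Flink's ConfigOptions builder roughly as follows. The option keys, enum values, and defaults come from this FLIP; the class and field names in the sketch are assumptions, not the final implementation.

    // Sketch of declaring the proposed options with Flink's ConfigOptions builder.
    import java.time.Duration;
    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;

    public class LoadBalanceOptionsSketch {

        /** Controls both the task -> slot and the slot -> TM strategy. */
        public enum TaskManagerLoadBalanceMode {
            NONE,  // LOCAL_INPUT_PREFERRED + evenly-spread-out-slots=false (old default)
            SLOTS, // LOCAL_INPUT_PREFERRED + evenly-spread-out-slots=true
            TASKS  // TASK_BALANCED_PREFERRED + evenly-spread-out-slots=true
        }

        public static final ConfigOption<TaskManagerLoadBalanceMode> LOAD_BALANCE_MODE =
                ConfigOptions.key("taskmanager.load-balance.mode")
                        .enumType(TaskManagerLoadBalanceMode.class)
                        .defaultValue(TaskManagerLoadBalanceMode.NONE)
                        .withDescription("Mode for balancing tasks/slots across TaskManagers.");

        public static final ConfigOption<Duration> SLOT_REQUEST_MAX_INTERVAL =
                ConfigOptions.key("slot.request.max-interval")
                        .durationType()
                        .defaultValue(Duration.ofMillis(20))
                        .withDescription(
                                "Maximum interval between slot requests from the JobMaster "
                                        + "to the SlotPool before requirements are declared.");
    }

With this, users would enable the new behavior by setting taskmanager.load-balance.mode: Tasks in their configuration, while the default None keeps the previous behavior.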

3.2 Non-Public Change

3.2.1 Add the loading abstraction to various levels of slots 

  • Introduce the abstraction and the interface
    • LoadingWeight

      public interface LoadingWeight {
          float getLoading();
          LoadingWeight merge(LoadingWeight other);
      }
    • WeightLoadable

      public interface WeightLoadable {
          LoadingWeight getLoading();
          void setLoading(@Nonnull LoadingWeight loadingWeight);
      }
  • Classes need to implement or extend  WeightLoadable
    • ExecutionSlotSharingGroup
    • PhysicalSlotRequest
    • PendingRequest
    • SlotInfo
      • SlotContext
        • PhysicalSlot
          • AllocatedSlot
    • TaskManagerSlotInformation
      • DeclarativeTaskManagerSlot
      • FineGrainedTaskManagerSlot
  • Other classes need to add LoadingWeights
    • ResourceRequirement

List<LoadingWeight> loadingWeights;

    • ResourceCounter

Map<ResourceProfile, List<LoadingWeight>> loadingWeights;
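
As a minimal illustration of the default "number of tasks" loading, a LoadingWeight implementation might look like the following sketch; the class name TasksLoadingWeight is a hypothetical example, not part of the proposal.

    // Minimal sketch: the default slot loading is simply the number of tasks in the slot.
    public final class TasksLoadingWeight implements LoadingWeight {

        private final int taskCount;

        public TasksLoadingWeight(int taskCount) {
            this.taskCount = taskCount;
        }

        @Override
        public float getLoading() {
            return taskCount;
        }

        @Override
        public LoadingWeight merge(LoadingWeight other) {
            // Merging two weights adds their task counts.
            return new TasksLoadingWeight(taskCount + (int) other.getLoading());
        }
    }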

3.2.2 Implement task-to-slot balancing strategy for Default Scheduler

  • Introduce  BalancedPreferredSlotSharingStrategy
  • BalancedPreferredSlotSharingStrategy
    • implements SlotSharingStrategy, SchedulingTopologyListener
    • responsibility: supply the balanced Task -> Slot (ESSG) mapping
    • algorithm: see 2.1.2
    • Other Classes
      • BalancedPreferredSlotSharingStrategy.ExecutionSlotSharingGroupBuilder
      • BalancedPreferredSlotSharingStrategy.Factory 
        • implements SlotSharingStrategy.Factory
      • BalancedSlotSharingExecutionSlotAllocatorFactory 
        • implements ExecutionSlotAllocatorFactory

3.2.3 Implement slots to TMs balancing for DefaultScheduler

  • Waiting mechanism
    • Adapt the implemented classes of RequestSlotMatchingStrategy

Use the original information and the LoadingWeight to match target PendingRequests.

      • SimpleRequestSlotMatchingStrategy
      • PreferredAllocationRequestSlotMatchingStrategy
    • Introduce a new implementation of SlotPool
      • Introduce an availableSlots map of type HashMap<AllocationID, PhysicalSlot>. When newSlotsAreAvailable is called, cache the slots in availableSlots.
      • Introduce the logic to process the parameter `slot.request.max-interval`, which is the maximum time interval between slot requests from the JM to the SlotPool.
      • When `current time > arrival time of the last slotRequest + slot.request.max-interval`, the SlotPool declares its resource requirements to the RM.
      • When `the number of cached available slots is greater than or equal to the number of pending slot requests`, slot selection and allocation can be carried out.
  • Enhance the old logic related to cluster.evenly-spread-out-slots
    • Change the findMatchingSlot method signature of the SlotMatchingStrategy interface as follows:
      • SlotMatchingStrategy
        <T extends TaskManagerSlotInformation> Optional<T> findMatchingSlot(
                    ResourceProfile requestedProfile,
                    Collection<T> freeSlots,
                    Function<InstanceID, Integer> numberRegisteredSlotsLookup,
                    // the function to get tasks loading for TM.
                    Function<InstanceID, Long> slotTasksLoadingLookup);
    • Override findMatchingSlot of LeastUtilizationSlotMatchingStrategy (see the sketch after this list).

Split the logic into two stages:

      • The first stage: keep the original logic to find the set of slots whose TM slot utilization is the smallest.
      • The second stage: from that set, pick the best slot based on the smallest task load of the TM.
    • Override selectWithoutLocationPreference (implemented from SlotSelectionStrategy) of EvenlySpreadOutLocationPreferenceSlotSelectionStrategy, analogously to LeastUtilizationSlotMatchingStrategy#findMatchingSlot.
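
A simplified sketch of the two-stage selection (smallest slot utilization first, then smallest TM task load) is shown below. The types are reduced to plain placeholders, and the method deliberately mirrors, but is not, the real findMatchingSlot signature.

    // Simplified sketch of the two-stage slot matching described above.
    import java.util.Comparator;
    import java.util.List;
    import java.util.Optional;
    import java.util.function.ToDoubleFunction;
    import java.util.function.ToLongFunction;

    public class LeastUtilizationThenLeastTasksSketch {

        record FreeSlot(String taskManagerId) {}

        static Optional<FreeSlot> findMatchingSlot(
                List<FreeSlot> freeSlots,
                ToDoubleFunction<String> slotUtilizationLookup, // used / registered slots per TM
                ToLongFunction<String> taskLoadLookup) {        // number of tasks per TM

            // Stage 1: the smallest slot utilization among TMs that still offer free slots.
            double minUtilization = freeSlots.stream()
                    .mapToDouble(slot -> slotUtilizationLookup.applyAsDouble(slot.taskManagerId()))
                    .min()
                    .orElse(Double.MAX_VALUE);

            // Stage 2: among slots on minimally utilized TMs, choose the TM with the fewest tasks.
            return freeSlots.stream()
                    .filter(slot ->
                            slotUtilizationLookup.applyAsDouble(slot.taskManagerId()) == minUtilization)
                    .min(Comparator.comparingLong(
                            (FreeSlot slot) -> taskLoadLookup.applyAsLong(slot.taskManagerId())));
        }
    }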

3.2.4 Implement task-to-slot balancing strategy for Adaptive Scheduler

  • Currently, the AdaptiveScheduler uses DefaultSlotAssigner to determine the mapping between tasks and slots.
    • DefaultSlotAssigner is similar to LOCAL_INPUT_PREFERRED, and we can implement a TaskBalancedSlotAssigner as well.
  • When recovering from a checkpoint, the AdaptiveScheduler uses the StateLocalitySlotAssigner.
    • StateLocalitySlotAssigner may conflict with task balancing.
    • StateLocalitySlotAssigner tries to schedule tasks to the TM they last ran on as much as possible.
    • Task balancing keeps the number of tasks per TM as close as possible.
    • Therefore, we want to schedule tasks to their previous TM as much as possible, under the premise that the number of tasks stays balanced.

3.2.5 Implement slot-to-TM balancing strategy for Adaptive Scheduler

  • The current Adaptive Scheduler does not support the `cluster.evenly-spread-out-slots` strategy. If we want to reuse the `cluster.evenly-spread-out-slots` parameter, we first need to support the existing `cluster.evenly-spread-out-slots` strategy for the Adaptive Scheduler.
  • Then enhance the `cluster.evenly-spread-out-slots` strategy for the Adaptive Scheduler to take the slot loading (task count) into account.
  • Waiting mechanism
    • The Adaptive Scheduler does not need the additional waiting mechanism, because it starts scheduling only after the resources are ready.


4 Benefits

4.1 Jobs that perform well

4.1.1 The job1

  • DAG
    • Source1: p=10
    • Sink: p=20
  • Slot per tm: 2
  • Metrics:
    • LocalInput_Preferred
      • TM Task loading
      • cpu avg & cpu max 
    • Balanced_Preferred
      • TM Task loading
      • cpu avg & cpu max 
  • Benefit table


Benefit fields (TM) \ Scheduling strategy  | LocalInputPreferred                      | BalancedPreferred
Required VCores (based on max vcore usage) | Each TM needs to apply for 5 VCores      | Each TM only applies for 2.6 VCores
Task number diff                           | TM task-count difference is 2            | TM task-count difference is 0
CPU usage multiplier factor (max/avg)      | TM CPU usage multiplier is 2 (5.09/2.56) | Reduced from 2 to 1 (more balanced)



4.1.2 The job 2

  • DAG
    • Source1: p=10
    • Source2: p=10
    • Sink: p=30
  • Slot per tm: 3
  • Metrics
    • LocalInput_Preferred
      • TM Task loading
      • cpu avg & cpu max
    • Balanced_Preferred
      • TM Task loading
      • cpu avg & cpu max
  • Benefit table


Benefit fields (TM) \ Scheduling strategy  | LocalInputPreferred                 | BalancedPreferred
Required VCores (based on max vcore usage) | Each TM needs to apply for 5 VCores | Each TM only applies for 3 VCores
Task loading diff                          | TM task-count difference is 4       | TM task-count difference is 0
CPU usage multiplier factor (max/avg)      | TM CPU usage multiplier is 2        | Reduced from 2 to 1 (more balanced)


4.2 The job with complex DAG

4.2.1 The Job 1

  • DAG
  • Compared-Metrics
  • Benefit table


Benefit fields (TM) \ Scheduling strategy  | LocalInputPreferred                 | BalancedPreferred
Required VCores (based on max vcore usage) | Each TM needs to apply for 3 VCores | Each TM only applies for 2 VCores
Task number diff                           | TM task-count difference is 12      | TM task-count difference is 1
CPU usage multiplier factor (max/avg)      | TM CPU usage multiplier is 4.154    | Reduced from 4.154 to 2.09 (more balanced)


4.2.2 The Job 2

  • Compared-Metrics
  • Benefit table


Benefit fields (TM) \ Scheduling strategy  | LocalInputPreferred                         | BalancedPreferred
Required VCores (based on max vcore usage) | Each TM needs to apply for 3 VCores         | Each TM only applies for 1.9 VCores
Task number diff                           | TM task-count difference is greater than 10 | TM task-count difference is 1
CPU usage multiplier factor (max/avg)      | TM CPU usage multiplier is 3.6              | Reduced from 3.6 to 2.07 (more balanced)


Jobs with a complex DAG cannot achieve an absolutely balanced CPU usage at the TM level, because individual tasks carry different loads. But as the results show, there are still clear benefits.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?

By default, this feature is not enabled and has no impact on existing users unless they actively enable it. Users who enable it are expected to understand the meaning of the option and the effect they want to achieve.

  • If we are changing behavior how will we phase out the older behavior?

We will not remove the original strategy; the newly introduced strategy sits at the same level as the original one.

  • If we need special migration tools, describe them here.

N.A.

  • When will we remove the existing behavior?

N.A.

Test Plan

  • We will focus on performance testing in three deployment modes:
    • Focus on two dimensions:
      • Does the number of tasks in each slot of the job meet expectations?
      • Does the number of tasks on each TM meet expectations?
    • 1 Standalone session
      • Single job
      • Multiple jobs
    • 2 Application mode
    • 3 Session mode
      • Single job
      • Multiple jobs

Rejected Alternatives

  • Expose two options to the user
  • Change the strategy of slot.request.max-interval
    • The old strategy was: when `current time > arrival time of the last slotRequest + slot.request.max-interval` and `the number of cached available slots is greater than or equal to the number of pending slot requests`, slot selection and allocation can be carried out.
    • The latest strategy is:
      • For DeclarativeSlotPool#increaseResourceRequirementsBy: when `current time > arrival time of the last slotRequest + slot.request.max-interval`, the DeclarativeSlotPool declares its resource requirements to the RM.
      • For DeclarativeSlotPool#decreaseResourceRequirementsBy: when `current time > arrival time of the last slotRequest + slot.request.max-interval`, the DeclarativeSlotPool declares its resource requirements to the RM.
      • For DeclarativeSlotPoolBridge: when `the number of cached available slots is greater than or equal to the number of pending slot requests`, slot selection and allocation can be carried out.
    • This new strategy not only implements the functionality of the old strategy, but also reduces the number of RPCs from the SlotPool to the ResourceManager.


References

[1] https://docs.google.com/document/d/14WhrSNGBdcsRl3IK7CZO-RaZ5KXU2X1dWqxPEFr3iS8
