...

Page properties

Discussion thread: https://lists.apache.org/thread/mx3ot0fmk6zr02ccdby0s8oqxqm2pn1x
Vote thread: https://lists.apache.org/thread/l4g4jnggp677hb5mxxk5rb8jc1tth2sc
JIRA: FLINK-31757
Release: <Flink Version>


...

Flink's current task deployment strategy sometimes places more tasks on some TaskManagers (TMs) than on others. The TMs that hold more tasks then use excessive resources and become a bottleneck for the whole job. A strategy that balances the task load across TMs would therefore reduce such bottlenecks.

The original design doc, including its discussion, is recorded in [1].

1.1 Current strategy

1.1.1 Rule of slot sharing strategy when generating the mapping task to slot.

...

  • If the JV of the SEV doesn’t contain a co-location constraint:
    • If the parallelism of the JV is equal to numberOfSlots:
      • Traverse all SEVs of the JV and assign SEVs[subtask_index] to ESSGs[subtask_index]. This ensures that SEVs with the same index are assigned to the same ESSG.
      • It also ensures that co-located subtasks are in one ESSG, given that co-located job vertices share the same parallelism and are always in the same SSG.
      • In the case of forward edges, all subtasks connected by a forward shuffle stay in the same slot, so their data exchange remains local.
    • If the parallelism of the JV is less than numberOfSlots in the current SSG:
      • Get ESSGs[++eSSGIndex] as the ESSG.
      • Add the SEV to the target ESSG.
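
The two rules above can be sketched as follows. This is a minimal illustration, not Flink code: the class and method names are hypothetical, an ESSG is modelled as a plain list of subtask labels, and the modulo wrap-around of eSSGIndex is an assumption that the text does not state.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the two assignment rules; none of these names exist in Flink. */
final class SlotSharingAssignmentSketch {

    /** Creates numberOfSlots empty ESSGs, each modelled as a list of subtask labels. */
    static List<List<String>> emptyGroups(int numberOfSlots) {
        List<List<String>> essgs = new ArrayList<>();
        for (int i = 0; i < numberOfSlots; i++) {
            essgs.add(new ArrayList<>());
        }
        return essgs;
    }

    /**
     * Adds all subtasks (SEVs) of one job vertex to the ESSGs.
     *
     * @param eSSGIndex round-robin index left by the previous vertex, -1 initially
     * @return the round-robin index to pass to the next vertex
     */
    static int assign(List<List<String>> essgs, String vertex, int parallelism, int eSSGIndex) {
        int numberOfSlots = essgs.size();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            int target;
            if (parallelism == numberOfSlots) {
                // Rule 1: subtask i goes to ESSG i, so equal indices share a slot.
                target = subtask;
            } else {
                // Rule 2: ESSGs[++eSSGIndex]; the modulo wrap-around is an assumption.
                eSSGIndex = (eSSGIndex + 1) % numberOfSlots;
                target = eSSGIndex;
            }
            essgs.get(target).add(vertex + "[" + subtask + "]");
        }
        return eSSGIndex;
    }

    public static void main(String[] args) {
        List<List<String>> essgs = emptyGroups(4);
        int index = assign(essgs, "A", 4, -1); // parallelism == numberOfSlots
        index = assign(essgs, "B", 2, index);  // parallelism < numberOfSlots
        assign(essgs, "C", 3, index);
        System.out.println(essgs);
    }
}
```

In this run the index-aligned vertex A does not move the round-robin index, so B starts at ESSG 0 and C continues where B stopped.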

...

  • To achieve the globally optimal mapping from slots to TMs, two conditions must hold before slot requests are scheduled to the physical slots of TMs:
    • Condition1: All slot requests have been sent from the JobMaster to the SlotPool.
    • Condition2: All requested slots have been offered from the TaskManagers to the SlotPool.
  • Why is condition1 needed?
    • Assume we start assigning slots to TMs after only part of the slot requests have been sent from the JobMaster to the SlotPool.
    • Suppose the slots of the first batch carry few tasks and the slots of the second batch carry many tasks.
    • All slots of the first batch are assigned to the first batch of TMs, and all slots of the second batch are assigned to the second batch of TMs.
    • The first batch of TMs then runs a small number of tasks, and the second batch of TMs runs a large number of tasks.
    • So it's better to start assigning slots to TMs only after all slot requests have been sent from the JobMaster to the SlotPool.
  • Why is condition2 needed?
    • If slots are allocated as soon as a TM arrives, all slots of that TM are allocated immediately, which is a greedy strategy.
    • Without a global perspective, it is difficult to achieve the globally optimal mapping from slots to TMs.
    • Also, streaming jobs cannot run well if resources are insufficient, so scheduling can start after all requested slots have been offered from the TaskManagers to the SlotPool:
      • 1. For a streaming job with shuffle, if resources are insufficient, the job cannot run even if we schedule it.
      • 2. For a streaming job without shuffle, if resources are insufficient, some of the tasks (regions) can run. However, checkpoints cannot be triggered, and the job will fail after `slot.request.timeout`.

Where: The waiting mechanism would be introduced in the SlotPool

  • The mapping between slots and TMs is done by the SlotPool, which must wait for all slots to arrive before it has a global perspective.
  • The final result of scheduling will be: at most one TM has available slots.

How

  • How to respect condition2:
    • Start assigning slots to TMs once the number of available slots is greater than or equal to the number of slot requests.
  • How to respect condition1:
    • The root cause why condition1 may not be met is that slot requests are sent from the JobMaster to the SlotPool one by one instead of as one batch.
    • Condition1 is met in most scenarios, because requesting slots is faster than registering slots to the SlotPool. (Registering slots to the SlotPool requires requesting and starting TMs.)
    • But there are two special cases:
      • This is not true if a job is re-scheduled after a task failure. (Slots are ready in advance.)
      • It's also not true for batch jobs. (A single task can run.)
    • Solution1: Introduce an additional waiting mechanism: `slot.request.max-interval`
      • It is the maximum time interval between slot requests when the JM requests multiple slots from the SlotPool.
      • We consider all slot requests to have been sent from the JobMaster to the SlotPool when `current time > arrival time of the last slotRequest + slot.request.max-interval`.
      • `slot.request.max-interval` can be very small, for example 20ms, because the requests are plain Java method calls; they do not cross processes and are not RPCs.
    • Solution2: Change or add an interface from the Scheduler to the SlotPool.
      • 2.1 Change the interface from single slot requests to batch slot requests. This doesn't work well with PipelinedRegionSchedulingStrategy.
      • 2.2 Add an interface that lets the SlotPool know that all requests have arrived. (In that case, the SlotPool doesn't need any timeout or waiting mechanism.)
    • We choose solution1 because solution2 may complicate the scheduling process a lot. (From this discussion).
  • Based on this information:
    • When `current time > arrival time of the last slotRequest + slot.request.max-interval`, the SlotPool declares its resource requirements to the RM (declareResourceRequirement). (For all jobs, both batch and streaming.)
    • When `the number of available slots cached exceeds or equals the pending slots`, slot selection and allocation can be carried out. (Only for streaming jobs.)
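
The two checks at the end of the list above can be illustrated with a small gate object. This is only a sketch under stated assumptions: the class and its methods are hypothetical and are not part of Flink's SlotPool, time is passed in explicitly as milliseconds, and the streaming-only restriction of the second check is left to the caller.

```java
import java.time.Duration;

/** Hypothetical gate modelling the two waiting conditions; this is not Flink's SlotPool. */
final class SlotRequestGateSketch {
    private final long maxIntervalMillis; // models slot.request.max-interval
    private long lastRequestMillis;       // arrival time of the last slot request
    private int pendingRequests;
    private int availableSlots;           // slots cached so far

    SlotRequestGateSketch(Duration maxInterval) {
        this.maxIntervalMillis = maxInterval.toMillis();
    }

    /** Called for every slot request sent by the JobMaster. */
    void onSlotRequest(long nowMillis) {
        pendingRequests++;
        lastRequestMillis = nowMillis;
    }

    /** Called for every slot offered by a TaskManager. */
    void onSlotOffered() {
        availableSlots++;
    }

    /** Condition1: no further request arrived within the max interval. */
    boolean shouldDeclareRequirements(long nowMillis) {
        return pendingRequests > 0 && nowMillis > lastRequestMillis + maxIntervalMillis;
    }

    /** Condition2: the cached slots cover all pending requests. */
    boolean canAllocate() {
        return pendingRequests > 0 && availableSlots >= pendingRequests;
    }

    public static void main(String[] args) {
        SlotRequestGateSketch gate = new SlotRequestGateSketch(Duration.ofMillis(20));
        gate.onSlotRequest(0);
        gate.onSlotRequest(5);
        System.out.println(gate.shouldDeclareRequirements(10)); // still inside the interval
        System.out.println(gate.shouldDeclareRequirements(26)); // interval has elapsed
        gate.onSlotOffered();
        gate.onSlotOffered();
        System.out.println(gate.canAllocate());
    }
}
```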

Limitation: The waiting mechanism (`the number of available slots cached exceeds or equals the pending slots`) does not take effect for batch jobs, so batch jobs have no global perspective for balancing the number of tasks when allocating slots to TMs. The main reasons are:

  1. Batch jobs can run normally even with only one TM.
  2. Batch jobs run in stages, and a slot may hold only a small number of tasks at the same time.
  3. Batch jobs can use cluster.evenly-spread-out-slots=true to spread slots across as many TMs as possible, especially in OLAP scenarios.
    • This option is replaced by taskmanager.load-balance.mode=Slots. For details, see Section 3.1.

2.2.3 Allocation strategy for slot to TM

Once the available slots are sufficient to deploy the job, the waiting ends. After that, sort the ExecutionSlotSharingGroups (slot requests) in descending order of slot load, and allocate them round-robin, each time to a slot on the TaskManager that has the most free slots and the smallest task load.
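
One possible reading of this strategy is the greedy loop below. It is a hedged sketch, not the FLIP's implementation: the class name is hypothetical, every TM is assumed to offer all of its slots, and ties are broken by the lowest TM index, which the text does not specify. Under these assumptions it yields the per-TM task counts of Example1; it does not model the case where the last TM offers only part of its slots.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of the slot-to-TM allocation; not Flink code. */
final class SlotToTaskManagerSketch {

    /**
     * @param essgLoads number of tasks of each ESSG (slot request)
     * @param slotsPerTm models taskmanager.numberOfTaskSlots
     * @param numberOfTms number of TMs, all assumed to offer every slot
     * @return the number of tasks placed on each TM
     */
    static int[] allocate(List<Integer> essgLoads, int slotsPerTm, int numberOfTms) {
        int[] freeSlots = new int[numberOfTms];
        Arrays.fill(freeSlots, slotsPerTm);
        int[] tasks = new int[numberOfTms];

        // Sort the slot requests by load, heaviest first.
        List<Integer> sorted = new ArrayList<>(essgLoads);
        sorted.sort(Collections.reverseOrder());

        for (int load : sorted) {
            int best = -1;
            for (int tm = 0; tm < numberOfTms; tm++) {
                if (freeSlots[tm] == 0) {
                    continue;
                }
                // Prefer more free slots, then fewer tasks; the lowest index wins ties.
                boolean better = best < 0
                        || freeSlots[tm] > freeSlots[best]
                        || (freeSlots[tm] == freeSlots[best] && tasks[tm] < tasks[best]);
                if (better) {
                    best = tm;
                }
            }
            if (best < 0) {
                throw new IllegalStateException("not enough free slots");
            }
            freeSlots[best]--;
            tasks[best] += load;
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<Integer> loads = Arrays.asList(4, 4, 3, 3, 3, 3);
        System.out.println(Arrays.toString(allocate(loads, 3, 2)));
        System.out.println(Arrays.toString(allocate(loads, 2, 3)));
    }
}
```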


Example1:

  • Assume there are 6 ESSGs.

ESSG0: the number of tasks is 4

ESSG1: the number of tasks is 4

ESSG2: the number of tasks is 3

ESSG3: the number of tasks is 3

ESSG4: the number of tasks is 3

ESSG5: the number of tasks is 3

When taskmanager.numberOfTaskSlots is 3, the assigned result:

| TM1 | TM1 task number (10) | TM2 | TM2 task number (10) |
|---|---|---|---|
| ESSG0 | 4 | ESSG1 | 4 |
| ESSG2 | 3 | ESSG3 | 3 |
| ESSG4 | 3 | ESSG5 | 3 |

When taskmanager.numberOfTaskSlots is 2, the assigned result:

| TM1 | TM1 task number (7) | TM2 | TM2 task number (7) | TM3 | TM3 task number (6) |
|---|---|---|---|---|---|
| ESSG0 | 4 | ESSG1 | 4 | ESSG2 | 3 |
| ESSG3 | 3 | ESSG4 | 3 | ESSG5 | 3 |

Example2:

  • Assume there are 7 ESSGs.

ESSG0: the number of tasks is 4

ESSG1: the number of tasks is 4

ESSG2: the number of tasks is 4

ESSG3: the number of tasks is 3

ESSG4: the number of tasks is 3

ESSG5: the number of tasks is 3

ESSG6: the number of tasks is 3

When taskmanager.numberOfTaskSlots is 3, the assigned result:

| TM1 | TM1 task number (10) | TM2 | TM2 task number (10) | TM3 | TM3 task number (4) |
|---|---|---|---|---|---|
| ESSG0 | 4 | ESSG1 | 4 | ESSG2 | 4 |
| ESSG3 | 3 | ESSG4 | 3 | Free slot | |
| ESSG5 | 3 | ESSG6 | 3 | Free slot | |

When taskmanager.numberOfTaskSlots is 2, the assigned result:

| TM1 | TM1 task number (7) | TM2 | TM2 task number (7) | TM3 | TM3 task number (7) | TM4 | TM4 task number (3) |
|---|---|---|---|---|---|---|---|
| ESSG0 | 4 | ESSG1 | 4 | ESSG2 | 4 | ESSG3 | 3 |
| ESSG4 | 3 | ESSG5 | 3 | ESSG6 | 3 | Free slot | |



Note:

  • All free slots are in the last TM, because the ResourceManager doesn’t have the waiting mechanism and requests just 7 slots for this JobMaster.
  • Why is it acceptable?
    • If we add the waiting mechanism only to the JobMaster and not to the ResourceManager, all free slots will be in the last TM. All slots of the other TMs are offered to the JM.
    • That is, only one TM may have fewer tasks than the other TMs, and the task counts of the other TMs differ by at most 1. So when p >> slotsPerTM, the problem can be ignored.
    • When p is small, we can also suggest that users configure slotsPerTM to 1, or choose p such that p % slotsPerTM == 0.
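
The arithmetic behind this note can be checked with a few lines. The helper names are hypothetical; the only assumption is that the ResourceManager starts just enough TMs to cover the slot requests.

```java
/** Hypothetical helper for the free-slot arithmetic; not Flink code. */
final class FreeSlotArithmeticSketch {

    /** Smallest number of TMs whose slots cover all slot requests (ceiling division). */
    static int taskManagersNeeded(int slotRequests, int slotsPerTm) {
        return (slotRequests + slotsPerTm - 1) / slotsPerTm;
    }

    /** Slots left unused; per the note above they all sit in the last TM. */
    static int freeSlotsInLastTm(int slotRequests, int slotsPerTm) {
        return taskManagersNeeded(slotRequests, slotsPerTm) * slotsPerTm - slotRequests;
    }

    public static void main(String[] args) {
        // The 7 slot requests of Example2 with 3 and with 2 slots per TM.
        System.out.println(taskManagersNeeded(7, 3) + " TMs, " + freeSlotsInLastTm(7, 3) + " free");
        System.out.println(taskManagersNeeded(7, 2) + " TMs, " + freeSlotsInLastTm(7, 2) + " free");
    }
}
```

When the number of slot requests is a multiple of slotsPerTM, or slotsPerTM is 1, the second helper returns 0, which is why those two settings are suggested.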


From a global perspective, we use the waiting mechanism to achieve the effect of requesting slots in a batch, arrange the optimal deployment sequence, and then complete the deployment.


When choosing the best physical slot from the set of available slots, we considered two candidate strategies:

  • Option1: Select by the task load of each TM, i.e. each selection takes a slot from the TM with the smallest task load.

  • Option2: Reuse and enhance the existing cluster.evenly-spread-out-slots strategy.
    • Solution: Prioritize TMs with lower slot usage, and among TMs with the same slot usage select the one with fewer tasks.
    • Because relative balance has already been achieved in the task-to-slot (slot sharing) stage, enhancing this strategy also yields balanced results.
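
The ordering described for Option2 can be written as a two-level comparator. This is a sketch with hypothetical names: TmLoad is an illustrative snapshot type rather than a Flink class, and slot usage is taken to be used slots divided by total slots.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of the Option2 ordering; not Flink code. */
final class TaskManagerSelectionSketch {

    /** Illustrative snapshot of one TM. */
    static final class TmLoad {
        final String id;
        final int usedSlots;
        final int totalSlots;
        final int tasks;

        TmLoad(String id, int usedSlots, int totalSlots, int tasks) {
            this.id = id;
            this.usedSlots = usedSlots;
            this.totalSlots = totalSlots;
            this.tasks = tasks;
        }

        double slotUsage() {
            return (double) usedSlots / totalSlots;
        }
    }

    /** Lower slot usage first; among equal usage, fewer tasks first. */
    static final Comparator<TmLoad> LEAST_LOADED_FIRST =
            Comparator.<TmLoad>comparingDouble(tm -> tm.slotUsage())
                    .thenComparingInt(tm -> tm.tasks);

    /** Picks the TM that should receive the next slot request. */
    static TmLoad pick(List<TmLoad> taskManagers) {
        return taskManagers.stream()
                .filter(tm -> tm.usedSlots < tm.totalSlots) // skip TMs without a free slot
                .min(LEAST_LOADED_FIRST)
                .orElseThrow(() -> new IllegalStateException("no free slot"));
    }

    public static void main(String[] args) {
        List<TmLoad> tms = Arrays.asList(
                new TmLoad("tm-a", 1, 3, 4),
                new TmLoad("tm-b", 1, 3, 3),
                new TmLoad("tm-c", 2, 3, 0));
        System.out.println(pick(tms).id);
    }
}
```

Here tm-c has the fewest tasks but the highest slot usage, so it loses to tm-a and tm-b; the task count only decides between those two.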


Comparison:

  • Option2:
    • Benefit: Fewer parameters for users, which is more user-friendly.
    • No functional conflict: We also need slot balancing.
  • Option1:
    • We need to introduce a parameter at the same level as 'cluster.evenly-spread-out-slots'.
    • We need to handle conflicts when both parameters are enabled at the same time.
    • If we introduce a new configuration type, we need to stay compatible with the obsolete parameter 'cluster.evenly-spread-out-slots'.
    • We should consider the new parameters; additional parameters may be required.
    • For the cases mentioned above, we need to change the type of 'cluster.evenly-spread-out-slots' and of the new parameter to an enum type and ensure compatibility.
  • Preferred conclusion:

We prefer option2. The rest of this document assumes that this option is reused.

2.2.4 Note

We have come to the preliminary conclusion that without obtaining global resource information at once, it is difficult to ensure the most balanced result on the TM side. That’s why we introduce a waiting mechanism that starts assigning physical slots only after all slots are ready.

With a global perspective, we can follow the approach described in 2.2.2.

It is worth mentioning that when declarative resource management is used in application mode, the number of slots for each TM is fixed, which also makes it easier to obtain a nearly globally optimal solution.

3 Proposed change

3.1 Public Interfaces

Introduce the following configuration items:

    • taskmanager.load-balance.mode
      • This option controls two phases of scheduling:
        • Task → Slot has 2 strategies: LOCAL_INPUT_PREFERRED and TASK_BALANCED_PREFERRED
        • Slot → TM has 2 strategies: cluster.evenly-spread-out-slots is true or false
      • Value:
        • None: LOCAL_INPUT_PREFERRED + cluster.evenly-spread-out-slots=false (the default behavior of older Flink versions)
        • Slots: LOCAL_INPUT_PREFERRED + cluster.evenly-spread-out-slots=true (kept for compatibility with the old cluster.evenly-spread-out-slots option)
        • Tasks: TASK_BALANCED_PREFERRED + cluster.evenly-spread-out-slots=true (the new task-balanced scheduling)
      • type: TaskManagerLoadBalanceMode
      • default: None
    • slot.request.max-interval
      • type: Duration
      • default: 20ms
      • description: The maximum interval between slot requests sent from the JobMaster to the SlotPool.
    • Deprecate cluster.evenly-spread-out-slots
      • It can be replaced with the new taskmanager.load-balance.mode option.
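
The mapping from the three mode values to the two scheduling phases can be sketched as a plain enum. All names below are hypothetical stand-ins rather than the proposed TaskManagerLoadBalanceMode itself, and the fallback from the deprecated boolean option is an assumption derived from the descriptions of None and Slots above.

```java
/** Hypothetical sketch of the mode-to-strategy mapping; not the proposed Flink enum. */
final class LoadBalanceModeSketch {

    /** Strategies of the Task -> Slot phase. */
    enum TaskToSlotStrategy { LOCAL_INPUT_PREFERRED, TASK_BALANCED_PREFERRED }

    /** Models the values of taskmanager.load-balance.mode. */
    enum Mode {
        NONE(TaskToSlotStrategy.LOCAL_INPUT_PREFERRED, false),
        SLOTS(TaskToSlotStrategy.LOCAL_INPUT_PREFERRED, true),
        TASKS(TaskToSlotStrategy.TASK_BALANCED_PREFERRED, true);

        final TaskToSlotStrategy taskToSlot;
        final boolean evenlySpreadOutSlots; // Slot -> TM phase

        Mode(TaskToSlotStrategy taskToSlot, boolean evenlySpreadOutSlots) {
            this.taskToSlot = taskToSlot;
            this.evenlySpreadOutSlots = evenlySpreadOutSlots;
        }

        /** Assumed fallback when only cluster.evenly-spread-out-slots is set. */
        static Mode fromLegacyOption(boolean evenlySpreadOutSlots) {
            return evenlySpreadOutSlots ? SLOTS : NONE;
        }
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            System.out.println(mode + ": " + mode.taskToSlot
                    + ", evenly spread out slots = " + mode.evenlySpreadOutSlots);
        }
    }
}
```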

3.2 Non-Public Change

3.2.1 Add the loading abstraction to various levels of slots 

  • Introduce the abstraction and the interface
    • LoadingWeight

      ```java
      /** Abstraction of the load carried by a slot-level entity. */
      public interface LoadingWeight {
          /** Returns the load value. */
          float getLoading();

          /** Merges this weight with the other weight. */
          LoadingWeight merge(LoadingWeight other);
      }
      ```

    • WeightLoadable

      ```java
      import javax.annotation.Nonnull;

      /** Implemented by classes that carry a LoadingWeight. */
      public interface WeightLoadable {
          LoadingWeight getLoading();
          void setLoading(@Nonnull LoadingWeight loadingWeight);
      }
      ```


  • Classes need to implement or extend  WeightLoadable
    • ExecutionSlotSharingGroup
    • PhysicalSlotRequest
    • PendingRequest
    • SlotInfo
      • SlotContext
        • PhysicalSlot
          • AllocatedSlot
    • TaskManagerSlotInformation
      • DeclarativeTaskManagerSlot
      • FineGrainedTaskManagerSlot
  • Other classes need to add LoadingWeights
    • ResourceRequirement

List<LoadingWeight> loadingWeights;

    • ResourceCounter

Map<ResourceProfile, List<LoadingWeight>> loadingWeights;
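
To show how the abstraction could be used, here is a minimal sketch of a task-count based weight. TaskCountLoadingWeight and its sum helper are hypothetical; the LoadingWeight interface is repeated only to keep the example self-contained, and merge is assumed to add the two loads.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical implementation that uses the number of tasks as the load. */
final class TaskCountLoadingWeight implements LoadingWeight {
    static final LoadingWeight EMPTY = new TaskCountLoadingWeight(0f);

    private final float tasks;

    TaskCountLoadingWeight(float tasks) {
        this.tasks = tasks;
    }

    @Override
    public float getLoading() {
        return tasks;
    }

    @Override
    public LoadingWeight merge(LoadingWeight other) {
        // Assumption: merging two weights adds their loads.
        return new TaskCountLoadingWeight(tasks + other.getLoading());
    }

    /** Folds the weights of all slots on one TM into a single weight. */
    static LoadingWeight sum(List<? extends LoadingWeight> weights) {
        LoadingWeight total = EMPTY;
        for (LoadingWeight weight : weights) {
            total = total.merge(weight);
        }
        return total;
    }

    public static void main(String[] args) {
        // Slot loads of TM1 in Example1 with three slots per TM.
        List<LoadingWeight> slotsOfTm1 = Arrays.asList(
                new TaskCountLoadingWeight(4f),
                new TaskCountLoadingWeight(3f),
                new TaskCountLoadingWeight(3f));
        System.out.println(sum(slotsOfTm1).getLoading());
    }
}

/** Repeated from the interface above for self-containment. */
interface LoadingWeight {
    float getLoading();

    LoadingWeight merge(LoadingWeight other);
}
```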


3.2.2 Implement task-to-slot balancing strategy for Default Scheduler

  • Introduce BalancedPreferredSlotSharingStrategy
  • BalancedPreferredSlotSharingStrategy
    • implements SlotSharingStrategy, SchedulingTopologyListener
    • responsibility: Supply the balanced Task → Slot (ESSG) mapping
    • algorithm: Refer to 2.1.2
    • Other classes
      • BalancedPreferredSlotSharingStrategy.ExecutionSlotSharingGroupBuilder
      • BalancedPreferredSlotSharingStrategy.Factory
        • implements SlotSharingStrategy.Factory
      • BalancedSlotSharingExecutionSlotAllocatorFactory
        • implements ExecutionSlotAllocatorFactory

3.2.3 Implement slots to TMs balancing for DefaultScheduler

  • Waiting mechanism
    • Adapt the implementations of RequestSlotMatchingStrategy

Use the original information and the LoadingWeight to match the target PendingRequests.

...

    • Introduce a new implementation of SlotPool
      • Introduce an availableSlots map of type HashMap<AllocationID, PhysicalSlot>. When newSlotsAreAvailable is called, cache the slots in availableSlots. If the size of availableSlots is greater than or equal to the size of pendingRequests and the availableSlots can be matched with all pendingRequests:

...

      • availableSlots.
        • If not: do nothing.
      • Introduce the logic to process the parameter `slot.request.max-interval`, the maximum time interval between slot requests when the JM requests multiple slots from the SlotPool.
      • When `current time > arrival time of the last slotRequest + slot.request.max-interval`, the SlotPool declares its resource requirements to the RM (declareResourceRequirement).
      • When `the number of available slots cached exceeds or equals the pending slots`, slot selection and allocation can be carried out.
  • Enhance the old logic related to cluster.evenly-spread-out-slots
    • Change the findMatchingSlot method signature of the SlotMatchingStrategy interface as follows:

      ```java
      <T extends TaskManagerSlotInformation> Optional<T> findMatchingSlot(
              ResourceProfile requestedProfile,
              Collection<T> freeSlots,
              Function<InstanceID, Integer> numberRegisteredSlotsLookup,
              // New parameter: looks up the task load of a TM.
              Function<InstanceID, Long> slotTasksLoadingLookup);
      ```


...

  • The current Adaptive Scheduler doesn’t support the `cluster.evenly-spread-out-slots` strategy. To reuse the `cluster.evenly-spread-out-slots` parameter, we first need to support the existing strategy in the Adaptive Scheduler.
  • Then enhance the `cluster.evenly-spread-out-slots` strategy for the Adaptive Scheduler to support slot loading (task number balanced).
  • Waiting mechanism
    • The Adaptive Scheduler doesn’t need an additional waiting mechanism, because it starts scheduling only after the resources are ready.


4 Benefits

4.1 Jobs that perform well

4.1.1 The job1

  • DAG
    • Source1: p=10
    • Sink: p=20
  • Slot per TM: 2
  • Metrics:
    • LocalInput_Preferred
      • TM Task loading
        • [image]
      • CPU avg & CPU max
        • [image]
    • Balanced_Preferred
      • TM Task loading
        • [image]
      • CPU avg & CPU max
        • [image]
  • Benefit table

| Benefits fields (TM) \ Scheduling Strategy | LocalInputPreferred | BalancedPreferred |
|---|---|---|
| Required VCore (based on max VCore usage) | Each TM needs to apply for 5 VCores | Each TM only needs to apply for 2.6 VCores |
| Task number diff | The TM task number difference is 2 | The TM task number difference is 0 |
| CPU usage multiplier factor (max/avg) | The TM CPU usage multiplier is 2 (5.09/2.56) | The TM CPU usage multiplier is reduced from 2 to 1 (more balanced) |



4.1.2 The job 2

  • DAG
    • Source1: p=10
    • Source2: p=10
    • Sink: p=30
  • Slot per TM: 3
  • Metrics:
    • LocalInput_Preferred
      • TM Task loading
        • [image]
      • CPU avg & CPU max
        • [image]
    • Balanced_Preferred
      • TM Task loading
        • [image]
      • CPU avg & CPU max
        • [image]
  • Benefit table

| Benefits fields (TM) \ Scheduling Strategy | LocalInputPreferred | BalancedPreferred |
|---|---|---|
| Required VCore (based on max VCore usage) | Each TM needs to apply for 5 VCores | Each TM only needs to apply for 3 VCores |
| Task number diff | The TM task number difference is 4 | The TM task number difference is 0 |
| CPU usage multiplier factor (max/avg) | The TM CPU usage multiplier is 2 (5.09/2.56) | The TM CPU usage multiplier is reduced from 2 to 1 (more balanced) |



4.2 The job with complex DAG

4.2.1 The Job 1

  • DAG
    • [image]
  • Compared-Metrics
    • [image]
  • Benefit table

| Benefits fields (TM) \ Scheduling Strategy | LocalInputPreferred | BalancedPreferred |
|---|---|---|
| Required VCore (based on max VCore usage) | Each TM needs to apply for 3 VCores | Each TM only needs to apply for 2 VCores |
| Task number diff | The TM task number difference is 12 | The TM task number difference is 1 |
| CPU usage multiplier factor (max/avg) | The TM CPU usage multiplier is 4.154 | The TM CPU usage multiplier is reduced from 4.154 to 2.09 (more balanced) |


4.2.2 The Job 2

  • Compared-Metrics
    • [image]
  • Benefit table

| Benefits fields (TM) \ Scheduling Strategy | LocalInputPreferred | BalancedPreferred |
|---|---|---|
| Required VCore (based on max VCore usage) | Each TM needs to apply for 3 VCores | Each TM only needs to apply for 1.9 VCores |
| Task number diff | The TM task number difference is greater than 10 | The TM task number difference is 1 |
| CPU usage multiplier factor (max/avg) | The TM CPU usage multiplier is 3.6 | The TM CPU usage multiplier is reduced from 3.6 to 2.07 (more balanced) |

Jobs with a complex DAG cannot achieve perfectly balanced CPU usage at the TM level, because the load differs between tasks. The results nevertheless show a benefit.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?

This feature is disabled by default and has no impact on users unless they actively enable the parameter. Enabling it implies that the user understands the meaning of the parameter and the effect they want to achieve with this feature.

  • If we are changing behavior how will we phase out the older behavior?

We will not remove the original strategy; the new strategy is introduced at the same level as the original one.

  • If we need special migration tools, describe them here.

N.A.

  • When will we remove the existing behavior?

N.A.

Test Plan

  • We will focus on performance testing in three deployment modes:
    • Focus on two dimensions:
      • Does the number of tasks within each Slot within the Job scope meet expectations?
      • Does the number of tasks in each TM within the TM range meet expectations?
    • 1 Standalone session
      • Single job
      • Multiple jobs
    • 2 Application mode
    • 3 Session mode
      • Single job
      • Multiple jobs

Rejected Alternatives

  • Expose two options to the user
  • Change the strategy of slot.request.max-interval
    • The old strategy is: when `current time > arrival time of the last slotRequest + slot.request.max-interval` and `the number of available slots cached exceeds or equals the pending slots`, slot selection and allocation can be carried out.
    • The latest strategy is:
      • For DeclarativeSlotPool#increaseResourceRequirementsBy: when `current time > arrival time of the last slotRequest + slot.request.max-interval`, the DeclarativeSlotPool declares its resource requirements to the RM (declareResourceRequirement).
      • For DeclarativeSlotPool#decreaseResourceRequirementsBy: when `current time > arrival time of the last slotRequest + slot.request.max-interval`, the DeclarativeSlotPool declares its resource requirements to the RM (declareResourceRequirement).
      • For DeclarativeSlotPoolBridge: when `the number of available slots cached exceeds or equals the pending slots`, slot selection and allocation can be carried out.
    • The new strategy not only implements the functionality of the old strategy, but also reduces the number of RPCs from the SlotPool to the ResourceManager.


References

[1] https://docs.google.com/document/d/14WhrSNGBdcsRl3IK7CZO-RaZ5KXU2X1dWqxPEFr3iS8