...
Flink's current task deployment strategy sometimes leaves some TaskManagers (TMs) with more tasks than others. The TMs with more tasks then run at excessive resource utilization and become a bottleneck for the entire job. Strategies that balance the task load across TMs, and thereby reduce job bottlenecks, are therefore worth developing.
The raw design doc with discussion is recorded in [1].
1.1 Current strategy
1.1.1 Rule of the slot sharing strategy when mapping tasks to slots
...
JV4: parallelism=1;
Slots (ESSGs): 0-4.
JV | SEV index | JV parallelism | Parallelism == numberOfSlots? | eSSGIndex (default: -1) | Operation |
---|---|---|---|---|---|
JV0 | SEV0, SEV1, SEV2, SEV3, SEV4 | 5 | Y | N.A. | Loop: add SEV(i) into ESSG(i)/Slot(i), i=0,1,2,3,4 |
JV1 | SEV0 | 3 | N | ++eSSGIndex % numberOfSlots = 0 | Add SEV0 into ESSG(0)/Slot(0) |
JV1 | SEV1 | 3 | N | ++eSSGIndex % numberOfSlots = 1 | Add SEV1 into ESSG(1)/Slot(1) |
JV1 | SEV2 | 3 | N | ++eSSGIndex % numberOfSlots = 2 | Add SEV2 into ESSG(2)/Slot(2) |
JV2 | SEV0, SEV1, SEV2, SEV3, SEV4 | 5 | Y | N.A. | Loop: add SEV(i) into ESSG(i)/Slot(i), i=0,1,2,3,4 |
JV3 | SEV0 | 2 | N | ++eSSGIndex % numberOfSlots = 3 | Add SEV0 into ESSG(3)/Slot(3) |
JV3 | SEV1 | 2 | N | ++eSSGIndex % numberOfSlots = 4 | Add SEV1 into ESSG(4)/Slot(4) |
JV4 | SEV0 | 1 | N | ++eSSGIndex % numberOfSlots = 0 | Add SEV0 into ESSG(0)/Slot(0) |
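The rule in the table can be modeled in a few lines of Java. This is a minimal sketch that models only the two cases the table shows. When a job vertex's parallelism equals the number of slots, SEV(i) goes to ESSG(i). Otherwise, a single eSSGIndex shared by all job vertices starts at -1 and is pre-incremented modulo numberOfSlots. The sketch does not model any input-locality preference, and the class and method names (`SlotSharingSketch`, `assign`, `taskCounts`) are hypothetical, not Flink APIs. For the example (JV0..JV4 with parallelism 5, 3, 5, 2, 1 and 5 slots), the model puts 4 tasks in Slot(0) and 3 in each of the other slots.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of the task-to-slot rule in the table above (not Flink code). */
public class SlotSharingSketch {

    /** Returns, for each slot (ESSG), the "JVx-SEVy" tasks placed into it. */
    static List<List<String>> assign(int[] parallelisms, int numberOfSlots) {
        List<List<String>> slots = new ArrayList<>();
        for (int s = 0; s < numberOfSlots; s++) {
            slots.add(new ArrayList<>());
        }
        int eSSGIndex = -1; // shared by all job vertices, default -1 as in the table
        for (int jv = 0; jv < parallelisms.length; jv++) {
            int parallelism = parallelisms[jv];
            for (int sev = 0; sev < parallelism; sev++) {
                int slot;
                if (parallelism == numberOfSlots) {
                    slot = sev; // SEV(i) -> ESSG(i)/Slot(i)
                } else {
                    slot = ++eSSGIndex % numberOfSlots; // round-robin over slots
                }
                slots.get(slot).add("JV" + jv + "-SEV" + sev);
            }
        }
        return slots;
    }

    /** Number of tasks in each slot. */
    static int[] taskCounts(List<List<String>> slots) {
        return slots.stream().mapToInt(List::size).toArray();
    }

    public static void main(String[] args) {
        int[] parallelisms = {5, 3, 5, 2, 1}; // JV0..JV4 from the example
        List<List<String>> slots = assign(parallelisms, 5);
        System.out.println(Arrays.toString(taskCounts(slots))); // [4, 3, 3, 3, 3]
    }
}
```

The imbalance comes from the shared round-robin index wrapping back to Slot(0) for JV4. Slot(0) ends up with one extra task even though every job vertex on its own is spread evenly.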
The final number of tasks in each slot for the example:
...
When taskmanager.numberOfTaskSlots is 3, the assignment result is:
TM1 | TM1 task number (10) | TM2 | TM2 task number (10) |
---|---|---|---|
ESSG0 | 4 | ESSG1 | 4 |
ESSG2 | 3 | ESSG3 | 3 |
ESSG4 | 3 | ESSG5 | 3 |
When taskmanager.numberOfTaskSlots is 2, the assignment result is:
TM1 | TM1 task number (7) | TM2 | TM2 task number (7) | TM3 | TM3 task number (6) |
---|---|---|---|---|---|
ESSG0 | 4 | ESSG1 | 4 | ESSG2 | 3 |
ESSG3 | 3 | ESSG4 | 3 | ESSG5 | 3 |
Example2:
- Assume there are 7 ESSGs.
...
When taskmanager.numberOfTaskSlots is 3, the assignment result is:
TM1 | TM1 task number (10) | TM2 | TM2 task number (10) | TM3 | TM3 task number (4) |
---|---|---|---|---|---|
ESSG0 | 4 | ESSG1 | 4 | ESSG2 | 4 |
ESSG3 | 3 | ESSG4 | 3 | Free slot | |
ESSG5 | 3 | ESSG6 | 3 | Free slot | |
When taskmanager.numberOfTaskSlots is 2, the assignment result is:
TM1 | TM1 task number (7) | TM2 | TM2 task number (7) | TM3 | TM3 task number (7) | TM4 | TM4 task number (3) |
---|---|---|---|---|---|---|---|
ESSG0 | 4 | ESSG1 | 4 | ESSG2 | 4 | ESSG3 | 3 |
ESSG4 | 3 | ESSG5 | 3 | ESSG6 | 3 | Free slot | |
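The four TM-level tables can be reproduced by a simple model, which this Java sketch implements. The model is inferred from the tables and is an assumption, not Flink code. The ResourceManager requests exactly one slot per ESSG, so the last TM may hold fewer allocated slots than taskmanager.numberOfTaskSlots. ESSGs are then handed out in index order, round-robin over the TMs that still have a free allocated slot. The per-ESSG task numbers are read from the tables: 4, 4, 3, 3, 3, 3 for Example 1 and 4, 4, 4, 3, 3, 3, 3 for Example 2. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical round-robin model of ESSG-to-TM placement (not Flink code). */
public class SlotToTmSketch {

    /** Returns, per TM, the indexes of the ESSGs placed on it. */
    static List<List<Integer>> assign(int essgCount, int slotsPerTm) {
        int tmCount = (essgCount + slotsPerTm - 1) / slotsPerTm; // ceil(essgCount / slotsPerTm)
        int[] allocated = new int[tmCount];
        List<List<Integer>> placement = new ArrayList<>();
        for (int tm = 0; tm < tmCount; tm++) {
            // Exactly essgCount slots are requested, so the last TM may be partially allocated.
            allocated[tm] = Math.min(slotsPerTm, essgCount - tm * slotsPerTm);
            placement.add(new ArrayList<>());
        }
        int next = 0;
        for (int essg = 0; essg < essgCount; essg++) {
            // Skip TMs whose allocated slots are already used up.
            while (placement.get(next).size() == allocated[next]) {
                next = (next + 1) % tmCount;
            }
            placement.get(next).add(essg);
            next = (next + 1) % tmCount;
        }
        return placement;
    }

    /** Total number of tasks on each TM, given the task number of each ESSG. */
    static int[] tmTaskCounts(int[] essgTasks, int slotsPerTm) {
        List<List<Integer>> placement = assign(essgTasks.length, slotsPerTm);
        int[] loads = new int[placement.size()];
        for (int tm = 0; tm < placement.size(); tm++) {
            for (int essg : placement.get(tm)) {
                loads[tm] += essgTasks[essg];
            }
        }
        return loads;
    }

    public static void main(String[] args) {
        int[] example1 = {4, 4, 3, 3, 3, 3};    // 6 ESSGs
        int[] example2 = {4, 4, 4, 3, 3, 3, 3}; // 7 ESSGs
        System.out.println(Arrays.toString(tmTaskCounts(example1, 3))); // [10, 10]
        System.out.println(Arrays.toString(tmTaskCounts(example1, 2))); // [7, 7, 6]
        System.out.println(Arrays.toString(tmTaskCounts(example2, 3))); // [10, 10, 4]
        System.out.println(Arrays.toString(tmTaskCounts(example2, 2))); // [7, 7, 7, 3]
    }
}
```

Because only as many slots are requested as there are ESSGs, the model always concentrates the unused slots on the last TM.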
Note:
- All free slots are in the last TM because the ResourceManager has no waiting mechanism: it simply requests 7 slots for this JobMaster.
- Why is it acceptable?
...
- cpu avg & cpu max
- Benefit table
Benefit field (TM) \ Scheduling strategy | LocalInputPreferred | BalancedPreferred |
---|---|---|
Required VCores (based on max VCore usage) | Each TM needs 5 VCores | Each TM needs only 2.6 VCores |
Task number difference | TM task number difference is 2 | TM task number difference is 0 |
CPU usage multiplier factor (max/avg) | TM CPU usage multiplier is 2 (5.09/2.56) | TM CPU usage multiplier reduced from 2 to 1 (more balanced) |
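The two comparison rows can be written as small formulas. In this worked form, the notation is our own assumption: $c_i$ is the CPU usage of TM $i$, $t_i$ its task number, and $n$ the number of TMs, with "max" and "avg" taken across TMs. With the LocalInputPreferred figures from the table, the multiplier factor works out as follows:

$$
f = \frac{\max_i c_i}{\frac{1}{n}\sum_{i=1}^{n} c_i} = \frac{5.09}{2.56} \approx 1.99 \approx 2,
\qquad
\Delta t = \max_i t_i - \min_i t_i .
$$

A perfectly balanced placement gives $f = 1$ and $\Delta t = 0$, which is the BalancedPreferred result in the table.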
4.1.2 Job 2
- DAG
- Source1: p=10
- Source2: p=10
- Sink: p=30
- Slots per TM: 3
- Metrics
- LocalInput_Preferred
- TM Task loading
- LocalInput_Preferred
...
- cpu avg & cpu max
- Benefit table
Benefit field (TM) \ Scheduling strategy | LocalInputPreferred | BalancedPreferred |
---|---|---|
Required VCores (based on max VCore usage) | Each TM needs 5 VCores | Each TM needs only 3 VCores |
Task load difference | TM task load difference is 4 | TM task load difference is 0 |
CPU usage multiplier factor (max/avg) | TM CPU usage multiplier is 2 | TM CPU usage multiplier reduced from 2 to 1 (more balanced) |
4.2 Jobs with complex DAGs
...
- Compared metrics
- Benefit table
Benefit field (TM) \ Scheduling strategy | LocalInputPreferred | BalancedPreferred |
---|---|---|
Required VCores (based on max VCore usage) | Each TM needs 3 VCores | Each TM needs only 2 VCores |
Task number difference | TM task number difference is 12 | TM task number difference is 1 |
CPU usage multiplier factor (max/avg) | TM CPU usage multiplier is 4.154 | TM CPU usage multiplier reduced from 4.154 to 2.09 (more balanced) |
4.2.2 Job 2
- Compared metrics
- Benefit table
Benefit field (TM) \ Scheduling strategy | LocalInputPreferred | BalancedPreferred |
---|---|---|
Required VCores (based on max VCore usage) | Each TM needs 3 VCores | Each TM needs only 1.9 VCores |
Task number difference | TM task number difference is greater than 10 | TM task number difference is 1 |
CPU usage multiplier factor (max/avg) | TM CPU usage multiplier is 3.6 | TM CPU usage multiplier reduced from 3.6 to 2.07 (more balanced) |
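Jobs with complex DAGs cannot achieve an absolute balance of CPU usage at the TM level, because the load differs from task to task. Even so, the results show clear benefits: the VCores required per TM drop from 3 to about 2, and the TM CPU usage multiplier roughly halves (from 4.154 to 2.09 and from 3.6 to 2.07).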
...