

Discussion thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
Vote thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
JIRA: FLINK-31757 (ASF JIRA)
Release: <Flink Version>


...

Flink's current task deployment strategy sometimes leaves some TaskManagers (TMs) with more tasks than others. The TMs that hold more tasks suffer from excessive resource utilization and become a bottleneck for the whole job. It is therefore worthwhile to develop a strategy that balances the task load across TMs and so reduces such job bottlenecks.

The raw design doc with discussion is recorded in [1].

1.1 Current strategy

1.1.1 Rules of the slot sharing strategy when mapping tasks to slots

...

JV4: parallelism=1;

Slots (ESSGs): 0-4.

| JV | SEV index | JV parallelism | Parallelism == numberOfSlots? | eSSGIndex (default -1) | Operation |
|---|---|---|---|---|---|
| JV0 | SEV0, SEV1, SEV2, SEV3, SEV4 | 5 | Y | N.A | Loop: add SEV(i) into ESSG(i)/Slot(i), i=0,1,2,3,4 |
| JV1 | SEV0 | 3 | N | ++eSSGIndex % numberOfSlots = 0 | Add SEV0 into ESSG(0)/Slot(0) |
| | SEV1 | | | ++eSSGIndex % numberOfSlots = 1 | Add SEV1 into ESSG(1)/Slot(1) |
| | SEV2 | | | ++eSSGIndex % numberOfSlots = 2 | Add SEV2 into ESSG(2)/Slot(2) |
| JV2 | SEV0, SEV1, SEV2, SEV3, SEV4 | 5 | Y | N.A | Loop: add SEV(i) into ESSG(i)/Slot(i), i=0,1,2,3,4 |
| JV3 | SEV0 | 2 | N | ++eSSGIndex % numberOfSlots = 3 | Add SEV0 into ESSG(3)/Slot(3) |
| | SEV1 | | | ++eSSGIndex % numberOfSlots = 4 | Add SEV1 into ESSG(4)/Slot(4) |
| JV4 | SEV0 | 1 | N | ++eSSGIndex % numberOfSlots = 0 | Add SEV0 into ESSG(0)/Slot(0) |
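The rule in the table fits into a short Java sketch. It makes three assumptions. Job vertices are visited in the order JV0 to JV4. numberOfSlots equals the largest parallelism (5 here). A single eSSGIndex cursor is shared across all vertices whose parallelism differs from numberOfSlots. The class and method names are hypothetical and do not refer to Flink's actual classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the SEV-to-ESSG rule tabulated above; not Flink's actual class.
final class EssgAssignmentSketch {

    /**
     * Maps every subtask (SEV) of every job vertex (JV) to an ESSG/slot index.
     *
     * @param parallelisms  parallelisms[j] is the parallelism of JV j; vertices are visited in array order
     * @param numberOfSlots number of ESSGs, assumed to equal the largest parallelism
     * @return for each ESSG, the labels ("JVj/SEVi") of the subtasks placed in it
     */
    static List<List<String>> assign(int[] parallelisms, int numberOfSlots) {
        List<List<String>> essgs = new ArrayList<>();
        for (int s = 0; s < numberOfSlots; s++) {
            essgs.add(new ArrayList<>());
        }
        int eSSGIndex = -1; // round-robin cursor shared by all vertices, default -1
        for (int jv = 0; jv < parallelisms.length; jv++) {
            int parallelism = parallelisms[jv];
            for (int sev = 0; sev < parallelism; sev++) {
                int target;
                if (parallelism == numberOfSlots) {
                    target = sev; // full parallelism: SEV(i) goes into ESSG(i)
                } else {
                    target = ++eSSGIndex % numberOfSlots; // otherwise continue the round-robin
                }
                essgs.get(target).add("JV" + jv + "/SEV" + sev);
            }
        }
        return essgs;
    }

    public static void main(String[] args) {
        // JV0..JV4 from the example, with 5 slots (the maximum parallelism).
        List<List<String>> essgs = assign(new int[] {5, 3, 5, 2, 1}, 5);
        for (int s = 0; s < essgs.size(); s++) {
            System.out.println("ESSG" + s + " (" + essgs.get(s).size() + " tasks): " + essgs.get(s));
        }
    }
}
```

Running it places 4 tasks in ESSG0 (from JV0, JV1, JV2 and JV4) and 3 tasks in each of ESSG1 to ESSG4. JV4's single subtask wraps around to ESSG0 because the cursor has already reached 4.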


The final number of tasks in each slot for this example:

...

When taskmanager.numberOfTaskSlots is 3, the assignment result is:

| TM1 | TM1 task number (10) | TM2 | TM2 task number (10) |
|---|---|---|---|
| ESSG0 | 4 | ESSG1 | 4 |
| ESSG2 | 3 | ESSG3 | 3 |
| ESSG4 | 3 | ESSG5 | 3 |



When taskmanager.numberOfTaskSlots is 2, the assignment result is:

| TM1 | TM1 task number (7) | TM2 | TM2 task number (7) | TM3 | TM3 task number (6) |
|---|---|---|---|---|---|
| ESSG0 | 4 | ESSG1 | 4 | ESSG2 | 3 |
| ESSG3 | 3 | ESSG4 | 3 | ESSG5 | 3 |
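Both assignments above match a simple procedure. Sort the ESSGs by task count in descending order, then deal them to TMs round-robin and skip any TM that has no free slot left. The Java sketch below assumes exactly that procedure. It reproduces the tables, but it is not presented as Flink's implementation, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of an ESSG-to-TM placement consistent with the example tables.
final class EssgToTmSketch {

    /**
     * Deals ESSGs to TMs round-robin, skipping any TM whose slots for this job are all taken.
     *
     * @param essgTaskCounts task count of each ESSG, already sorted in descending order
     * @param slotsPerTm     slotsPerTm[t] = number of slots TM t provides to the job
     * @return the ESSG indices placed on each TM
     */
    static List<List<Integer>> assign(int[] essgTaskCounts, int[] slotsPerTm) {
        int totalSlots = Arrays.stream(slotsPerTm).sum();
        if (essgTaskCounts.length > totalSlots) {
            throw new IllegalArgumentException("more ESSGs than slots");
        }
        List<List<Integer>> tms = new ArrayList<>();
        for (int t = 0; t < slotsPerTm.length; t++) {
            tms.add(new ArrayList<>());
        }
        int tm = 0;
        for (int essg = 0; essg < essgTaskCounts.length; essg++) {
            // Skip full TMs; this terminates because a free slot is guaranteed to exist.
            while (tms.get(tm).size() >= slotsPerTm[tm]) {
                tm = (tm + 1) % slotsPerTm.length;
            }
            tms.get(tm).add(essg);
            tm = (tm + 1) % slotsPerTm.length; // the next ESSG starts at the next TM
        }
        return tms;
    }

    /** Sums the task counts of the ESSGs placed on each TM. */
    static int[] taskLoads(int[] essgTaskCounts, List<List<Integer>> tms) {
        int[] loads = new int[tms.size()];
        for (int t = 0; t < tms.size(); t++) {
            for (int essg : tms.get(t)) {
                loads[t] += essgTaskCounts[essg];
            }
        }
        return loads;
    }

    public static void main(String[] args) {
        int[] example1 = {4, 4, 3, 3, 3, 3}; // ESSG0..ESSG5
        // numberOfTaskSlots = 3 -> two TMs with 3 slots each
        System.out.println(Arrays.toString(taskLoads(example1, assign(example1, new int[] {3, 3})))); // [10, 10]
        // numberOfTaskSlots = 2 -> three TMs with 2 slots each
        System.out.println(Arrays.toString(taskLoads(example1, assign(example1, new int[] {2, 2, 2})))); // [7, 7, 6]
    }
}
```

The same routine covers Example2. With its seven ESSGs {4, 4, 4, 3, 3, 3, 3}, passing slotsPerTm = {3, 3, 1} (the last TM holds only one of the job's slots) yields the 10/10/4 split, and {2, 2, 2, 1} yields 7/7/7/3.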



Example2:

  • Assume there are 7 ESSGs.

...

When taskmanager.numberOfTaskSlots is 3, the assignment result is:

| TM1 | TM1 task number (10) | TM2 | TM2 task number (10) | TM3 | TM3 task number (4) |
|---|---|---|---|---|---|
| ESSG0 | 4 | ESSG1 | 4 | ESSG2 | 4 |
| ESSG3 | 3 | ESSG4 | 3 | Free slot | |
| ESSG5 | 3 | ESSG6 | 3 | Free slot | |



When taskmanager.numberOfTaskSlots is 2, the assignment result is:

| TM1 | TM1 task number (7) | TM2 | TM2 task number (7) | TM3 | TM3 task number (7) | TM4 | TM4 task number (3) |
|---|---|---|---|---|---|---|---|
| ESSG0 | 4 | ESSG1 | 4 | ESSG2 | 4 | ESSG3 | 3 |
| ESSG4 | 3 | ESSG5 | 3 | ESSG6 | 3 | Free slot | |
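The free slots in Example2 follow from simple arithmetic. This Java sketch rests on two assumptions. First, the job requests exactly 7 slots, one per ESSG. Second, each TM is filled completely before the next one is started. Under these assumptions only the last TM can be partially used. The class and method names are hypothetical, and the sketch does not model Flink's ResourceManager.

```java
import java.util.Arrays;

// Hypothetical sketch of where free slots end up; not Flink's ResourceManager logic.
final class SlotLayoutSketch {

    /**
     * Returns how many slots of each TM the job uses, assuming exactly requestedSlots slots
     * are requested and each TM is filled completely before the next one is started.
     */
    static int[] usedSlotsPerTm(int requestedSlots, int numberOfTaskSlots) {
        if (numberOfTaskSlots <= 0) {
            throw new IllegalArgumentException("numberOfTaskSlots must be positive");
        }
        if (requestedSlots <= 0) {
            return new int[0];
        }
        int numTms = (requestedSlots + numberOfTaskSlots - 1) / numberOfTaskSlots; // ceiling division
        int[] used = new int[numTms];
        Arrays.fill(used, numberOfTaskSlots);
        // Only the last TM can be partially used, so every free slot sits there.
        used[numTms - 1] = requestedSlots - (numTms - 1) * numberOfTaskSlots;
        return used;
    }

    public static void main(String[] args) {
        // Example2: 7 ESSGs, so 7 requested slots.
        System.out.println(Arrays.toString(usedSlotsPerTm(7, 3))); // [3, 3, 1]
        System.out.println(Arrays.toString(usedSlotsPerTm(7, 2))); // [2, 2, 2, 1]
    }
}
```

With 3 slots per TM, 3 TMs offer 9 slots for 7 ESSGs, which leaves 2 free slots on TM3. With 2 slots per TM, 4 TMs offer 8 slots, which leaves 1 free slot on TM4. Both results match the two tables.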



Note: 

  • All free slots are in the last TM because the ResourceManager has no waiting mechanism; it simply requests 7 slots for this JobMaster.
  • Why is this acceptable?

...

      • CPU avg & CPU max
  • Benefit table


| Benefit field (TM) \ Scheduling strategy | LocalInputPreferred | BalancedPreferred |
|---|---|---|
| Required VCores (based on max VCore usage) | Each TM needs to apply for 5 VCores | Each TM only needs to apply for 2.6 VCores |
| Task number diff | TM task number difference is 2 | TM task number difference is 0 |
| CPU usage multiplier factor (max/avg) | TM CPU usage multiplier is 2 (5.09/2.56) | TM CPU usage multiplier reduced from 2 to 1 (more balanced) |
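Both balance metrics in the table are easy to compute from per-TM samples. This Java sketch makes two assumptions. "Task number diff" is taken to be the largest minus the smallest per-TM task count. "CPU usage multiplier factor" is taken to be the maximum per-TM CPU usage divided by the average, which matches the 5.09 / 2.56 ≈ 1.99 reported as 2 for LocalInputPreferred. The helper names are hypothetical.

```java
// Hypothetical helpers for the two balance metrics in the benefit table; names are illustrative.
final class BalanceMetricsSketch {

    /** Task number diff: largest minus smallest per-TM task count. */
    static int taskNumberDiff(int[] tasksPerTm) {
        if (tasksPerTm.length == 0) {
            throw new IllegalArgumentException("no TMs");
        }
        int max = tasksPerTm[0];
        int min = tasksPerTm[0];
        for (int n : tasksPerTm) {
            max = Math.max(max, n);
            min = Math.min(min, n);
        }
        return max - min;
    }

    /** CPU usage multiplier factor: maximum per-TM CPU usage divided by the average. */
    static double cpuMultiplier(double[] cpuPerTm) {
        if (cpuPerTm.length == 0) {
            throw new IllegalArgumentException("no TMs");
        }
        double max = cpuPerTm[0];
        double sum = 0;
        for (double c : cpuPerTm) {
            max = Math.max(max, c);
            sum += c;
        }
        return max / (sum / cpuPerTm.length);
    }

    public static void main(String[] args) {
        // Per-TM task counts from Example1 with numberOfTaskSlots = 2.
        System.out.println(taskNumberDiff(new int[] {7, 7, 6})); // 1
        // Hypothetical CPU samples (not measurements): max 3.0, average 2.0.
        System.out.println(cpuMultiplier(new double[] {1.0, 2.0, 3.0})); // 1.5
    }
}
```

A multiplier of 1 means the busiest TM is no busier than the average, which is why the BalancedPreferred column describes 1 as "more balanced".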



4.1.2 Job 2

  • DAG
    • Source1: p=10
    • Source2: p=10
    • Sink: p=30
  • Slots per TM: 3
  • Metrics
    • LocalInputPreferred
      • TM task loading

...

      • CPU avg & CPU max
  • Benefit table


| Benefit field (TM) \ Scheduling strategy | LocalInputPreferred | BalancedPreferred |
|---|---|---|
| Required VCores (based on max VCore usage) | Each TM needs to apply for 5 VCores | Each TM only needs to apply for 3 VCores |
| Task loading diff | TM task number difference is 4 | TM task number difference is 0 |
| CPU usage multiplier factor (max/avg) | TM CPU usage multiplier is 2 | TM CPU usage multiplier reduced from 2 to 1 (more balanced) |
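Job 2's DAG allows a perfectly even task count per TM, at least in principle. This Java sketch assumes that all three operators share one slot sharing group, so the job needs as many slots as its largest parallelism (30, from the Sink). With 3 slots per TM, the 50 tasks then spread over 10 TMs at exactly 5 tasks each, which is consistent with the task number difference of 0 reported for BalancedPreferred. The class is a hypothetical back-of-the-envelope check, not part of Flink.

```java
// Hypothetical back-of-the-envelope check for Job 2; not part of Flink.
final class Job2BalanceSketch {

    /** Total number of subtasks across all operators. */
    static int totalTasks(int[] parallelisms) {
        int total = 0;
        for (int p : parallelisms) {
            total += p;
        }
        return total;
    }

    /** Assumes one slot sharing group, so the job needs as many slots as its largest parallelism. */
    static int tmCount(int[] parallelisms, int slotsPerTm) {
        int slots = 0;
        for (int p : parallelisms) {
            slots = Math.max(slots, p);
        }
        return (slots + slotsPerTm - 1) / slotsPerTm; // ceiling division
    }

    public static void main(String[] args) {
        int[] job2 = {10, 10, 30}; // Source1, Source2, Sink
        int tasks = totalTasks(job2);
        int tms = tmCount(job2, 3);
        System.out.println(tasks + " tasks on " + tms + " TMs");      // 50 tasks on 10 TMs
        System.out.println("balanced load per TM: " + tasks / tms);     // 5
        System.out.println("evenly divisible: " + (tasks % tms == 0)); // true
    }
}
```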


4.2 Jobs with complex DAGs

...

  • Compared metrics
  • Benefit table


| Benefit field (TM) \ Scheduling strategy | LocalInputPreferred | BalancedPreferred |
|---|---|---|
| Required VCores (based on max VCore usage) | Each TM needs to apply for 3 VCores | Each TM only needs to apply for 2 VCores |
| Task number diff | TM task number difference is 12 | TM task number difference is 1 |
| CPU usage multiplier factor (max/avg) | TM CPU usage multiplier is 4.154 | TM CPU usage multiplier reduced from 4.154 to 2.09 (more balanced) |


4.2.2 Job 2

  • Compared metrics
  • Benefit table


| Benefit field (TM) \ Scheduling strategy | LocalInputPreferred | BalancedPreferred |
|---|---|---|
| Required VCores (based on max VCore usage) | Each TM needs to apply for 3 VCores | Each TM only needs to apply for 1.9 VCores |
| Task number diff | TM task number difference is greater than 10 | TM task number difference is 1 |
| CPU usage multiplier factor (max/avg) | TM CPU usage multiplier is 3.6 | TM CPU usage multiplier reduced from 3.6 to 2.07 (more balanced) |


Jobs with complex DAGs cannot achieve an absolute balance of CPU usage at the TM level, because different tasks carry different loads. Even so, the results above still show clear benefits.

...