...

Page properties

Discussion thread: https://lists.apache.org/thread/mx3ot0fmk6zr02ccdby0s8oqxqm2pn1x
Vote thread: https://lists.apache.org/thread/l4g4jnggp677hb5mxxk5rb8jc1tth2sc
JIRA: FLINK-31757
Release: <Flink Version>


...

  • If the JV of the SEV doesn’t contain a co-location constraint:
    • If the parallelism of the JV equals the numberOfSlots:
      • Traverse all SEVs of the JV and assign SEVs[subtask_index] to ESSGs[subtask_index]. This strategy ensures that SEVs with the same index are assigned to the same ESSG.
      • It also ensures that co-located subtasks will be in one ESSG, given that co-located job vertices share the same parallelism and will always be in the same SSG.
      • In the case of forward edges, all subtasks connected by a forward shuffle stay in the same slot, so their data exchange remains local and remote data exchange for such JVs is reduced.
    • If the parallelism of the JV is less than the numberOfSlots in the current SSG (see the sketch after this list):
      • Get ESSGs[++essgIndex] as the target ESSG.
      • Add the SEV into the target ESSG.
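A minimal, runnable Java sketch of the assignment rules above. `Essg`, `assign`, and the cursor handling are illustrative simplifications of this strategy, not Flink's actual classes:

Code Block
languagejava
titleEssgAssignmentSketch
import java.util.ArrayList;
import java.util.List;

/** Sketch: assign the subtasks (SEVs) of each JobVertex (JV) to ExecutionSlotSharingGroups (ESSGs). */
public final class EssgAssignmentSketch {

    /** Stand-in for an ExecutionSlotSharingGroup that just records its members. */
    static final class Essg {
        final List<String> members = new ArrayList<>();
    }

    /**
     * Assigns one JV's subtasks. essgIndex is the round-robin cursor shared by all
     * JVs whose parallelism is less than numberOfSlots; the updated cursor is returned.
     */
    static int assign(List<Essg> essgs, String jvName, int parallelism, int essgIndex) {
        int numberOfSlots = essgs.size();
        if (parallelism == numberOfSlots) {
            // SEVs[subtask_index] -> ESSGs[subtask_index]: same-index subtasks share one ESSG,
            // so forward shuffles between such JVs stay local.
            for (int i = 0; i < parallelism; i++) {
                essgs.get(i).members.add(jvName + "_" + i);
            }
        } else {
            // parallelism < numberOfSlots: take ESSGs[++essgIndex] for each SEV (wrapping for illustration).
            for (int i = 0; i < parallelism; i++) {
                essgIndex = (essgIndex + 1) % numberOfSlots;
                essgs.get(essgIndex).members.add(jvName + "_" + i);
            }
        }
        return essgIndex;
    }

    public static void main(String[] args) {
        List<Essg> essgs = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            essgs.add(new Essg());
        }
        int cursor = -1;
        cursor = assign(essgs, "source", 4, cursor); // parallelism == numberOfSlots
        cursor = assign(essgs, "sink", 2, cursor);   // parallelism < numberOfSlots
        for (int i = 0; i < essgs.size(); i++) {
            System.out.println("ESSG" + i + ": " + essgs.get(i).members);
        }
    }
}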

...

  • How to respect condition2:
    • Start assigning slots to TMs only after the number of available slots is greater than or equal to the number of slot requests.
  • How to respect condition1:
    • The root cause why condition1 may not be met is that slot requests are sent from the JobMaster to the SlotPool one by one instead of as one whole batch.
    • Condition1 is met in most scenarios, because requesting slots is faster than registering slots to the slot pool (registering slots requires requesting and starting TMs).
    • But there are two special cases:
      • This is not true if a job is re-scheduled after a task failure (slots are ready in advance).
      • It is also not true for batch jobs (a single task can run).
    • Solution1: Introduce an additional waiting mechanism: `slot.request.max-interval`.
      • It is the maximum time interval between slot requests when the JM requests multiple slots from the SlotPool.
      • We consider that all slot requests have been sent from the JobMaster to the SlotPool when `current time > arrival time of the last slotRequest + slot.request.max-interval`.
      • The `slot.request.max-interval` can be very small, e.g., 20ms, because the call stack is a Java method call: it does not cross processes and is not an RPC.
    • Solution2: Change or add the interface from the Scheduler to the SlotPool.
      • 2.1 Change the interface from a single slot request to a batch slot request. This doesn't work well with PipelinedRegionSchedulingStrategy.
      • 2.2 Add an interface that lets the slot pool know all requests have arrived (in that case, the slot pool doesn't need any timeout or waiting mechanism).
    • We choose solution1 because solution2 may complicate the scheduling process a lot (from this discussion).
  • Based on this information (see the sketch after this list):
    • When `current time > arrival time of the last slotRequest + slot.request.max-interval`, the SlotPool declares its resource requirements (declareResourceRequirement) to the RM. (For all jobs, batch and streaming.)
    • When `the number of available slots cached exceeds or equals the pending slots`, the slot selection and allocation can be carried out.
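Below is a minimal Java sketch of the two waiting conditions. All names (`SlotRequestBatchingSketch`, `onSlotRequest`, etc.) are illustrative, not the actual SlotPool fields or methods:

Code Block
languagejava
titleSlotRequestBatchingSketch
import java.time.Duration;

/** Sketch of the two waiting conditions described above; all names are illustrative. */
public final class SlotRequestBatchingSketch {

    private final Duration slotRequestMaxInterval = Duration.ofMillis(20);
    private long lastRequestArrivalMillis;
    private int pendingSlotRequests;
    private int availableSlots;

    /** Called for every single slot request sent from the JobMaster. */
    void onSlotRequest(long nowMillis) {
        lastRequestArrivalMillis = nowMillis;
        pendingSlotRequests++;
    }

    /** Called whenever a new slot is registered (offered) to the slot pool. */
    void onSlotAvailable() {
        availableSlots++;
    }

    /** Condition for declaring resource requirements to the ResourceManager (all jobs). */
    boolean allRequestsConsideredArrived(long nowMillis) {
        // current time > arrival time of the last slotRequest + slot.request.max-interval
        return nowMillis > lastRequestArrivalMillis + slotRequestMaxInterval.toMillis();
    }

    /** Condition for starting slot selection and allocation (streaming jobs). */
    boolean readyToAllocate() {
        // the number of available slots cached exceeds or equals the pending slots
        return availableSlots >= pendingSlotRequests;
    }
}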

2.2.3 Allocation strategy for slot to TM

Once the available slots can support deploying the job, the waiting ends. After that, sort the ExecutionSlotSharingGroups (slot requests) in descending order of slot load, and allocate each one round-robin to the slot whose TaskManager has the most free slots and the smallest task loading.
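The following is a minimal, runnable Java sketch of this allocation rule under assumed simplified types (`Essg` and `Tm` are illustrative stand-ins, not Flink classes); running it reproduces the assignment of Example1 below:

Code Block
languagejava
titleBalancedSlotAllocationSketch
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Sketch of the slot-to-TM allocation described above; Essg/Tm are simplified stand-ins. */
public final class BalancedSlotAllocationSketch {

    static final class Essg {
        final String name;
        final int taskCount; // the "loading" of this slot request
        Essg(String name, int taskCount) { this.name = name; this.taskCount = taskCount; }
    }

    static final class Tm {
        final String name;
        int freeSlots;
        int taskLoading;
        Tm(String name, int freeSlots) { this.name = name; this.freeSlots = freeSlots; }
    }

    static void allocate(List<Essg> essgs, List<Tm> tms) {
        // 1. Sort the slot requests (ESSGs) in descending order of task count.
        essgs.sort(Comparator.comparingInt((Essg e) -> e.taskCount).reversed());
        for (Essg essg : essgs) {
            // 2. Round-robin: pick the TM with a free slot, preferring the most free slots,
            //    breaking ties by the smallest current task loading.
            Tm target = tms.stream()
                    .filter(tm -> tm.freeSlots > 0)
                    .min(Comparator.comparingInt((Tm tm) -> -tm.freeSlots)
                            .thenComparingInt(tm -> tm.taskLoading))
                    .orElseThrow(() -> new IllegalStateException("no free slot"));
            target.freeSlots--;
            target.taskLoading += essg.taskCount;
            System.out.println(essg.name + " -> " + target.name);
        }
    }

    public static void main(String[] args) {
        List<Essg> essgs = new ArrayList<>(List.of(
                new Essg("ESSG0", 4), new Essg("ESSG1", 4), new Essg("ESSG2", 3),
                new Essg("ESSG3", 3), new Essg("ESSG4", 3), new Essg("ESSG5", 3)));
        List<Tm> tms = List.of(new Tm("TM1", 3), new Tm("TM2", 3));
        allocate(essgs, tms); // reproduces the Example1 table below (10 tasks per TM)
    }
}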

Example1:

  • Assume there are 6 ESSGs:
    • ESSG0: 4 tasks
    • ESSG1: 4 tasks
    • ESSG2: 3 tasks
    • ESSG3: 3 tasks
    • ESSG4: 3 tasks
    • ESSG5: 3 tasks

When the taskmanager.numberOfTaskSlots is 3, the assigned result:

TM1   | TM1 task number (10) | TM2   | TM2 task number (10)
ESSG0 | 4                    | ESSG1 | 4
ESSG2 | 3                    | ESSG3 | 3
ESSG4 | 3                    | ESSG5 | 3

When the taskmanager.numberOfTaskSlots is 2, the assigned result:

...

Limitation (the waiting mechanism applies to streaming jobs only): the condition `the number of available slots cached exceeds or equals the pending slots` does not take effect for Batch jobs, so Batch jobs do not have a global perspective to balance the number of tasks when allocating slots to TMs. There are several main reasons:

  1. Batch jobs can run normally even with only one TM.
  2. Batch jobs run in stages, so there may be only a small number of tasks in a slot at the same time.
  3. Batch jobs can use the cluster.evenly-spread-out-slots=true capability to spread slots across as many TMs as possible, especially in OLAP scenarios.
    • That option is replaced with taskmanager.load-balance.mode=Slots; for details, see Section 3.1.


...

  • Preferred conclusion:

We prefer to reuse and enhance `cluster.evenly-spread-out-slots` (option 2); the rest of the document is designed around reusing this option.

...

Introduce the following configuration items:

    • taskmanager.load-balance.mode
      • This option controls 2 phases during scheduling:
        • Task → Slot has 2 strategies: LOCAL_INPUT_PREFERRED and TASK_BALANCED_PREFERRED.
        • Slot → TM has 2 strategies: cluster.evenly-spread-out-slots is true or false.
      • type: TaskManagerLoadBalanceMode (see the enum sketch after this list)
      • default: None
      • Value:
        • None: LOCAL_INPUT_PREFERRED + cluster.evenly-spread-out-slots=false (the default behavior of old Flink versions)
        • Slots: LOCAL_INPUT_PREFERRED + cluster.evenly-spread-out-slots=true (kept compatible with the old cluster.evenly-spread-out-slots option)
        • Tasks: TASK_BALANCED_PREFERRED + cluster.evenly-spread-out-slots=true (the new tasks’ balanced scheduling)
    • slot.request.max-interval
      • type: Duration
      • default: 20ms
      • description: The maximum interval for the JobMaster requesting slots from the SlotPool.
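A minimal sketch of what the enum behind taskmanager.load-balance.mode could look like. The exact class shape and helper method are assumptions of this sketch; only the three values and their mapping come from the list above:

Code Block
languagejava
titleTaskManagerLoadBalanceMode
/** Sketch of the proposed option's enum and its mapping onto the legacy behavior. */
public enum TaskManagerLoadBalanceMode {
    NONE,   // LOCAL_INPUT_PREFERRED + cluster.evenly-spread-out-slots=false
    SLOTS,  // LOCAL_INPUT_PREFERRED + cluster.evenly-spread-out-slots=true
    TASKS;  // TASK_BALANCED_PREFERRED + cluster.evenly-spread-out-slots=true

    /** Whether slots should be spread evenly across TaskManagers in this mode. */
    public boolean evenlySpreadOutSlots() {
        return this != NONE;
    }
}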

3.2 Non-Public Change

    • Deprecate cluster.evenly-spread-out-slots
      • It can be replaced with the new taskmanager.load-balance.mode option.


3.3.2.1 Add the loading abstraction to various levels of slots 

  • Introduce the abstraction and the interface
    • LoadingWeight

      Code Block
      languagejava
      titleLoadingWeight
      linenumberstrue
      public interface LoadingWeight {
          // The numeric load of this element, e.g. the number of tasks it carries.
          float getLoading();
          // Combines this weight with another one, e.g. when grouping tasks into a slot.
          LoadingWeight merge(LoadingWeight other);
      }


    • WeightLoadable

      Code Block
      languagejava
      titleWeightLoadable
      linenumberstrue
      public interface WeightLoadable {
          LoadingWeight getLoading();
          void setLoading(@Nonnull LoadingWeight loadingWeight);
      }
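For illustration, a possible implementation of LoadingWeight that counts tasks. The class name and the summing merge semantics are assumptions of this sketch, based on the document's use of task counts as the slot load:

Code Block
languagejava
titleTasksLoadingWeight
/** Sketch: a loading weight backed by a simple task count (hypothetical implementation). */
public final class TasksLoadingWeight implements LoadingWeight { // assumes the interface above

    private final float taskCount;

    public TasksLoadingWeight(float taskCount) {
        this.taskCount = taskCount;
    }

    @Override
    public float getLoading() {
        return taskCount;
    }

    @Override
    public LoadingWeight merge(LoadingWeight other) {
        // Merging two weights simply sums their task counts.
        return new TasksLoadingWeight(taskCount + other.getLoading());
    }
}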


...

      • Introduce the logic to process the parameter `slot.request.max-interval`, the maximum time interval when the JM requests multiple slots from the SlotPool.
      • When `current time > arrival time of the last slotRequest + slot.request.max-interval`, the SlotPool declares the resource requirements (declareResourceRequirement) to the RM.
      • When `the number of available slots cached exceeds or equals the pending slots`, the slot selection and allocation can be carried out.
  • Enhance the old logic related to cluster.evenly-spread-out-slots:
    • Change the findMatchingSlot method signature of the SlotMatchingStrategy interface as follows:

      • Code Block
        languagejava
        titleSlotMatchingStrategy
        linenumberstrue
        <T extends TaskManagerSlotInformation> Optional<T> findMatchingSlot(
                    ResourceProfile requestedProfile,
                    Collection<T> freeSlots,
                    Function<InstanceID, Integer> numberRegisteredSlotsLookup,
                    // the function to get tasks loading for TM.
                    Function<InstanceID, Long> slotTasksLoadingLookup);
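A sketch of how a matching strategy could use the new lookup. The types are simplified stand-ins (String instead of InstanceID, a minimal SlotInfo instead of TaskManagerSlotInformation), and the tie-breaking order shown (smallest task loading first, then most registered slots) is one possible interpretation, not the committed implementation:

Code Block
languagejava
titleLeastLoadedSlotMatchingSketch
import java.util.Collection;
import java.util.Comparator;
import java.util.Optional;
import java.util.function.Function;

/** Sketch of a matching strategy using the new per-TM loading lookup (types simplified). */
public final class LeastLoadedSlotMatchingSketch {

    /** Minimal stand-in for TaskManagerSlotInformation. */
    interface SlotInfo {
        String instanceId();
        boolean matches(String requestedProfile);
    }

    /** Mirrors the proposed signature, with simplified types. */
    static <T extends SlotInfo> Optional<T> findMatchingSlot(
            String requestedProfile,
            Collection<T> freeSlots,
            Function<String, Integer> numberRegisteredSlotsLookup,
            Function<String, Long> slotTasksLoadingLookup) {
        return freeSlots.stream()
                .filter(slot -> slot.matches(requestedProfile))
                // Prefer the TM with the smallest current task loading,
                // then the TM with the most registered slots.
                .min(Comparator
                        .comparingLong((T slot) -> slotTasksLoadingLookup.apply(slot.instanceId()))
                        .thenComparingInt(slot -> -numberRegisteredSlotsLookup.apply(slot.instanceId())));
    }
}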


...

  • We will focus on performance testing in three deployment modes:
    • Focusing on two dimensions:
      • Within the job scope, does the number of tasks in each slot meet expectations?
      • Across TMs, does the number of tasks in each TM meet expectations?
    • 1 Standalone session
      • Single job
      • Multiple jobs
    • 2 Application mode
    • 3 Session mode
      • Single job
      • Multiple jobs

Rejected Alternatives

  • Expose two options to the user
  • Change the strategy of slot.request.max-interval:
    • The old strategy was: when `current time > arrival time of the last slotRequest + slot.request.max-interval` and `the number of available slots cached exceeds or equals the pending slots`, the slot selection and allocation can be carried out.
    • The latest strategy is:
      • For DeclarativeSlotPool#increaseResourceRequirementsBy, when `current time > arrival time of the last slotRequest + slot.request.max-interval`, the DeclarativeSlotPool declares the resource requirements to the RM.
      • For DeclarativeSlotPool#decreaseResourceRequirementsBy, when `current time > arrival time of the last slotRequest + slot.request.max-interval`, the DeclarativeSlotPool declares the resource requirements to the RM.
      • For DeclarativeSlotPoolBridge, when `the number of available slots cached exceeds or equals the pending slots`, the slot selection and allocation can be carried out.
    • This new strategy not only implements the functionality of the old strategy, but also reduces the number of RPCs from the SlotPool to the ResourceManager.

