Status

Current state: Under Discussion

...

Discussion thread

...

Vote thread: http://

...


...

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block (Java): StreamGraphGenerator
public class StreamGraphGenerator {
    /**
     * Specify fine-grained resource requirements for slot sharing groups.
     *
     * <p>Note that a slot sharing group hints to the scheduler that the grouped operators CAN be
     * deployed into a shared slot. There's no guarantee that the scheduler always deploys the
     * grouped operators together. In case the grouped operators are deployed into separate slots,
     * the slot resources will be derived from the specified group requirement.
     */
    public StreamGraphGenerator setSlotSharingGroupResource(Map<String, ResourceProfile> slotSharingGroupResources);
}

The specified SSG resource requirements need to be passed on all the way to the corresponding SlotSharingGroup in ExecutionGraph.

Slot Allocation

Currently, slot requests for SSGs are generated by SlotSharingExecutionSlotAllocator. We propose to make SlotSharingExecutionSlotAllocator use the resource requirements of the corresponding SlotSharingGroups when generating the slot requests.
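As a rough illustration of the idea, the sketch below derives slot requests from a map of SSG names to resource requirements. It is not Flink code: Resource, exampleResources and requestsFor are hypothetical stand-ins, the resource values are made up, and the number of shared slots per group is simply passed in.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in types; none of these are Flink classes. */
final class SlotRequestSketch {

    /** Simplified resource spec standing in for ResourceProfile. */
    static final class Resource {
        /** Sentinel standing in for ResourceProfile::UNKNOWN. */
        static final Resource UNKNOWN = new Resource(-1.0, -1);

        final double cpuCores;
        final int memoryMb;

        Resource(double cpuCores, int memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }
    }

    /** Example user input: SSG name -> resource requirement (values are made up). */
    static Map<String, Resource> exampleResources() {
        Map<String, Resource> resources = new HashMap<>();
        resources.put("heavy", new Resource(2.0, 2048));
        resources.put("light", new Resource(0.5, 256));
        return resources;
    }

    /**
     * Builds one slot request per shared slot of the group. Groups without a
     * specified requirement fall back to UNKNOWN.
     */
    static List<Resource> requestsFor(String ssg, int numSlots, Map<String, Resource> ssgResources) {
        Resource requirement = ssgResources.getOrDefault(ssg, Resource.UNKNOWN);
        List<Resource> requests = new ArrayList<>();
        for (int i = 0; i < numSlots; i++) {
            requests.add(requirement);
        }
        return requests;
    }

    public static void main(String[] args) {
        Map<String, Resource> resources = exampleResources();
        System.out.println(requestsFor("heavy", 3, resources).size());
        System.out.println(requestsFor("unconfigured", 1, resources).get(0) == Resource.UNKNOWN);
    }
}
```

Every request of a group carries the same requirement, so the allocator only needs the group's name to look it up.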

Related Issues

Network Memory

Network memory is included in the current ResourceProfile implementation, with the expectation that fine-grained resource management will not deploy onto a TM a set of tasks that together require more network memory than the TM contains.

However, how much network memory each task needs depends heavily on the shuffle service implementation, and may change when switching to another shuffle service. Therefore, neither users nor the Flink runtime can easily specify network memory requirements for a task/slot at the moment.

The concrete solution for controlling network memory is beyond the scope of this FLIP. However, we are aware of a few potential directions for solving this problem.

  • Make shuffle services adaptively control the amount of memory assigned to each task/slot, with respect to the given memory pool size. In this way, there should be no need to rely on fine-grained resource management to control the network memory consumption.
  • Make shuffle services expose interfaces for calculating network memory requirements for given SSGs. In this way, the Flink runtime can specify the calculated network memory requirements for slots, without having to understand the internal details of different shuffle service implementations.

For now, we propose in FLINK-20863 to exclude network memory from ResourceProfile, so that the fine-grained resource management feature is not blocked by the network memory issue. If needed, it can be added back in the future, once there’s a good way to specify the requirement.

Resource Matching

Currently, ResourceProfile::isMatching uses the following rules (hereinafter, loose matching) to decide whether a slot resource can be used to fulfill a given resource requirement, in both SlotManager and SlotPool.

  • An unspecified requirement (ResourceProfile::UNKNOWN) can be fulfilled by any resource.
  • A specified requirement can be fulfilled by any resource that is greater than or equal to itself. Note that this rule currently has no effect, since no requirements are specified at the moment.

The loose matching rules were designed before dynamic slot allocation was introduced. Under the assumption that the resources of slots are decided when the TM is started and cannot be changed, the loose matching rules have the following advantages.

  • For standalone deployments, they allow slot requests to be fulfilled even though the slots of pre-launched TMs rarely have exactly the required resources.
  • For active resource manager deployments, they increase the chance of slots being reused, thus reducing the cost of starting new TMs for various resource requirements.

With dynamic slot allocation introduced in FLIP-56, the benefits of the loose matching rules have been significantly reduced. Since slots can be created dynamically after the TMs are started, with any desired resources as long as they are available, the only benefit the loose matching rules retain is avoiding the allocation of new slots when existing slots can be reused on the JM side. This benefit is insignificant, since allocating a new slot does not require starting a new TM.

On the other hand, the loose matching rules also introduce some problems.

  • Reusing larger slots for fulfilling smaller requirements can harm resource utilization.
  • It’s not straightforward to always find a feasible matching (assuming there is one) when matching a set of requirements against a set of slots, for example during job failover or under the declarative slot allocation protocol.

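[Figure: two matchings of requirements A and B to slots X and Y under the loose matching rules, feasible on the left and infeasible on the right]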

The above figure demonstrates how the loose matching rules can fail to find a feasible matching. Assume there are two resource requirements, A and B, and two slots, X and Y. The number below each requirement/slot represents its amount of resource. A can be fulfilled by either X or Y, while B can only be fulfilled by Y. A feasible matching is shown on the left, where both requirements are fulfilled. However, the loose matching rules can also produce the matching shown on the right, where A is fulfilled by Y, leaving B and X unmatched.

Given the reduced benefits of the loose matching rules and the problems they introduce, we propose in FLINK-20864 to replace them with the following exact matching rules.

  • An unspecified requirement (ResourceProfile::UNKNOWN) can only be fulfilled by a TM's default slot resource.
  • A specified requirement can only be fulfilled by a resource that is equal to itself.
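The difference between the two rule sets can be sketched with the A/B/X/Y example above. The resource amounts (A = 1, B = 2, X = 1, Y = 2) are assumptions, since the figure's numbers are not reproduced here, and the greedy first-fit matcher is a hypothetical stand-in rather than Flink's actual matching logic.

```java
/** Hypothetical sketch of loose vs. exact matching; not Flink code. */
final class MatchingRulesSketch {

    /** Loose rule: a slot fulfills a requirement if it is at least as large. */
    static boolean looseMatch(int requirement, int slot) {
        return slot >= requirement;
    }

    /** Exact rule: a slot fulfills a requirement only if it is equal. */
    static boolean exactMatch(int requirement, int slot) {
        return slot == requirement;
    }

    /**
     * Greedy first-fit: each requirement, in order, takes the first free slot
     * that matches. Returns the number of fulfilled requirements.
     */
    static int greedyFulfilled(int[] requirements, int[] slots, boolean exact) {
        boolean[] used = new boolean[slots.length];
        int fulfilled = 0;
        for (int requirement : requirements) {
            for (int i = 0; i < slots.length; i++) {
                boolean matches = exact
                        ? exactMatch(requirement, slots[i])
                        : looseMatch(requirement, slots[i]);
                if (!used[i] && matches) {
                    used[i] = true;
                    fulfilled++;
                    break;
                }
            }
        }
        return fulfilled;
    }

    public static void main(String[] args) {
        int[] requirements = {1, 2}; // A = 1, B = 2 (assumed amounts)
        int[] slots = {2, 1};        // Y = 2 happens to be offered before X = 1
        System.out.println(greedyFulfilled(requirements, slots, false)); // loose: A takes Y, B is stuck
        System.out.println(greedyFulfilled(requirements, slots, true));  // exact: A takes X, B takes Y
    }
}
```

With exact matching, a requirement can only compete with requirements of the same size, so a simple first-fit pass is enough in this sketch to fulfill everything that can be fulfilled.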

Resource Deadlock

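[Figure: example topology with pipeline regions and two possible assignments of 2 slots, to Pipeline Region 0 on the left and to A and C on the right]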

The above figure demonstrates a potential deadlock caused by scheduling dependencies. For the given topology, the scheduler initially requests 4 slots, for A, B, C and D. Assume only 2 slots are available. If both slots are assigned to Pipeline Region 0 (as shown on the left), A and B finish execution first, then C and D are executed, and finally E is executed. However, if the 2 slots are initially assigned to A and C (as shown on the right), then neither A nor C can finish execution, because B and D are not running to consume the data they produce.

Currently, with coarse-grained resource management, the scheduler guarantees to always finish fulfilling requirements of one pipeline region before starting to fulfill requirements of another. That means the deadlock case shown on the right of the above figure can never happen.

However, there’s no such guarantee in fine-grained resource management. Since resource requirements for SSGs can differ, there’s no control over which requirements will be fulfilled first when there are not enough resources to fulfill all of them. Therefore, it’s not always possible to fulfill one pipeline region before another.

To solve this problem, FLINK-20865 proposes that, for fine-grained resource management, the scheduler defer requesting slots for other SSGs until the requirements of the current SSG are fulfilled, at the price of longer scheduling time.
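The effect of the fulfillment order can be sketched with a toy model of the example above. The model is an assumption made for illustration: each task sits in its own SSG, slots are granted strictly in the given order, and a pipeline region can run only once all of its tasks hold slots. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Toy model of the deadlock example; not Flink's scheduler. */
final class DeadlockSketch {

    /**
     * Grants the available slots to the first tasks in fulfillOrder and
     * reports whether at least one pipeline region is then fully allocated.
     */
    static boolean someRegionCanRun(String[] fulfillOrder, int availableSlots, String[][] regions) {
        Set<String> allocated = new HashSet<>();
        int granted = Math.min(availableSlots, fulfillOrder.length);
        for (int i = 0; i < granted; i++) {
            allocated.add(fulfillOrder[i]);
        }
        for (String[] region : regions) {
            if (allocated.containsAll(Arrays.asList(region))) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String[][] regions = {{"A", "B"}, {"C", "D"}};
        // Uncontrolled order: A and C get the two slots, no region is complete.
        System.out.println(someRegionCanRun(new String[] {"A", "C", "B", "D"}, 2, regions));
        // Deferred requests: region 0 is completed before region 1 is requested.
        System.out.println(someRegionCanRun(new String[] {"A", "B", "C", "D"}, 2, regions));
    }
}
```

Deferring requests trades scheduling latency for a controlled order, which is what rules out the first case in this model.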

Reactive Scheduling

We are aware that fine-grained resource management may not easily work with reactive scheduling, a planned future feature that decides the parallelism of execution based on the available resources (as mentioned in FLIP-138).

For fine-grained resource management to work with reactive scheduling, an important open question is which resource requirements should be fulfilled first when there are not enough resources to fulfill all of them.

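[Figure: target execution plan with 4 slots each for A and B on the left, and 3 cases of partially fulfilled requirements on the right]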

The above figure shows, on the left, a target execution plan that requires 4 slots for each of A and B. On the right, there are 3 possible cases in which not all the resource requirements can be fulfilled.

  • In Case 1, we get roughly half of the target processing capacity.
  • In Case 2, we may only get about 1/4 of the target processing capacity, bottlenecked by B.
  • In Case 3, the job cannot be executed at all.

As we can see, how resource requirements are fulfilled with insufficient resources can significantly affect Flink’s performance, and even availability. It could become more complicated when it comes to more complex target execution plans, with heterogeneous target parallelism and scheduling dependencies.
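The three cases can be reproduced with a back-of-the-envelope model. It assumes that throughput scales linearly with the number of slots granted to each vertex and that the slowest vertex bottlenecks the job. The per-case slot counts (2/2, 3/1, 4/0) are assumptions chosen to match the capacities described above, since the figure's exact numbers are not reproduced here.

```java
/** Rough capacity model for partially fulfilled requirements; an illustration only. */
final class ReactiveCapacitySketch {

    /**
     * Fraction of the target processing capacity, taken as the smallest
     * granted/target ratio over all vertices.
     */
    static double capacityFraction(int[] granted, int[] target) {
        double fraction = 1.0;
        for (int i = 0; i < target.length; i++) {
            fraction = Math.min(fraction, (double) granted[i] / target[i]);
        }
        return fraction;
    }

    public static void main(String[] args) {
        int[] target = {4, 4}; // 4 slots each for A and B
        System.out.println(capacityFraction(new int[] {2, 2}, target)); // Case 1 (assumed 2/2)
        System.out.println(capacityFraction(new int[] {3, 1}, target)); // Case 2 (assumed 3/1)
        System.out.println(capacityFraction(new int[] {4, 0}, target)); // Case 3 (assumed 4/0)
    }
}
```

In all three assumed cases the same number of slots is granted in total, which is why the distribution of slots, not just their count, matters.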

As a first step, we do not support reactive scheduling for fine-grained resource management.

In the future, the problem can potentially be resolved along the following directions.

  • The scheduler may declare a pair of minimum/target required number of slots for each slot resource. In this way, we should always try to allocate the minimum set of resources for executing the job. This should help avoid the worst case (Case 3 in the above example) when it’s possible.
  • We may also rely on the scheduler to detect the non-optimal cases (Cases 2 and 3 in the above example), and to adjust the declared resource requirements and return unnecessary resources.
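The first direction can be sketched as a two-phase allocation: satisfy every declared minimum first, then hand out the remaining slots towards the targets. This is a hypothetical illustration, not a proposed algorithm. It assumes every slot costs the same amount of resource, which does not hold in general for fine-grained resource management, and the round-robin top-up is an arbitrary choice.

```java
/** Hypothetical min/target allocation sketch; not part of the FLIP's design. */
final class MinTargetAllocationSketch {

    /**
     * Returns the number of slots granted per slot resource type, or null if
     * the declared minimums cannot all be met with the available slots.
     */
    static int[] allocate(int[] min, int[] target, int available) {
        int needed = 0;
        for (int m : min) {
            needed += m;
        }
        if (needed > available) {
            return null; // the job cannot run at all
        }
        int[] granted = new int[min.length];
        System.arraycopy(min, 0, granted, 0, min.length);
        int remaining = available - needed;
        boolean progressed = true;
        // Top up round-robin until the targets are met or slots run out.
        while (remaining > 0 && progressed) {
            progressed = false;
            for (int i = 0; i < granted.length && remaining > 0; i++) {
                if (granted[i] < target[i]) {
                    granted[i]++;
                    remaining--;
                    progressed = true;
                }
            }
        }
        return granted;
    }

    public static void main(String[] args) {
        int[] granted = allocate(new int[] {1, 1}, new int[] {4, 4}, 4);
        System.out.println(granted[0] + " " + granted[1]);
    }
}
```

With minimums of 1 for both A and B, 4 available slots can no longer end up all on A, which is the situation of Case 3.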

Future Plan

Potential follow-up issues

During the discussion, we identified some potential follow-up issues for fine-grained resource management. The issues recorded below will be addressed in the future, once we have collected enough user feedback to show that they are valuable.

Operator-level resource configuration interface

We might provide operator-level resource configuration interfaces for expert users who know the exact resource usage of each operator and also want to control the scheduling strategy at a finer granularity.

Default operator resource configuration

In this FLIP, SSGs with unknown resource requirements will be scheduled with the resource profile of the default slot. We might allow users to configure a default operator resource and schedule those SSGs according to the number of operators inside them.

Known limitations and constraints to be documented

When the fine-grained resource management feature is provided to users, the following limitations and constraints should be well documented, with potential impacts and suggestions.

  • Assigning chainable operators to different SSGs may break operator chaining, and thus affect performance.
  • Changing the data exchange mode (pipelined vs. blocking) within an SSG may affect the resource requirement of the group.
  • Parallelism differences between operators in the same SSG may reduce the resource utilization.