
Status

...


Discussion thread

...



...

There is some legacy code for setting operator resources on Transformation and aggregating them to generate slot requests. However, this code has never really been used, and no APIs are exposed to users. Most importantly, we are not sure that letting users specify operator-level resource requirements and aggregating them at runtime is the right approach, which will be discussed in subsequent sections.

...

This FLIP proposes Slot Sharing Group (SSG) based runtime interfaces for specifying fine-grained resource requirements. To be specific, we discuss how resource requirements are specified at the Transformation layer and leveraged afterward, which covers the common path of both Table/SQL API and DataStream API workloads.

...

  • Fine-grained resource management is not yet ready end-to-end. We believe exposing the user APIs to activate the feature should be the last step.
  • Different development APIs may expose the interfaces for specifying resource requirements differently. It requires more in-depth discussions with the component experts to decide how this feature should be integrated into each development API. The following examples are only preliminary ideas demonstrating what the user interfaces might look like in different development APIs.
    • For the DataStream API, there are already interfaces for setting SSGs for operators. Based on this, we can introduce new interfaces to specify resource requirements for the SSGs directly (see the sketch after this list).
    • For Table API & SQL, since neither the concept of operators nor that of SSGs is exposed, the planner should probably generate the SSG resource requirements, exposing only a few configuration knobs to users.
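
Purely as an illustration of the first preliminary idea above, the sketch below combines the existing DataStream slotSharingGroup(String) API with a hypothetical setSlotSharingGroupResources method on the execution environment. The method name, its placement on StreamExecutionEnvironment, and the use of ResourceProfile as the requirement descriptor are assumptions of this sketch, not decided API.

Code Block (java): hypothetical DataStream API sketch

import java.util.Map;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FineGrainedSsgSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Existing API: assign the mapper to a named slot sharing group.
        env.fromElements(1, 2, 3)
                .map((MapFunction<Integer, Integer>) x -> x * 2)
                .returns(Types.INT)
                .slotSharingGroup("heavy-ssg")
                .print();

        // Hypothetical new API (not existing Flink API): declare the resources of the SSG itself.
        env.setSlotSharingGroupResources(
                Map.of(
                        "heavy-ssg",
                        ResourceProfile.newBuilder()
                                .setCpuCores(2.0)
                                .setTaskHeapMemoryMB(512)
                                .build()));

        env.execute("fine-grained-resources-sketch");
    }
}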

Granularity of Fine-Grained Resource Requirements

...

Code Block (java): StreamGraphGenerator

public class StreamGraphGenerator {

    /**
     * Specifies fine-grained resource requirements for slot sharing groups.
     *
     * <p>Note that a slot sharing group hints to the scheduler that the grouped operators CAN be
     * deployed into a shared slot. There is no guarantee that the scheduler always deploys the
     * grouped operators together. In cases where grouped operators are deployed into separate
     * slots, the slot resources will be derived from the specified group requirement.
     */
    public StreamGraphGenerator setSlotSharingGroupResource(
            Map<String, ResourceProfile> slotSharingGroupResources);
}
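
As a usage sketch only, assuming the signature above: the per-SSG requirements could be assembled into a map keyed by SSG name and handed to the generator. The streamGraphGenerator variable and the ResourceProfile builder calls are illustrative, and the usual java.util imports are assumed.

Code Block (java): usage sketch

Map<String, ResourceProfile> ssgResources = new HashMap<>();
ssgResources.put("default", ResourceProfile.newBuilder()
        .setCpuCores(1.0)
        .setTaskHeapMemoryMB(256)
        .build());
ssgResources.put("heavy-ssg", ResourceProfile.newBuilder()
        .setCpuCores(4.0)
        .setTaskHeapMemoryMB(1024)
        .setManagedMemoryMB(512)
        .build());

// Hand the requirements to the generator; they are later attached to the
// corresponding SlotSharingGroups in the ExecutionGraph.
streamGraphGenerator.setSlotSharingGroupResource(ssgResources);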

The specified SSG resource requirements need to be passed all the way through to the corresponding SlotSharingGroups in the ExecutionGraph.

Slot Allocation

Currently, slot requests for SSGs are generated by the SlotSharingExecutionSlotAllocator. We propose to make the SlotSharingExecutionSlotAllocator use the resource requirements in the corresponding SlotSharingGroups when generating slot requests.
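
The simplified sketch below (not the actual allocator code; a getResourceProfile accessor on SlotSharingGroup is part of this proposal rather than existing API) shows the intended derivation: a physical slot request carries the SSG's specified profile and otherwise falls back to the unknown profile, preserving today's behavior.

Code Block (java): slot request derivation sketch

// Simplified illustration of how SlotSharingExecutionSlotAllocator could derive the
// resources of a physical slot request from the slot sharing group.
ResourceProfile deriveSlotRequestResources(SlotSharingGroup group) {
    ResourceProfile specified = group.getResourceProfile(); // proposed accessor, not existing API
    // Unspecified groups keep the current behavior: request a default-sized slot.
    return specified == null ? ResourceProfile.UNKNOWN : specified;
}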

Related Issues

Network Memory

Network memory is included in the current ResourceProfile implementation, with the expectation that fine-grained resource management will not deploy tasks onto a TM whose combined network memory requirements exceed what the TM contains.

However, how much network memory each task needs depends heavily on the shuffle service implementation, and may vary when switching to another shuffle service. Therefore, neither the user nor the Flink runtime can easily specify network memory requirements for a task/slot at the moment.

A concrete solution for controlling network memory is beyond the scope of this FLIP. However, we are aware of a few potential directions for solving this problem.

  • Make shuffle services adaptively control the amount of memory assigned to each task/slot, with respect to the given memory pool size. In this way, there should be no need to rely on fine-grained resource management to control the network memory consumption.
  • Make shuffle services expose interfaces for calculating the network memory requirements of given SSGs, as sketched below. In this way, the Flink runtime can specify the calculated network memory requirements for slots without having to understand the internal details of different shuffle service implementations.
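
A rough sketch of this second direction follows. The interface and method names are hypothetical and do not exist in Flink's shuffle service API; MemorySize refers to org.apache.flink.configuration.MemorySize, and SlotSharingGroup stands for the runtime's slot sharing group representation.

Code Block (java): hypothetical shuffle network memory hook

import org.apache.flink.configuration.MemorySize;

// Hypothetical interface illustrating the second direction above; not an existing Flink API.
public interface ShuffleNetworkMemoryCalculator {

    /**
     * Returns the network memory this shuffle implementation would need for all tasks of the
     * given slot sharing group, so that the runtime can fold it into the slot request without
     * knowing shuffle-internal details.
     */
    MemorySize calculateRequiredNetworkMemory(SlotSharingGroup slotSharingGroup);
}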

For now, we propose in FLINK-20863 to exclude network memory from ResourceProfile, to unblock the fine-grained resource management feature from the network memory control issue. If needed, it can be added back in the future, as long as there is a good way to specify the requirement.

Resource Matching

Currently, ResourceProfile::isMatching uses the following rules (hereinafter, the loose matching rules) in both the SlotManager and the SlotPool to decide whether a slot's resources can fulfill a given resource requirement.

  • An unspecified requirement (ResourceProfile::UNKNOWN) can be fulfilled by any resource.
  • A specified requirement can be fulfilled by any resource that is greater than or equal to itself. Note that this rule currently has no effect, since there are no specified requirements at the moment.

The loose matching rules were designed before dynamic slot allocation was introduced. Under the assumption that slot resources are decided when the TM is started and cannot be changed afterwards, the loose matching rules have the following advantages.

  • For standalone deployments, it allows slot requests to be fulfilled even though the slots of pre-launched TMs rarely have exactly the required resources.
  • For active resource manager deployments, it increases the chance of slots being reused, thus reducing the cost of starting new TMs for various resource requirements.

With dynamic slot allocation introduced in FLIP-56, the benefits of the loose matching rules have been significantly reduced. Since slots can now be created dynamically after a TM has started, with any desired resources as long as they are available, the only remaining benefit of the loose matching rules is avoiding the allocation of new slots when existing slots can be reused on the JM side, which is insignificant since no new TMs need to be started.

On the other hand, the loose matching rules also introduce some problems.

  • Reusing larger slots for fulfilling smaller requirements can harm resource utilization.
  • It’s not always straightforward to find a feasible matching (assuming one exists) when matching a set of requirements against a set of slots, e.g., during job failovers or under the declarative slot allocation protocol.

[Figure: matching two requirements A and B against two slots X and Y; a feasible matching on the left, and an infeasible matching produced by the loose matching rules on the right]

The above figure demonstrates how the loose matching rules can fail to find a feasible matching. Assume there are two resource requirements, A and B, and two slots, X and Y; the number below each requirement/slot represents its amount of resource. A can be fulfilled by either X or Y, while B can only be fulfilled by Y. A feasible matching is shown on the left, where both requirements are fulfilled. However, the loose matching rules can also produce the matching shown on the right, where A is fulfilled by Y, leaving B and X unmatched.

Given their reduced benefits and the problems they introduce, we propose in FLINK-20864 to replace the loose matching rules with the following exact matching rules; a sketch contrasting the two rule sets follows the list.

  • An unspecified requirement (ResourceProfile::UNKNOWN) can only be fulfilled by a TM's default slot resource.
  • A specified requirement can only be fulfilled by a resource that is equal to itself.
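
The self-contained sketch below contrasts the two rule sets. It is illustrative only: a single scalar value stands in for Flink's full ResourceProfile, UNKNOWN stands in for ResourceProfile::UNKNOWN, and the numbers A = 1, B = 2, X = 1, Y = 2 are chosen to be consistent with the matching example above.

Code Block (java): loose vs. exact matching sketch

// Illustrative only: one scalar value stands in for a full resource profile.
public final class MatchingRulesSketch {

    static final double UNKNOWN = -1.0;

    /** Loose matching: UNKNOWN is fulfilled by anything; otherwise slot >= requirement suffices. */
    static boolean looseMatch(double requirement, double slot) {
        return requirement == UNKNOWN || slot >= requirement;
    }

    /** Exact matching: UNKNOWN is fulfilled only by the default slot; otherwise slot == requirement. */
    static boolean exactMatch(double requirement, double slot, double defaultSlot) {
        return requirement == UNKNOWN ? slot == defaultSlot : slot == requirement;
    }

    public static void main(String[] args) {
        // Requirement A = 1 against slot Y = 2 (default slot size 1):
        System.out.println(looseMatch(1, 2));    // true  -> A may take the larger slot Y, leaving B unmatched
        System.out.println(exactMatch(1, 2, 1)); // false -> A can only take X, leaving Y for B
    }
}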

Resource Deadlock

[Figure: a topology with scheduling dependencies and two possible assignments of the 2 available slots; the assignment on the left completes, while the assignment on the right deadlocks]

The above figure demonstrates a potential deadlock caused by scheduling dependencies. For the given topology, the scheduler initially requests 4 slots, for A, B, C and D. Assume only 2 slots are available. If both slots are assigned to Pipeline Region 0 (as shown on the left), A and B first finish execution, then C and D are executed, and finally E is executed. However, if the 2 slots are initially assigned to A and C (as shown on the right), then neither A nor C can finish execution, because B and D are not running to consume the data they produce.

Currently, with coarse-grained resource management, the scheduler guarantees to always finish fulfilling the requirements of one pipeline region before starting to fulfill those of another. That means the deadlock case shown on the right of the above figure can never happen.

However, there is no such guarantee in fine-grained resource management. Since the resource requirements of different SSGs can differ, there is no control over which requirements are fulfilled first when there are not enough resources to fulfill all of them. Therefore, it is not always possible to fulfill one pipeline region prior to another.

To solve this problem, FLINK-20865 proposes that, for fine-grained resource management, the scheduler defer requesting slots for other SSGs until the requirements of the current SSG are fulfilled, at the price of longer scheduling time.
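
A minimal sketch of the proposed deferral follows. SSGs are represented by their names, and requestSlotsFor is a hypothetical placeholder that completes once all slots of one SSG are allocated; the real change belongs in the scheduler's slot allocation path.

Code Block (java): deferred per-SSG slot requests sketch

import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch of the FLINK-20865 idea: request and wait for one SSG's slots before requesting
// the next, trading scheduling time for freedom from resource deadlocks.
public class GroupByGroupAllocationSketch {

    CompletableFuture<Void> allocateGroupByGroup(List<String> slotSharingGroups) {
        CompletableFuture<Void> previous = CompletableFuture.completedFuture(null);
        for (String group : slotSharingGroups) {
            // Only issue the next group's slot requests once the current group's are fulfilled.
            previous = previous.thenCompose(ignored -> requestSlotsFor(group));
        }
        return previous;
    }

    // Hypothetical: issues the slot requests of one SSG and completes once they are fulfilled.
    CompletableFuture<Void> requestSlotsFor(String slotSharingGroup) {
        return CompletableFuture.completedFuture(null);
    }
}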


Reactive Scheduling

We are aware that fine-grained resource management may not easily work with reactive scheduling, a future feature still in the planning stage that decides the execution parallelism based on the available resources (as mentioned in FLIP-138).

For fine-grained resource management to work with reactive scheduling, an important open question is which resource requirements should be fulfilled first when there are not enough resources to fulfill all of them.

[Figure: a target execution plan requiring 4 slots for each of A and B (left), and three cases in which the requirements are only partially fulfilled (right)]

The above figure shows, on the left, a target execution plan that requires 4 slots for each of A and B. On the right are three possible cases in which not all of the resource requirements can be fulfilled.

  • In Case 1, we get roughly half of the target processing capacity.
  • In Case 2, we may only get about 1/4 of the target processing capacity, bottlenecked by B.
  • In Case 3, the job cannot be executed at all.

As we can see, how resource requirements are fulfilled with insufficient resources can significantly affect Flink’s performance, and even its availability. This becomes even more complicated for more complex target execution plans, with heterogeneous target parallelisms and scheduling dependencies.

As the first step, we do not support reactive scheduling for fine-grained resource management.

In the future, the problem can potentially be addressed along the following directions.

  • The scheduler may declare a pair of minimum and target numbers of required slots for each slot resource (see the strawman sketched after this list). In this way, we should always try to allocate at least the minimum set of resources for executing the job, which helps avoid the worst case (Case 3 in the above example) whenever possible.
  • We may also rely on the scheduler to detect the non-optimal cases (Cases 2 & 3 in the above example), adjust the declared resource requirements, and return unnecessary resources.
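
Purely as a strawman for the first direction (nothing below exists in Flink), the declared requirement per slot resource could carry both a minimum and a target count; ResourceProfile is used here only because this FLIP already uses it to describe requirements.

Code Block (java): min/target slot declaration strawman

import org.apache.flink.runtime.clusterframework.types.ResourceProfile;

// Strawman data holder: for each slot resource profile, declare the minimum number of slots
// needed to run the job at all and the target number for the desired parallelism.
public final class SlotRequirementDeclaration {

    private final ResourceProfile resourceProfile;
    private final int minimumSlots; // below this the job cannot run (avoids Case 3 above)
    private final int targetSlots;  // what the scheduler ideally wants (full parallelism)

    public SlotRequirementDeclaration(ResourceProfile resourceProfile, int minimumSlots, int targetSlots) {
        this.resourceProfile = resourceProfile;
        this.minimumSlots = minimumSlots;
        this.targetSlots = targetSlots;
    }

    public ResourceProfile getResourceProfile() { return resourceProfile; }

    public int getMinimumSlots() { return minimumSlots; }

    public int getTargetSlots() { return targetSlots; }
}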

Future Plan

Potential follow-up issues

In the discussion, we found that there are some potential issues for fine-grained resource management. The issues recorded below will be addressed in the future, once we have collected enough feedback from users to prove them valuable.

Operator-level resource configuration interface

We might provide operator-level resource configuration interfaces for expert users who know the exact resource usage of each operator and want to control the scheduling strategy at a finer granularity.

Default operator resource configuration

In this FLIP, SSGs with unknown resource requirements will be scheduled with the resource profile of the default slot. We might allow users to configure a default operator resource and schedule such SSGs according to the number of operators they contain.

Known limitations and constraints to be documented

When the fine-grained resource management feature is provided to users, the following limitations and constraints should be well documented, together with their potential impacts and suggestions.

  • Putting chainable operators into different SSGs may break operator chaining, and thus change performance.
  • Changing the data exchange mode (pipelined vs. blocking) within an SSG may affect the resource requirements of the group.
  • Parallelism differences between operators in the same SSG may reduce resource utilization.

Rejected Alternatives

...