
Discussion thread: ...

Vote thread: ...

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Fine-Grained Resource Management is one of Apache Flink’s roadmap features that the community has been trying to deliver. While FLIP-56 delivers the ability to allocate slots with respect to fine-grained resource requirements, the question of how to get those resource requirements remains unanswered. In this FLIP, we discuss what the runtime interfaces for specifying fine-grained resource requirements should look like, with respect to usability, flexibility, and how resources are used at runtime.

Note: This FLIP mainly focuses on discussing and reasoning about the design choices. The changes needed for the proposed design are straightforward.

Background on Fine-Grained Resource Management

Motivation

Flink currently adopts a coarse-grained resource management approach, where tasks are deployed into predefined, usually identical slots, without the notion of how many resources each slot contains. With slot sharing, tasks in the same Slot Sharing Group (SSG) can be deployed into one slot regardless of how many resources each task/operator needs. In FLIP-56, we proposed fine-grained resource management, which leverages slots with different resources for task execution, according to the workload’s resource requirements.

For many jobs, using coarse-grained resource management and simply putting all tasks into one SSG works well enough, in terms of both resource utilization and usability.

  • For many streaming jobs in which all tasks have the same parallelism, each slot will contain an entire pipeline. Ideally, all pipelines use roughly the same resources, which can be satisfied easily by tuning the resources of the identical slots.
  • Resource consumption of tasks varies over time. When the consumption of one task decreases, the freed resources can be used by another task whose consumption is increasing. This, known as the peak shaving and valley filling effect, reduces the overall resources needed.

However, there are cases where coarse-grained resource management does not work well.

  • Tasks may have different parallelisms. Sometimes, such different parallelisms cannot be avoided. E.g., the parallelism of source/sink/lookup tasks might be constrained by the partitions and IO load of the external upstream/downstream system. In such cases, slots with fewer tasks would need fewer resources than those with the entire pipeline of tasks.
  • Sometimes the resources needed for the entire pipeline might be too much to fit into a single slot / TaskManager. In such cases, the pipeline needs to be split into multiple SSGs, which may not always have the same resource requirements.
  • For batch jobs, not all the tasks can be executed at the same time. Thus, the instantaneous resource requirement of the pipeline changes over time.

Trying to execute all tasks with identical slots can result in non-optimal resource utilization. The resources of the identical slots have to fulfill the highest resource requirement, which is wasteful for the other requirements. When expensive external resources such as GPUs are involved, such waste can become even harder to afford.

...


  • Granularity of resource requirements should correspond to how they are fulfilled in runtime. Converting to slot resource requirements from any other granularity, as required for slot allocation, would add complexity to the system.
  • Runtime interfaces should only require the minimum set of information needed for resource management, leaving more flexibility to the development APIs. It’s more straightforward for the development APIs to aggregate user-provided operator/task requirements (if that’s what is exposed to the end users) into slot requirements than to make up operator/task requirements from user-provided slot requirements.

To sum up, in this FLIP we propose SSG-based runtime interfaces for configuring fine-grained resource requirements, because they correspond to how resources are managed in runtime, which brings usability, efficiency, and simplicity. Compared to the benefits, we believe the shortcomings have less impact: operator chaining and slot sharing strategies do not frequently change in a way that affects the resource requirements, and the user involvement needed to handle parallelism differences is a trade-off between usability and resource utilization that is left for the user to decide.

Proposed Changes

The changes proposed in this FLIP are quite straightforward.

  • Introduce runtime interfaces for specifying SSG-based resource requirements.
  • Allocate slots with the specified resource requirements.

Runtime Interfaces

As the entrypoint of the unified runtime, StreamGraphGenerator takes Transformations and various settings from the user development APIs, and generates the StreamGraph accordingly.

We propose to add the following interface for specifying fine-grained resource requirements for SSGs.

Code Block
languagejava
titleStreamGraphGenerator
public class StreamGraphGenerator {
    /**
     * Specify fine-grained resource requirements for slot sharing groups.
     *
     * <p>Note that a slot sharing group hints to the scheduler that the grouped operators CAN be
     * deployed into a shared slot. There is no guarantee that the scheduler always deploys the
     * grouped operators together. In cases where grouped operators are deployed into separate
     * slots, the slot resources will be derived from the specified group requirement.
     */
    public StreamGraphGenerator setSlotSharingGroupResource(Map<String, ResourceProfile> slotSharingGroupResources);
}
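
For illustration, per-SSG requirements could be configured roughly as follows. This is a minimal sketch: the group names are made up for the example, and the ResourceProfile builder calls are assumptions that may differ from the final public API.

Code Block
languagejava
titleExample usage (illustrative sketch)
// Illustrative only: group names and ResourceProfile construction are assumptions.
Map<String, ResourceProfile> ssgResources = new HashMap<>();
ssgResources.put("source-group", ResourceProfile.newBuilder()
        .setCpuCores(1.0)
        .setTaskHeapMemoryMB(512)
        .build());
ssgResources.put("aggregation-group", ResourceProfile.newBuilder()
        .setCpuCores(4.0)
        .setTaskHeapMemoryMB(2048)
        .build());

// Pass the per-SSG requirements to the StreamGraphGenerator.
streamGraphGenerator.setSlotSharingGroupResource(ssgResources);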

The specified SSG resource requirements need to be passed on all the way to the corresponding SlotSharingGroup in ExecutionGraph.

Slot Allocation

Currently, slot requests for SSGs are generated by the SlotSharingExecutionSlotAllocator. We propose to make the SlotSharingExecutionSlotAllocator use the resource requirements of the corresponding SlotSharingGroups when generating the slot requests.
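
Conceptually, this amounts to the allocator picking up the group’s requirement and falling back to the unspecified profile when none is set. The sketch below is a simplified illustration under assumed accessor names; it does not reflect the actual internal signatures of the allocator.

Code Block
languagejava
titleDeriving the slot requirement from the SSG (simplified sketch)
// Simplified illustration; getResourceProfile() is an assumed accessor on SlotSharingGroup.
ResourceProfile groupRequirement = slotSharingGroup.getResourceProfile();

// Groups without a specified requirement keep today's behavior (unspecified requirement).
ResourceProfile slotRequirement =
        groupRequirement != null ? groupRequirement : ResourceProfile.UNKNOWN;

// The allocator then issues its physical slot request with slotRequirement
// instead of an unknown/empty profile.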

Related Issues

Network Memory

Network memory is included in the current ResourceProfile implementation, with the expectation that fine-grained resource management will avoid deploying onto a TM tasks that together require more network memory than the TM contains.

However, how much network memory each task needs depends heavily on the shuffle service implementation, and may vary when switching to another shuffle service. Therefore, neither the user nor the Flink runtime can easily specify network memory requirements for a task/slot at the moment.

A concrete solution for controlling network memory is beyond the scope of this FLIP. However, we are aware of a few potential directions for solving this problem.

  • Make shuffle services adaptively control the amount of memory assigned to each task/slot, with respect to the given memory pool size. In this way, there should be no need to rely on fine-grained resource management to control the network memory consumption.
  • Make shuffle services expose interfaces for calculating network memory requirements for given SSGs. In this way, the Flink runtime can specify the calculated network memory requirements for slots, without having to understand the internal details of different shuffle service implementations.
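
As an illustration of the second direction, such a calculation hook could look roughly like the hypothetical interface below. This is not an existing or proposed Flink interface; the name and signature are made up only to show the idea.

Code Block
languagejava
titleHypothetical network memory calculation hook (illustrative)
/**
 * Hypothetical interface, for illustration only: a shuffle service implementation
 * could report how much network memory the tasks of a slot sharing group need,
 * so that the runtime could fill in the network memory part of the slot's ResourceProfile.
 */
public interface NetworkMemoryCalculator {

    /** Returns the network memory needed by the tasks of the given slot sharing group. */
    MemorySize calculateNetworkMemory(SlotSharingGroup slotSharingGroup);
}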

For now, we propose in FLINK-20863 to exclude network memory from ResourceProfile, to unblock the fine-grained resource management feature from the network memory control issue. If needed, it can be added back in the future, as long as there is a good way to specify the requirement.

Resource Matching

Currently, ResourceProfile::isMatching uses the following rules (hereinafter, the loose matching rules) to decide whether a slot resource can be used to fulfill a given resource requirement, in both the SlotManager and the SlotPool:

  • An unspecified requirement (ResourceProfile::UNKNOWN) can be fulfilled by any resource.
  • A specified requirement can be fulfilled by any resource that is greater than or equal to itself. Note that this rule currently has no effect, since there are no specified requirements at the moment.

The loose matching rules were designed before dynamic slot allocation was introduced. Under the assumption that the resources of slots are decided when the TM is started and cannot be changed, the loose matching rules have the following advantages.

  • For standalone deployments, they allow slot requests to be fulfilled even though the slots of pre-launched TMs can hardly match the exact required resources.
  • For active resource manager deployments, they increase the chance of slots being reused, thus reducing the cost of starting new TMs for various resource requirements.

With dynamic slot allocation introduced in FLIP-56, the benefits of the loose matching rules have been significantly reduced. As slots can be dynamically created after the TMs are started, with any desired resources as long as they are available, the only benefit the loose matching rules retain is avoiding new slot allocations when existing slots can be reused on the JM side, which is insignificant since there is no need to start new TMs.

On the other hand, the loose matching rules also introduce some problems.

  • Reusing larger slots for fulfilling smaller requirements can harm resource utilization.
  • It is not straightforward to always find a feasible matching solution (assuming there is one) when matching a set of requirements against a set of slots, e.g., during job failovers or with the declarative slot allocation protocol.

[Figure: matching two requirements (A, B) to two slots (X, Y) under the loose matching rules, showing a feasible matching and an infeasible one]

The above figure demonstrates how the loose matching rules can fail to find a feasible matching. Assume there are two resource requirements, A and B, and two slots, X and Y. The number below each requirement/slot represents the amount of resource. A can be fulfilled by either X or Y, while B can only be fulfilled by Y. A feasible matching is shown on the left, where both requirements are fulfilled. However, the loose matching rules can also result in the matching shown on the right, where A is fulfilled by Y, leaving B and X unmatched.

Given its reduced benefits and the problems it introduces, we propose in FLINK-20864 to replace the loose matching rules with the following exact matching rules.

  • An unspecified requirement (ResourceProfile::UNKNOWN) can only be fulfilled by a TM's default slot resource.
  • A specified requirement can only be fulfilled by a resource that is equal to itself.
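
A minimal sketch of the exact matching check is shown below, assuming the requirement, the slot resource, and the TM's default slot resource are all available as ResourceProfile values; the actual implementation of ResourceProfile::isMatching and how the default slot resource is obtained may differ.

Code Block
languagejava
titleExact matching rules (simplified sketch)
// Simplified illustration of the proposed exact matching rules.
boolean isExactlyMatching(
        ResourceProfile requirement,
        ResourceProfile slotResource,
        ResourceProfile defaultSlotResourceProfile) {

    if (requirement.equals(ResourceProfile.UNKNOWN)) {
        // An unspecified requirement can only be fulfilled by the TM's default slot resource.
        return slotResource.equals(defaultSlotResourceProfile);
    }
    // A specified requirement can only be fulfilled by a resource equal to itself.
    return slotResource.equals(requirement);
}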

Resource Deadlock

[Figure: a topology with two pipeline regions and two possible assignments of the 2 available slots]

The above figure demonstrates a potential deadlock due to scheduling dependencies. For the given topology, the scheduler initially requests 4 slots, for A, B, C, and D. Assume only 2 slots are available. If both slots are assigned to Pipeline Region 0 (as shown on the left), A and B will finish execution first, then C and D will be executed, and finally E will be executed. However, if the 2 slots are initially assigned to A and C (as shown on the right), then neither A nor C can finish execution, because B and D are missing to consume the data they produce.

Currently, with coarse-grained resource management, the scheduler guarantees to always finish fulfilling requirements of one pipeline region before starting to fulfill requirements of another. That means the deadlock case shown on the right of the above figure can never happen.

However, there is no such guarantee in fine-grained resource management. Since resource requirements for SSGs can differ, there is no control over which requirements are fulfilled first when there are not enough resources to fulfill all of them. Therefore, it is not always possible to fulfill one pipeline region prior to another.

To solve this problem, FLINK-20865 proposes, for fine-grained resource management, to make the scheduler defer requesting slots for other SSGs until the requirements of the current SSG are fulfilled, at the price of longer scheduling time.

Reactive Scheduling

We are aware that fine-grained resource management may not easily work with reactive scheduling, a planned future feature that decides the parallelism of execution based on the available resources (as mentioned in FLIP-138).

For fine-grained resource management to work with reactive scheduling, an important open question is which resource requirements should be fulfilled first when there are not enough resources to fulfill all of them.

[Figure: a target execution plan requiring 4 slots each for A and B, and three cases in which the requirements are only partially fulfilled]

The above figure shows, on the left, a target execution plan that requires 4 slots for each of A and B. On the right are three possible cases in which not all of the resource requirements can be fulfilled.

  • In Case 1, we get roughly half of the target processing capacity.
  • In Case 2, we may only get about 1/4 of the target processing capacity, bottlenecked by B.
  • In Case 3, the job cannot be executed at all.

As we can see, how resource requirements are fulfilled with insufficient resources can significantly affect Flink’s performance, and even availability. It could become more complicated when it comes to more complex target execution plans, with heterogeneous target parallelism and scheduling dependencies.

As the first step, we do not support reactive scheduling for fine-grained resource management.

In the future, the problem can potentially be resolved along the following directions.

  • The scheduler may declare a pair of minimum/target numbers of required slots for each slot resource. In this way, we would always try to allocate at least the minimum set of resources for executing the job. This should help avoid the worst case (Case 3 in the above example) whenever possible.
  • We may also rely on the scheduler to detect non-optimal cases (Cases 2 and 3 in the above example), adjust the declared resource requirements, and return unnecessary resources.

Future Plan

Potential follow-up issues

In the discussion, we found that there are some potential issues for fine-grained resource management. The issues recorded below will be addressed in the future, once we have collected enough feedback from users to prove they are valuable.

Operator-level resource configuration interface

We might provide operator-level resource configuration interfaces for expert users who know the exact resource usage of each operator and want to control the scheduling strategy at a finer granularity.

Default operator resource configuration

In this FLIP, SSGs with unknown resource requirements will be scheduled with the resource profile of the default slot. We might allow users to configure a default operator resource and schedule such SSGs according to the number of operators they contain.

Known limitations and constraints to be documented

When the fine-grained resource management feature is provided to user, the following limitations and constraints should be well documented, with potential impacts and suggestions.

  • Putting chainable operators into different SSGs may break operator chaining, and thus change the performance.
  • Changing the data exchange mode (pipelined vs. blocking) within an SSG may affect the resource requirements of the group.
  • Parallelism differences between operators in the same SSG may reduce resource utilization.