Status

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Fine-Grained Resource Management is one of Apache Flink’s roadmap features that the community has been working to deliver. While FLIP-56 delivers the ability to allocate slots with respect to fine-grained resource requirements, the question of how to obtain those resource requirements remains unanswered. In this FLIP, we discuss what the runtime interfaces for fine-grained resource requirements should look like, with respect to usability, flexibility, and how resources are used at runtime.

Note: This FLIP mainly focuses on discussing and reasoning about the design choices. The changes needed for the proposed design are straightforward.

Background on Fine-Grained Resource Management

Motivation

Flink currently adopts a coarse-grained resource management approach, where tasks are deployed into predefined, usually identical slots without the notion of how many resources each slot contains. With slot sharing, tasks in the same Slot Sharing Group (SSG) can be deployed into one slot regardless of how many resources each task/operator needs. In FLIP-56, we proposed fine-grained resource management, which leverages slots with different resources for task execution, with respect to the workload’s resource requirements.

For many jobs, using coarse-grained resource management and simply putting all tasks into one SSG works well enough, in terms of both resource utilization and usability.

  • For many streaming jobs in which all tasks have the same parallelism, each slot will contain an entire pipeline. Ideally, all pipelines should use roughly the same resources, which can be satisfied easily by tuning the resources of the identical slots.
  • Resource consumption of tasks varies over time. When the consumption of one task decreases, the extra resources can be used by another task whose consumption is increasing. This, known as the peak shaving and valley filling effect, reduces the overall resources needed.

However, there are cases where coarse-grained resource management does not work well.

  • Tasks may have different parallelisms. Sometimes, such different parallelisms cannot be avoided. E.g., the parallelism of source/sink/lookup tasks might be constrained by the partitions and IO load of the external upstream/downstream system. In such cases, slots with fewer tasks would need fewer resources than those with the entire pipeline of tasks.
  • Sometimes the resource needed for the entire pipeline might be too much to be put into a single slot/taskmanager. In such cases, the pipeline needs to be split into multiple SSGs, which may not always have the same resource requirement.
  • For batch jobs, not all the tasks can be executed at the same time. Thus, the instantaneous resource requirement of the pipeline changes over time.

Trying to execute all tasks with identical slots can result in non-optimal resource utilization. The resource of the identical slots has to be able to fulfill the highest resource requirement, which will be wasteful for other requirements. When expensive external resources like GPU are involved, such waste can become even harder to afford.

Therefore, fine-grained resource management is needed, which leverages slots of different resources to improve resource utilization in such scenarios.

Current Condition

Currently, most of the slot allocation and scheduling logic proposed in FLIP-56 has already been implemented, except for a slot manager plugin which is still in progress (FLINK-20835). The major missing part is user interfaces for specifying resource requirements for a job. 

There is some legacy code for setting operator resources on Transformation and aggregating them to generate slot requests. However, this code is never really used and no APIs are exposed to users. Most importantly, we are not sure that letting users specify operator-level resource requirements and aggregating them at runtime is the right approach, which will be discussed in subsequent sections.

Scope

This FLIP proposes Slot Sharing Group (SSG) based runtime interfaces for specifying fine-grained resource requirements. To be specific, we discuss how resource requirements are specified at the Transformation layer and leveraged afterward, which covers the common path of both Table/SQL API and DataStream API workloads.

The end-user interfaces for specifying resource requirements are excluded from the scope of this FLIP, for the following reasons.

  • The fine-grained resource management is not end-to-end ready. We believe it should be the last step to activate the feature by exposing the user APIs.
  • Different development APIs may expose the interfaces for specifying resource requirements differently. Deciding how this feature should be integrated into each development API requires more in-depth discussion with the component experts. The following examples are only preliminary ideas for demonstrating what the user interfaces may look like in different development APIs.
    • For DataStream API, there are already interfaces for setting SSGs for operators. Based on this, we can introduce new interfaces to specify resource requirements for the SSGs directly.
    • For Table API & SQL, since neither the concept of operator nor SSG is exposed, the planner probably should generate the SSG resource requirements, exposing only a few configuration knobs to users.

Granularity of Fine-Grained Resource Requirements

In this section, we discuss at what granularity the fine-grained resource requirements should be specified, which is the most fundamental question that needs to be answered for designing the runtime interfaces.

To be specific, we discuss the pros and cons of the three design options: specifying resource requirements for each operator, task or slot sharing group.

User Story

Before diving into the design options, we’d like to make clear the user story of fine-grained resource management, which helps to understand how the pros and cons of each design option affect our target use cases.

We believe fine-grained resource management is not a replacement for existing approaches, but an extension of the range of user involvement in controlling Flink’s resource usage. Users may choose how involved they’d like to be, with respect to their expertise and requirements.

  • The least involved option is to leverage the out-of-box coarse-grained resource configurations. It should work in most simple use cases, especially for beginners trying out Flink. However, resource utilization is usually not optimal.
  • In production, it usually requires more user involvement, to specify the operator parallelisms, configure coarse-grained slot/taskmanager resources, and split slot sharing groups. 
  • For cases where coarse-grained resource management does not work well (as discussed in the Motivation section), fine-grained resource management provides a way for expert users to further optimize resource utilization by controlling how many resources each part of the pipeline should use, at the price of more user involvement.

Operator Granularity

If fine-grained resource requirements are specified for each operator, then the Flink runtime needs to aggregate these resource requirements to generate slot resource requirements, with respect to how operators are chained and how tasks share slots.
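The aggregation step described above can be sketched as follows. This is only an illustration under simplifying assumptions, not Flink's actual implementation: `OperatorAggregationSketch` and its `Spec` class are hypothetical names, only CPU and memory are modeled, and requirements are assumed to combine by simple summation.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: derive a slot requirement by summing operator requirements. */
final class OperatorAggregationSketch {

    /** Simplified stand-in for a resource specification (not Flink's ResourceSpec). */
    static final class Spec {
        final double cpuCores;
        final int memoryMb;

        Spec(double cpuCores, int memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }

        Spec plus(Spec other) {
            return new Spec(cpuCores + other.cpuCores, memoryMb + other.memoryMb);
        }
    }

    /** Sums the requirements of all operators whose subtasks end up in the same slot. */
    static Spec aggregate(List<Spec> operatorsInSlot) {
        Spec total = new Spec(0.0, 0);
        for (Spec spec : operatorsInSlot) {
            total = total.plus(spec);
        }
        return total;
    }

    public static void main(String[] args) {
        // Illustrative numbers only.
        Spec slot = aggregate(Arrays.asList(
                new Spec(0.5, 256), new Spec(1.0, 512), new Spec(0.25, 128)));
        System.out.println(slot.cpuCores + " cores, " + slot.memoryMb + " MB");
    }
}
```

Note that the sketch has no sensible way to handle an operator whose requirement is unspecified, which is the hybrid-requirement difficulty discussed in this section.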

The advantages are:

  • Decoupling between resource requirements and operator chaining / slot sharing. Operator resource requirements are independent from how operators are chained and how tasks share slots. Ideally, changing of operator chaining and slot sharing should not require users to re-specify resource requirements.
  • Potential optimization against parallelism differences. For SSGs with operators of different parallelisms, there’s an opportunity to fulfill the slot request with resources exactly required by operators in the slot, which may further improve the resource utilization at the price of further efforts on Flink’s runtime.

However, there are also some drawbacks:

  • Too much user involvement. Complex jobs may contain tens or even hundreds of operators. Specifying resource requirements for each operator is impractical.
  • Hard to support hybrid resource requirements.
    • Hybrid resource requirements: Sometimes users may only want to specify resource requirements for some parts of the job, leaving the requirements for the rest unspecified and expecting them to use similar resources as in coarse-grained resource management.
    • Supporting hybrid operator resource requirements is hard because operators with specified and unspecified resource requirements might be chained together or share the same slot. It’s hard to define what resources a slot requires when aggregating specified and unspecified operator requirements.
  • Accumulative configuration error. There’s always a deviation between the configured resource requirements and the actual needed resources. The more operators, the larger the accumulative error from such deviations, which may harm the resource utilization.

Note: Having a default value for operator resource requirements might help reduce the user involvement. However, it can also be hard, sometimes even impossible, to figure out a proper default value. An improper default value could also amplify the accumulative configuration error.

Task Granularity

If fine-grained resource requirements are specified for each task, then the Flink runtime needs to expose how operators are chained into tasks, and aggregate the task resource requirements to generate slot resource requirements with respect to how tasks share slots.

The advantages and disadvantages of the task granularity approach are similar to the operator granularity approach, except for the following differences.

  • Tasks are chained operators, thus resource requirements are no longer decoupled from operator chaining.
  • There are fewer tasks than operators, thus less but still too much user involvement and accumulative configuration error.
  • Expose operator chaining. While DataStream API provides interfaces to hint how operators should be chained, the complete operator chaining strategies remain internal to Flink’s runtime. Exposing how operators will be chained at runtime significantly limits how the chaining strategies can evolve, because any change that leads to different chaining results can break the backward compatibility of resource configurations.

Slot Sharing Group Granularity

With fine-grained resource requirements specified for each SSG, the Flink runtime can directly request slots with the required resources.

It overcomes the shortcomings of the operator/task granularity approaches.

  • Flexible user involvement. How much user effort is needed depends on how many SSGs are defined by the user. The more SSGs, the more resource requirements need to be specified.
  • Support hybrid resource requirements. Since requirements are on SSGs, there’s no need to worry about operators/tasks with hybrid requirements being chained together or sharing the same slot. FLIP-56 already supports allocating slots with hybrid requirements from the same TM. A slot request with unspecified requirements will be fulfilled with the same resources as in coarse-grained resource management.
  • Less accumulative configuration error. The peak shaving and valley filling effect (mentioned in the Motivation section) between operators within the same SSG should reduce the overall resources needed.
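The peak shaving and valley filling effect can be illustrated with a small calculation. The sketch below compares what would be reserved if every operator were sized for its own peak with what a slot shared by all operators has to provide. The class name and the usage numbers are hypothetical and chosen only for illustration.

```java
/** Hypothetical sketch of peak shaving: per-operator peaks vs. the peak of the combined usage. */
final class PeakShavingSketch {

    /** Sum of each operator's own peak: what sizing every operator separately would reserve. */
    static int sumOfPeaks(int[][] usage) {
        int total = 0;
        for (int[] series : usage) {
            int peak = 0;
            for (int value : series) {
                peak = Math.max(peak, value);
            }
            total += peak;
        }
        return total;
    }

    /**
     * Peak of the combined usage: what a slot shared by all operators has to provide.
     * Assumes at least one operator and that all series cover the same time steps.
     */
    static int peakOfSum(int[][] usage) {
        int peak = 0;
        for (int t = 0; t < usage[0].length; t++) {
            int sum = 0;
            for (int[] series : usage) {
                sum += series[t];
            }
            peak = Math.max(peak, sum);
        }
        return peak;
    }

    public static void main(String[] args) {
        // Memory usage (MB) of two operators over three time steps; made-up numbers.
        int[][] usageMb = {{100, 300, 100}, {300, 100, 200}};
        System.out.println(sumOfPeaks(usageMb) + " MB vs " + peakOfSum(usageMb) + " MB");
    }
}
```

The peak of the combined usage can never exceed the sum of the individual peaks, which is why a single requirement per SSG tends to need less headroom than one requirement per operator.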

Additionally, the SSG-based approach also helps simplify the system.

  • Specified resource requirements can be directly used for slot allocation. No further processing, conversion, or aggregation is needed.
  • Carefully deciding how many resources (CPU, heap memory, etc.) each operator/task should use has no effect on runtime execution, because there’s no resource isolation between operators/tasks within a slot, except for managed memory.
  • For managed memory, FLIP-53 & FLIP-141 have already introduced a fraction-based approach for sharing managed memory within a slot. Exposing more knobs for controlling the operator/task memory is likely to break the existing approach, or at least complicate the system.

Compared to the operator/task granularity approaches, this approach has the following disadvantages.

  • Coupling between resource requirements and operator chaining / slot sharing. If the SSGs are changed, either explicitly specified by users or due to changes of operator chaining / slot sharing strategies, the specified resource requirements also need to be adjusted.
  • User involvement against parallelism differences. For SSGs with operators of different parallelisms, the slots that do not contain subtasks of all the operators may have more resources than needed. To improve resource utilization against this issue, users would need to separate operators with different parallelisms into different SSGs.
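The over-provisioning caused by parallelism differences can be estimated with a short calculation. The sketch below is a simplified model, not Flink code: it assumes that slot i of a group hosts subtask i of every operator whose parallelism exceeds i, and that every slot of the group is sized for one subtask of each operator. The class name and the numbers are hypothetical.

```java
/** Hypothetical sketch: memory over-provisioned in an SSG whose operators differ in parallelism. */
final class ParallelismWasteSketch {

    /**
     * Returns the total memory (MB) reserved but not needed across all slots of one SSG.
     * Assumes slot i hosts subtask i of each operator with parallelism greater than i, and
     * that every slot is sized for one subtask of every operator in the group.
     */
    static int overProvisionedMb(int[] parallelism, int[] memoryMbPerSubtask) {
        int slots = 0;
        int slotSizeMb = 0;
        for (int op = 0; op < parallelism.length; op++) {
            slots = Math.max(slots, parallelism[op]);
            slotSizeMb += memoryMbPerSubtask[op];
        }
        int wasteMb = 0;
        for (int slot = 0; slot < slots; slot++) {
            int neededMb = 0;
            for (int op = 0; op < parallelism.length; op++) {
                if (slot < parallelism[op]) {
                    neededMb += memoryMbPerSubtask[op];
                }
            }
            wasteMb += slotSizeMb - neededMb;
        }
        return wasteMb;
    }

    public static void main(String[] args) {
        // A source with parallelism 2 (200 MB per subtask) sharing an SSG with a map of
        // parallelism 4 (300 MB per subtask); made-up numbers.
        System.out.println(overProvisionedMb(new int[] {2, 4}, new int[] {200, 300}) + " MB");
    }
}
```

In this model, placing the two operators into separate SSGs removes the over-provisioning, at the price of specifying one more requirement.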

Summary


| Granularity | Pros | Cons |
|---|---|---|
| Operator | Decoupling between resource requirements and operator chaining / slot sharing. Potential optimization against parallelism differences. | Too much user involvement. Hard to support hybrid resource requirements. Accumulative configuration error. |
| Task | Decoupling between resource requirements and slot sharing. Potential optimization against parallelism differences. Less user involvement and accumulative configuration error compared to the operator granularity. | Hard to support hybrid resource requirements. Still too much user involvement and accumulative configuration error. Expose operator chaining. |
| Slot Sharing Group | Flexible user involvement. Support hybrid resource requirements. Less accumulative configuration error. Simplify the system. | Coupling between resource requirements and operator chaining / slot sharing. User involvement against parallelism differences. |

The above table summarizes the advantages and disadvantages of the three design options.

The pros and cons above point to an important underlying fact, which is also the most convincing reason for us to choose the SSG-based approach: the slot is the basic unit for resource management in Flink’s runtime.

  • The granularity of resource requirements should correspond to how they are fulfilled at runtime. Converting requirements of any other granularity into slot resource requirements, as required for slot allocation, adds to the system complexity.
  • Runtime interfaces should only require the minimum set of information needed for resource management, leaving more flexibility to the development APIs. It’s more straightforward for the development APIs to aggregate user-provided operator/task requirements (if that’s what is exposed to the end-users) to slot requirements, than to make up operator/task requirements from user-provided slot requirements.

To sum up, in this FLIP we propose SSG-based runtime interfaces for configuring fine-grained resource requirements, because they correspond to how resources are managed at runtime, which benefits usability, efficiency, and simplicity. We believe the shortcomings are less significant than the benefits: operator chaining and slot sharing strategies do not frequently change in a way that affects the resource requirements, and the user involvement against parallelism differences is a trade-off between usability and resource utilization for the user to decide.

Proposed Changes

The changes proposed in this FLIP are quite straightforward.

  • Introduce runtime interfaces for specifying SSG-based resource requirements.
  • Allocate slots with the specified resource requirements.

Runtime Interfaces

As the entrypoint of the unified runtime, StreamGraphGenerator takes Transformations and various settings from user development APIs, and generates StreamGraph accordingly.

We propose to add the following interface for specifying fine-grained resource requirements for SSGs.

StreamGraphGenerator
public class StreamGraphGenerator {
    /**
     * Specify fine-grained resource requirements for slot sharing groups.
     *
     * <p>Note that a slot sharing group hints to the scheduler that the grouped operators CAN be
     * deployed into a shared slot. There's no guarantee that the scheduler always deploys the
     * grouped operators together. In cases where grouped operators are deployed into separate
     * slots, the slot resources will be derived from the specified group requirement.
     */
    public StreamGraphGenerator setSlotSharingGroupResource(Map<String, ResourceProfile> slotSharingGroupResources);
}

The specified SSG resource requirements need to be passed on all the way to the corresponding SlotSharingGroup in ExecutionGraph.

Slot Allocation

Currently, slot requests for SSGs are generated by SlotSharingExecutionSlotAllocator. We propose to make SlotSharingExecutionSlotAllocator use the resource requirements in corresponding SlotSharingGroups for generating the slot requests.
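A minimal sketch of how the specified requirements could flow from the proposed setter to slot requests is shown below. It is a self-contained stand-in rather than Flink code: `GeneratorSketch`, `Profile`, and `slotRequestFor` are hypothetical names, and `Profile.UNKNOWN` only imitates the role of `ResourceProfile::UNKNOWN` for groups without a specified requirement.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in mirroring the proposed setter; not Flink's StreamGraphGenerator. */
final class GeneratorSketch {

    /** Simplified placeholder for ResourceProfile. */
    static final class Profile {
        /** Marker for an unspecified requirement. */
        static final Profile UNKNOWN = new Profile(-1.0, -1);

        final double cpuCores;
        final int memoryMb;

        Profile(double cpuCores, int memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }
    }

    private final Map<String, Profile> slotSharingGroupResources = new HashMap<>();

    /** Same shape as the proposed interface: SSG name mapped to its resource requirement. */
    GeneratorSketch setSlotSharingGroupResource(Map<String, Profile> resources) {
        slotSharingGroupResources.putAll(resources);
        return this;
    }

    /** Requirement used for a slot request of the given SSG; unspecified groups stay UNKNOWN. */
    Profile slotRequestFor(String slotSharingGroup) {
        return slotSharingGroupResources.getOrDefault(slotSharingGroup, Profile.UNKNOWN);
    }

    public static void main(String[] args) {
        Map<String, Profile> resources = new HashMap<>();
        resources.put("heavy", new Profile(2.0, 4096)); // made-up group name and numbers
        GeneratorSketch generator = new GeneratorSketch().setSlotSharingGroupResource(resources);
        System.out.println(generator.slotRequestFor("heavy").memoryMb);
        System.out.println(generator.slotRequestFor("default") == Profile.UNKNOWN);
    }
}
```

Keeping unspecified groups as UNKNOWN is what allows hybrid requirements: only the groups the user cares about need an entry in the map.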

Related Issues

Network Memory

Network memory is included in the current ResourceProfile implementation, with the expectation that fine-grained resource management will not deploy tasks onto a TM that together require more network memory than the TM contains.

However, how much network memory each task needs highly depends on the shuffle service implementation, and may vary when switching to another shuffle service. Therefore, neither users nor the Flink runtime can easily specify network memory requirements for a task/slot at the moment.

The concrete solution for network memory controlling is beyond the scope of this FLIP. However, we are aware of a few potential directions for solving this problem.

  • Make shuffle services adaptively control the amount of memory assigned to each task/slot, with respect to the given memory pool size. In this way, there should be no need to rely on fine-grained resource management to control the network memory consumption.
  • Make shuffle services expose interfaces for calculating network memory requirements for given SSGs. In this way, the Flink runtime can specify the calculated network memory requirements for slots, without having to understand the internal details of different shuffle service implementations.

For now, we propose in FLINK-20863 to exclude network memory from ResourceProfile, to unblock the fine-grained resource management feature from the network memory controlling issue. If needed, it can be added back in the future, as long as there’s a good way to specify the requirement.

Resource Matching

Currently, ResourceProfile::isMatching uses the following rules (hereinafter, loose matching) to decide whether a slot resource can be used to fulfill the given resource requirement, in both SlotManager and SlotPool.

  • An unspecified requirement (ResourceProfile::UNKNOWN) can be fulfilled by any resource.
  • A specified requirement can be fulfilled by any resource that is greater than or equal to itself. Note that this rule currently has no effect, since no requirement is specified at the moment.

The loose matching rules were designed before the dynamic slot allocation. Under the assumption that resources of slots are decided when the TM is started and cannot be changed, the loose matching rules have the following advantages.

  • For standalone deployments, it allows slot requests to be fulfilled when the slots of pre-launched TMs can hardly have the exact required resources.
  • For active resource manager deployments, it increases the chance of slots being reused, thus reducing the cost of starting new TMs for various resource requirements.

With dynamic slot allocation introduced in FLIP-56, the benefits of the loose matching rules have been significantly reduced. As slots can be dynamically created after the TMs are started, with any desired resources as long as they are available, the only benefit the loose matching rules retain is avoiding the allocation of new slots when existing slots can be reused on the JM side. This benefit is insignificant since there’s no need to start new TMs.

On the other hand, the loose matching rules also introduce some problems.

  • Reusing larger slots for fulfilling smaller requirements can harm resource utilization.
  • It’s not straightforward to always find a feasible matching (assuming there is one) when matching a set of requirements against a set of slots, as happens during job failovers or with the declarative slot allocation protocol.

The above figure demonstrates how the loose matching rules can fail to find a feasible matching. Assume there are two resource requirements, A and B, and two slots, X and Y. The number below each requirement/slot represents the amount of resource. A can be fulfilled by either X or Y, while B can only be fulfilled by Y. A feasible matching is shown on the left, where both requirements are fulfilled. However, the loose matching rules can also result in another matching, shown on the right, where A is fulfilled by Y, leaving B and X unmatched.
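The failure mode can be reproduced with a first-fit matcher under the loose matching rule. This is only a sketch: the class name is hypothetical, resources are reduced to a single number, and the concrete amounts (A = 1, B = 2, X = 1, Y = 2) are assumed for illustration rather than taken from the figure.

```java
/** Hypothetical sketch: first-fit matching under the loose rule can miss a feasible matching. */
final class LooseMatchingSketch {

    /**
     * Assigns each requirement, in order, to the first unused slot that is at least as large.
     * Returns the number of requirements that could be fulfilled.
     */
    static int greedyFulfilled(int[] requirements, int[] slots) {
        boolean[] used = new boolean[slots.length];
        int fulfilled = 0;
        for (int requirement : requirements) {
            for (int s = 0; s < slots.length; s++) {
                if (!used[s] && slots[s] >= requirement) {
                    used[s] = true;
                    fulfilled++;
                    break;
                }
            }
        }
        return fulfilled;
    }

    public static void main(String[] args) {
        int[] requirements = {1, 2}; // A, B (assumed amounts)
        // Slots visited in the order X, Y: A takes X, B takes Y.
        System.out.println(greedyFulfilled(requirements, new int[] {1, 2}));
        // Slots visited in the order Y, X: A takes Y, and B no longer fits anywhere.
        System.out.println(greedyFulfilled(requirements, new int[] {2, 1}));
    }
}
```

Whether both requirements are fulfilled depends only on the order in which slots are visited, which is the kind of nondeterminism that a matching over sets of requirements and slots has to guard against.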

Given the reduction of its benefits and the problems it introduced, we proposed in FLINK-20864 to replace the loose matching rules with the following exact matching rules.

  • An unspecified requirement (ResourceProfile::UNKNOWN) can only be fulfilled by a TM's default slot resource.
  • A specified requirement can only be fulfilled by a resource that is equal to itself.
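The difference between the two rule sets can be written as a pair of predicates. The sketch below is a simplification and not the actual `ResourceProfile::isMatching` code: resources are reduced to a single integer, `UNKNOWN` is modeled as -1, and the class and method names are hypothetical.

```java
/** Hypothetical sketch of the loose and exact matching rules on a one-dimensional resource. */
final class MatchingRulesSketch {

    /** Stand-in for an unspecified requirement. */
    static final int UNKNOWN = -1;

    /** Loose matching: UNKNOWN matches anything; otherwise the slot must be large enough. */
    static boolean looseMatch(int slot, int requirement) {
        return requirement == UNKNOWN || slot >= requirement;
    }

    /** Exact matching: UNKNOWN matches only the TM's default slot; otherwise sizes must be equal. */
    static boolean exactMatch(int slot, int requirement, int defaultSlot) {
        return requirement == UNKNOWN ? slot == defaultSlot : slot == requirement;
    }

    public static void main(String[] args) {
        System.out.println(looseMatch(4, 2));    // a larger slot is accepted
        System.out.println(exactMatch(4, 2, 1)); // a larger slot is rejected
        System.out.println(exactMatch(1, UNKNOWN, 1));
    }
}
```

With exact matching, each requirement has only one acceptable slot size, so the order in which requirements and slots are considered no longer affects whether a feasible matching is found.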

Resource Deadlock

The above figure demonstrates a potential case of deadlock due to scheduling dependency. For the given topology, initially the scheduler will request 4 slots, for A, B, C and D. Assuming only 2 slots are available, if both slots are assigned to Pipeline Region 0 (as shown on the left), A and B will first finish execution, then C and D will be executed, and finally E will be executed. However, if in the beginning the 2 slots are assigned to A and C (as shown on the right), then neither A nor C can finish execution, because B and D, which consume the data they produce, have no slots.

Currently, with coarse-grained resource management, the scheduler guarantees to always finish fulfilling requirements of one pipeline region before starting to fulfill requirements of another. That means the deadlock case shown on the right of the above figure can never happen.

However, there’s no such guarantee in fine-grained resource management. Since resource requirements for SSGs can be different, there’s no control on which requirements will be fulfilled first, when there are not enough resources to fulfill all the requirements. Therefore, it’s not always possible to fulfill one pipeline region prior to another.

To solve this problem, FLINK-20865 proposes that, for fine-grained resource management, the scheduler defer requesting slots for other SSGs until the requirements of the current SSG are fulfilled, at the price of longer scheduling time.
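The deadlock condition from the example can be expressed as a simple check: the job can make progress only if at least one pipeline region has slots for all of its tasks. The sketch below is a simplified model, not scheduler code. The class and method names are hypothetical, and grouping C and D into a second region is inferred from the description of the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: a partial slot assignment deadlocks if no region is fully covered. */
final class RegionDeadlockSketch {

    static Set<String> setOf(String... tasks) {
        return new HashSet<>(Arrays.asList(tasks));
    }

    /** Returns true if some pipeline region has a slot for every one of its tasks. */
    static boolean canMakeProgress(List<Set<String>> regions, Set<String> tasksWithSlots) {
        for (Set<String> region : regions) {
            if (tasksWithSlots.containsAll(region)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Set<String>> regions = new ArrayList<>();
        regions.add(setOf("A", "B")); // Pipeline Region 0
        regions.add(setOf("C", "D")); // second region, inferred from the example
        System.out.println(canMakeProgress(regions, setOf("A", "B"))); // left side of the figure
        System.out.println(canMakeProgress(regions, setOf("A", "C"))); // right side of the figure
    }
}
```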

Reactive Scheduling

We are aware that fine-grained resource management may not easily work with reactive scheduling, a planned feature that decides the parallelism of execution based on the available resources (as mentioned in FLIP-138).

For fine-grained resource management to work with reactive scheduling, an important open question is which resource requirements should be fulfilled first when there are not enough resources to fulfill all of them.

The above figure shows a target execution plan on the left that requires 4 slots for each of A and B. On the right, there are 3 possible cases in which not all the resource requirements can be fulfilled.

  • In Case 1, we get roughly half of the target processing capacity.
  • In Case 2, we may only get about 1/4 of the target processing capacity, bottlenecked by B.
  • In Case 3, the job cannot be executed at all.

As we can see, how resource requirements are fulfilled with insufficient resources can significantly affect Flink’s performance, and even availability. It could become more complicated when it comes to more complex target execution plans, with heterogeneous target parallelism and scheduling dependencies.
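The three cases can be approximated by treating the achievable processing capacity as being bottlenecked by the operator with the smallest fulfilled share of its target parallelism. This is a rough model for illustration only. The class name is hypothetical, and the per-case slot counts below are assumptions chosen to be consistent with the outcomes described above, not values read from the figure.

```java
/** Hypothetical sketch: capacity relative to the target plan, limited by the scarcest operator. */
final class ReactiveCapacitySketch {

    /** Returns min over operators of allocated / target parallelism (1.0 means full capacity). */
    static double capacityFraction(int[] allocated, int[] target) {
        double fraction = 1.0;
        for (int i = 0; i < target.length; i++) {
            fraction = Math.min(fraction, (double) allocated[i] / target[i]);
        }
        return fraction;
    }

    public static void main(String[] args) {
        int[] target = {4, 4}; // 4 slots each for A and B
        System.out.println(capacityFraction(new int[] {2, 2}, target)); // assumed Case 1
        System.out.println(capacityFraction(new int[] {3, 1}, target)); // assumed Case 2
        System.out.println(capacityFraction(new int[] {4, 0}, target)); // assumed Case 3
    }
}
```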

As the first step, we do not support reactive scheduling for fine-grained resource management.

In the future, the problem can potentially be resolved along the following directions.

  • The scheduler may declare a pair of minimum/target required number of slots for each slot resource. In this way, we should always try to allocate the minimum set of resources for executing the job. This should help avoid the worst case (Case 3 in the above example) when it’s possible.
  • We may also rely on the scheduler to detect the non-optimal cases (Case 2 & 3 in the above example), and to adjust the declared resource requirements and return unnecessary resources.

Future Plan

Potential follow-up issues

During the discussion, we identified some potential follow-up issues for fine-grained resource management. The issues recorded below will be addressed in the future, once we have collected enough user feedback to show that they are valuable.

Operator-level resource configuration interface

We might provide operator-level resource configuration interfaces for expert users who know the exact resource usage of each operator and also want to control the scheduling strategy at a finer granularity.

Default operator resource configuration

In this FLIP, the SSGs with unknown resource requirements will be scheduled with the resource profile of the default slot. We might allow users to configure the default operator resource and schedule those SSGs according to the number of operators inside them.

Known limitations and constraints to be documented

When the fine-grained resource management feature is provided to users, the following limitations and constraints should be well documented, with potential impacts and suggestions.

  • Setting chainable operators to different SSGs may break operator chaining, and thus change the performance.
  • Changing the data exchange mode (pipelined vs. blocking) within an SSG may affect the resource requirement of the group.
  • Parallelism differences between operators in the same SSG may reduce the resource utilization.