...

  • The least involved option is to use the out-of-the-box coarse-grained resource configurations. This should work in most simple use cases, especially for beginners trying out Flink, but resource utilization is usually not optimal.
  • In production, more user involvement is usually required: specifying operator parallelisms, configuring coarse-grained slot/TaskManager resources, and splitting slot sharing groups.
  • For cases where coarse-grained resource management does not work well (as discussed above in the Motivation section), fine-grained resource management lets expert users further optimize resource utilization by controlling how many resources each part of the pipeline uses, at the price of more user involvement.

Operator Granularity

If fine-grained resource requirements are specified for each operator, then the Flink runtime needs to aggregate these resource requirements to generate slot resource requirements, with respect to how operators are chained and how tasks share slots.
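The aggregation step might be sketched as follows. This is a simplified illustration, not Flink's actual implementation: `ResourceSpec` here is a hypothetical stand-in for Flink's ResourceProfile, and the requirements of operators that end up in the same slot are simply summed into one slot requirement.

```java
import java.util.List;

// Hypothetical, simplified stand-in for Flink's ResourceProfile.
final class ResourceSpec {
    final double cpuCores;
    final int memoryMb;

    ResourceSpec(double cpuCores, int memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    // Combining two requirements sums their resources.
    ResourceSpec merge(ResourceSpec other) {
        return new ResourceSpec(cpuCores + other.cpuCores, memoryMb + other.memoryMb);
    }
}

final class SlotRequirementAggregator {
    // A slot hosting a chain (or slot sharing group) of operators needs
    // the sum of the individual operator requirements.
    static ResourceSpec aggregate(List<ResourceSpec> operatorSpecs) {
        ResourceSpec total = new ResourceSpec(0, 0);
        for (ResourceSpec spec : operatorSpecs) {
            total = total.merge(spec);
        }
        return total;
    }
}
```

Note that in practice the aggregation must also account for which subtasks actually share each slot, which is what makes the conversion non-trivial.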

...

The advantages are:

  • Decoupling between resource requirements and operator chaining / slot sharing. Operator resource requirements are independent of how operators are chained and how tasks share slots. Ideally, changes to operator chaining and slot sharing should not require users to re-specify resource requirements.
  • Potential optimization against parallelism differences. For SSGs with operators of different parallelisms, there’s an opportunity to fulfill the slot request with exactly the resources required by the operators in the slot, which may further improve resource utilization at the price of additional effort in Flink’s runtime.

However, there are also some drawbacks:

  • Too much user involvement. Complex jobs may contain tens or even hundreds of operators. Specifying resource requirements for each operator is impractical.
  • Hard to support hybrid resource requirements.
    • Hybrid resource requirements: Sometimes users may only want to specify resource requirements for some parts of the job, leaving the requirements for the rest unspecified and expecting them to use similar resources as in coarse-grained resource management.
    • Supporting hybrid operator resource requirements is hard because operators with specified and unspecified resource requirements might be chained together or share the same slot. It’s hard to define what resources a slot requires when aggregating specified and unspecified operator requirements.
  • Accumulative configuration error. There’s always a deviation between the configured resource requirements and the actual needed resources. The more operators, the larger the accumulative error from such deviations, which may harm the resource utilization.

Note: Having a default value for operator resource requirements might help reduce the user involvement. However, it can also be hard, sometimes even impossible, to figure out a proper default value. An improper default value could also amplify the accumulative configuration error.

Task Granularity

If fine-grained resource requirements are specified for each task, then the Flink runtime needs to expose how operators are chained into tasks, and aggregate the task resource requirements to generate slot resource requirements with respect to how tasks share slots.

The advantages and disadvantages of the task granularity approach are similar to the operator granularity approach, except for the following differences.

  • Tasks are chained operators, so resource requirements are no longer decoupled from operator chaining.
  • There are fewer tasks than operators, thus less, but still too much, user involvement and accumulative configuration error.
  • Expose operator chaining. While the DataStream API provides interfaces to hint how operators should be chained, the complete operator chaining strategies remain internal to Flink’s runtime. Exposing how operators will be chained at runtime imposes significant limitations on evolving the chaining strategies, because any change that leads to different chaining results can break the backward compatibility of resource configurations.

Slot Sharing Group Granularity

With fine-grained resource requirements specified for each SSG, the Flink runtime can directly request slots with the required resources.

It overcomes the shortcomings of the operator/task granularity approaches.

  • Flexible user involvement. How much user effort is needed depends on how many SSGs the user defines. The more SSGs, the more resource requirements need to be specified.
  • Support hybrid resource requirements. Since requirements are on SSGs, there’s no need to worry about operators/tasks with hybrid requirements being chained together or sharing the same slot. FLIP-56 already supports allocating slots with hybrid requirements from the same TM. A slot request with unspecified requirements will be fulfilled with resources equivalent to those in coarse-grained resource management.
  • Less accumulative configuration error. The peak shaving and valley filling effect (mentioned in the Motivation section) between operators within the same SSG should reduce the overall resources needed.

Additionally, the SSG-based approach also helps simplify the system.

  • Specified resource requirements can be directly used for slot allocation. No further process/conversion/aggregation needed.
  • Carefully deciding how many resources (CPU, heap memory, etc.) each operator/task should use would not take effect at runtime anyway, because there’s no resource isolation between operators/tasks within a slot, except for managed memory.
  • For managed memory, FLIP-53 & FLIP-141 have already introduced a fraction-based approach for sharing managed memory within a slot. Exposing more knobs for controlling the operator/task memory is likely to break the existing approach, or at least complicates the system.

Compared to the operator/task granularity approaches, this approach has the following disadvantages.

  • Coupling between resource requirements and operator chaining / slot sharing. If the SSGs are changed, either explicitly specified by users or due to changes of operator chaining / slot sharing strategies, the specified resource requirements also need to be adjusted.
  • User involvement against parallelism differences. For SSGs with operators of different parallelisms, the slots that do not contain subtasks of all the operators may have more resources than needed. To improve resource utilization against this issue, users would need to separate operators with different parallelisms into different SSGs.
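To make the parallelism issue concrete, here is a small worked example with made-up numbers: an SSG contains operator A (parallelism 4) and operator B (parallelism 2), and each subtask needs 1 CPU core. Since every slot of the SSG is sized for one subtask of each operator, all 4 slots request 2 cores, even though 2 of them host only an A subtask.

```java
final class ParallelismWasteExample {
    // Hypothetical figures: operator A parallelism 4, operator B parallelism 2,
    // each subtask needs 1 CPU core, both operators in the same SSG.

    static double allocatedCores() {
        int numSlots = 4;                // number of slots = max operator parallelism in the SSG
        double coresPerSlot = 1.0 + 1.0; // slot sized for one A subtask + one B subtask
        return numSlots * coresPerSlot;
    }

    static double actuallyNeededCores() {
        return 4 * 1.0 + 2 * 1.0; // 4 A subtasks + 2 B subtasks
    }
}
```

Here 8 cores are allocated while only 6 are needed; moving A and B into separate SSGs removes the waste, at the cost of the user managing more SSGs and losing slot sharing between them.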

Summary


| Granularity | Pros | Cons |
| --- | --- | --- |
| Operator | Decoupling between resource requirements and operator chaining / slot sharing. Potential optimization against parallelism differences. | Too much user involvement. Hard to support hybrid resource requirements. Accumulative configuration error. |
| Task | Decoupling between resource requirements and slot sharing. Potential optimization against parallelism differences. Less user involvement and accumulative configuration error than the operator granularity. | Hard to support hybrid resource requirements. Still too much user involvement and accumulative configuration error. Exposes operator chaining. |
| Slot Sharing Group | Flexible user involvement. Supports hybrid resource requirements. Less accumulative configuration error. Simplifies the system. | Coupling between resource requirements and operator chaining / slot sharing. User involvement against parallelism differences. |

The above table summarizes the advantages and disadvantages of the three design options.

These pros and cons reveal an important underlying fact, which is also the most convincing reason for choosing the SSG-based approach: the slot is the basic unit of resource management in Flink’s runtime.

  • The granularity of resource requirements should correspond to how they are fulfilled at runtime. Converting to slot resource requirements from any other granularity, as required for slot allocation, adds complexity to the system.
  • Runtime interfaces should only require the minimum set of information needed for resource management, leaving more flexibility to the development APIs. It’s more straightforward for the development APIs to aggregate user-provided operator/task requirements (if that’s what is exposed to the end-users) to slot requirements, than to make up operator/task requirements from user-provided slot requirements.

To sum up, in this FLIP we propose SSG-based runtime interfaces for configuring fine-grained resource requirements, because they correspond to how resources are managed at runtime, and thus offer usability, efficiency, and simplicity. Compared to the benefits, we believe the shortcomings are less impactful: operator chaining and slot sharing strategies do not frequently change in ways that affect resource requirements, and the user involvement against parallelism differences is a trade-off between usability and resource utilization that is left to the user.

Proposed Changes

The changes proposed in this FLIP are quite straightforward.

  • Introduce runtime interfaces for specifying SSG-based resource requirements.
  • Allocate slots with the specified resource requirements.

Runtime Interfaces

As the entry point of the unified runtime, StreamGraphGenerator takes Transformations and various settings from the user development APIs, and generates the StreamGraph accordingly.

We propose to add the following interface for specifying fine-grained resource requirements for SSGs.

```java
public class StreamGraphGenerator {
    public StreamGraphGenerator setSlotSharingGroupResource(
            Map<String, ResourceProfile> slotSharingGroupResources);
}
```

The specified SSG resource requirements need to be passed on all the way to the corresponding SlotSharingGroup in ExecutionGraph.

Slot Allocation

Currently, slot requests for SSGs are generated by SlotSharingExecutionSlotAllocator. We propose to make SlotSharingExecutionSlotAllocator use the resource requirements in corresponding SlotSharingGroups for generating the slot requests.
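Conceptually, the allocator-side change might look like the following simplified sketch. All names here are illustrative stand-ins (the real SlotSharingExecutionSlotAllocator and ResourceProfile are considerably more involved): each SSG's slot request carries the group's specified requirement, and unspecified groups fall back to an UNKNOWN profile that the runtime fulfills with the coarse-grained default slot resources.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for Flink's ResourceProfile.
final class ResourceProfileStub {
    // Sentinel meaning "no requirement specified"; fulfilled with
    // coarse-grained default slot resources.
    static final ResourceProfileStub UNKNOWN = new ResourceProfileStub(-1, -1);

    final double cpuCores;
    final int memoryMb;

    ResourceProfileStub(double cpuCores, int memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }
}

// Hypothetical sketch of the allocator-side logic.
final class SlotAllocatorSketch {
    private final Map<String, ResourceProfileStub> ssgRequirements = new HashMap<>();

    void setSlotSharingGroupResource(String ssgName, ResourceProfileStub profile) {
        ssgRequirements.put(ssgName, profile);
    }

    // Generate the resource profile for a slot request of the given SSG,
    // falling back to UNKNOWN for groups with unspecified requirements.
    ResourceProfileStub profileForSlotRequest(String ssgName) {
        return ssgRequirements.getOrDefault(ssgName, ResourceProfileStub.UNKNOWN);
    }
}
```

This fallback behavior is what makes hybrid requirements straightforward at the SSG granularity: specified and unspecified groups simply yield different slot requests.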

Related Issues

Network Memory

Network memory is included in the current ResourceProfile implementation, with the expectation that fine-grained resource management will avoid deploying onto a TM more tasks than its network memory can support.

However, how much network memory each task needs depends heavily on the shuffle service implementation, and may vary when switching to another shuffle service. Therefore, neither users nor the Flink runtime can easily specify network memory requirements for a task/slot at the moment.

The concrete solution for network memory controlling is beyond the scope of this FLIP. However, we are aware of a few potential directions for solving this problem.

  • Make shuffle services adaptively control the amount of memory assigned to each task/slot, with respect to the given memory pool size. In this way, there should be no need to rely on fine-grained resource management to control the network memory consumption.
  • Make shuffle services expose interfaces for calculating network memory requirements for given SSGs. In this way, the Flink runtime can specify the calculated network memory requirements for slots, without having to understand the internal details of different shuffle service implementations.
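The second direction could take a shape like the hypothetical interface below. The names, signature, and the naive fixed-buffer arithmetic are all made up for illustration and are not part of Flink's actual shuffle API: the point is only that the shuffle service, which knows its own buffer requirements, reports per-slot network memory so the runtime can fold it into slot requests.

```java
// Hypothetical interface; not part of Flink's actual shuffle service API.
interface NetworkMemoryCalculator {
    // Returns the network memory (in MB) needed by one slot, based on
    // shuffle-service-internal knowledge of its buffering behavior.
    int networkMemoryMbPerSlot(int numInputChannels, int numOutputSubpartitions);
}

// A naive illustrative implementation assuming a fixed number of
// fixed-size buffers per channel/subpartition.
final class FixedBufferCalculator implements NetworkMemoryCalculator {
    private static final int BUFFERS_PER_CHANNEL = 2;
    private static final int BUFFER_SIZE_KB = 32;

    @Override
    public int networkMemoryMbPerSlot(int numInputChannels, int numOutputSubpartitions) {
        int buffers = BUFFERS_PER_CHANNEL * (numInputChannels + numOutputSubpartitions);
        return buffers * BUFFER_SIZE_KB / 1024;
    }
}
```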


For now, we propose in FLINK-20863 to exclude network memory from ResourceProfile, to unblock the fine-grained resource management feature from the network memory controlling issue. If needed, it can be added back in the future, as long as there is a good way to specify the requirement.
