Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.


...

Page properties

...


Discussion thread

Discussion threadhttp://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-53-Fine-Grained-Resource-Management-td31831.html

JIRA:


Vote thread
JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-14058

Release1.10

...


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently (Flink 1.9), Flink adopts a coarse grained resource management approach, where tasks are deployed into as many as the job’s max parallelism of predefined slots, regardless of how much resource each task / operator can use.

The current approach is easy to set up, but may not have optimal performance and resource utilityutilisation

  • Tasks may have different parallelisms, thus not all of the slots contains an entire pipeline of tasks. For slots with less fewer tasks, the slot resource predefined for an entire pipeline might be wasteful.
  • It could be hard to align slot resource with tasks requirements in all resource aspects (heap, network, managed, etc.). 

In this FLIP, we We propose fine grained resource management, which optimizes resource utility optimises resource utilisation in conditions where resource requirements of individual tasks are known or can be tuned.

...

  • It works well for both Streaming and Batch jobs.
  • It works well whether tasks’ resource requirements are specified or unknown.

This FLIP focuses on the operator resource management aspect of fine grained resource management.

  • We propose fraction based managed memory quota to unify the memory management for both operators with specified and unknown resource requirements.
  • We propose to narrow slot sharing groups down to pipelined regions within which tasks may run concurrently, so we can derive reasonable but not too conservative fractions for tasks / operators to share memory in the same slot.

Scope

  • The approach proposed by this FLIP should only apply to jobs of DataStream API and SQL/Table API by the Blink planner (unbounded streaming and bounded batch jobs). It should not affect jobs of DataSet API.
    • For DataSet jobs, there are already some fraction based approach (in TaskConfig and ChainedDriver), and we do not make any change to the existing approach. 
  • This FLIP assumes that for jobs with known operators' resource requirements, the requirements are already properly described by ResourceSpecs in PhysicalTransformations.
    • This FLIP does not discuss how to set operators' resource requirements for a job.
    • Current status (including plans for Flink 1.10) of how to set operators' resource requirements for jobs can be described as follows:
      • SQL/Table API - Blink optimizer can set operator resources for the users, according to their configurations (default: unknown)
      • DataStream API -  There are no method / interface to set operator resources at the moment. It can be added in the future.
      • DataSet API - There are existing user interfaces to set operator resources.

Public Interfaces

  • ResourceSpec (See FLIP-53: Fine Grained Resource ManagementResource Requirements)
  • Introduce new configuration "taskmanager.defaultSlotResourceFraction", while deprecate but stay compatible with the configuration "taskmanager.numberOfTaskSlots". (See Unknown Resource Requirements)

Proposed Changes

General Workflow

A general workflow of the fine grained operator resource management can be described as follows. 

  • PhysicalTransformations contains ResourceSpecs, unknown (by default)
  • RestAPI / WebUI (Need to adapt the RestAPI and WebUI to the dynamic slot model.)

Proposed Changes

Operator Resource Management

  • or specified (e.g., by blink planner), that describe resource requirements of the transformation.
  • While generating job graph, StreamingJobGraphGenerator calculates fractions (of the slot managed memory) for operators and set to the StreamConfigs.
  • While scheduling, operators' ResourceSpecs are converted tasks' ResourceProfiles (ResourceSpecs of chained operators + network memory). Tasks are deployed to slots / TMs according to the ResourceProfiles.
  • While starting tasks in TMs, each operator gets the fraction of the slot managed memory, which is either original requested absolute value or a fair share for the unknown requirement. 

Operator Resource

...

Requirements

Tasks can consume the following resources of task executors (based on FLIP-49 [1]):

  • CPU cores
  • Unmanaged on-heap memory (Task Heap Memory in FLIP-49)
  • Unmanaged off-heap memory (Task Off-Heap Memory in FLIP-49)
  • Managed on-heap memory (On-Heap Managed Memory in FLIP-49)
  • Managed off-heap memory (Off-Heap Managed Memory in FLIP-49)
  • Network memory (Network Memory in FLIP-49)
  • Extended resources (GPU, FPGA, etc.)

Operators declare their resource requirements in ResourceSpec. A ResourceSpec should contain all the above resource dimensions, except for network memory, which can be derived in at runtime from the execution graph topology. Among the dimensions, CPU cores and unmanaged on-heap memory (which all operators must consume) are required as long as the operator declares a ResourceSpec, while other dimensions are optional and will be set to 0 by default if not explicitly specified.

If operators do not specify any ResourceSpec, their resource requirements are by default UNKNOWN, which will leave it to the runtime to decide how many resources they can consume.

For the first version, we do not support mixing operators with specified / unknown resource requirements in the same job. Either all or none of the operators of the same job should specify their resource requirements. StreamGraphGeneratorStreamingJobGraphGenerator should check this and throw an error when mixing of specified / unknown resource requirements is detected, during the compiling compilation stage.

Managed Memory Allocation

Fraction Based Quota

Operators should not assume any absolute quota on managed memory. Instead, a relative quota should be applied, to unify memory management for both operators with specified and unknown resource requirements.

During the compiling stage, StreamGraphGeneratorStreamingJobGraphGenerator should set two fractions for each operator.

...

fracManagedMemOnHeap = 1 / numOpsUseOnHeapManagedMemorynumOpsUseOnHeapManagedMemoryInSlotSharingGroup
fracManagedMemOffHeap = 1 / numOpsUseOffHeapManagedMemorynumOpsUseOffHeapManagedMemoryInSlotSharingGroup

    • Runtime can also expose interfaces to support setting fractions for operators with different weights.

When the task is deployed to the task executor, operators should register their fractions to the memory manager before consuming any managed memory. The registration should return the absolute quota given the relative fraction. In this way, an operator can either consume managed memory respecting to its quota and assume the memory can be guaranteed, or leave it to the memory manager to limit its memory consumption and live with the possibility that allocating new memory may not always succeed.

Over-allocation and Revocation

For the first version, we do not allow operators to consume managed memory more than their quota.

In the future, we want to allow the operators to leverage the opportunistic available managed memory in the task executor. Operators may allocate managed memory more than their quota, as long as there are enough available managed memory in the task executor and the over-allocated memory can be revoked when needed by another task whose quota is not exceeded. Release notes: Since on-heap managed memory is removed in the final implementation of FLIP-49, fracManagedMemOnHeap and its calculation are also removed from the final implementation of this FLIP.

Slot Sharing

During the compiling stage, the StreamGraphGeneratorStreamingJobGraphGenerator first identifies pipelined regions in the job graph. A pipelined region is defined as the subset of vertices connected by pipelined edges in the job graph, which should always be scheduled together. Otherwise there might be a deadlock when downstream tasks cannot be scheduled due to lack of resources, while the upstream tasks cannot finish releasing the resources because no downstream tasks read the outputs.

The StreamGraphGeneratorStreamingJobGraphGenerator sets tasks of different pipelined regions into different slot sharing groups. In this way, when the StreamGraphGeneratorStreamingJobGraphGenerator sets relative managed memory quota for operators, it will calculate the fractions only considering operators that might run at the same time. This improves resource utility utilisation for bounded batch jobs where usually not all tasks run concurrently.

For jobs written with DataStream API, there are interfaces exposed to users allowing them to explicitly set slot sharing groups for operators. In such cases, users’ settings should be respected, and StreamGraphGenerator should turn pipelined edges connecting tasks in different slot sharing groups into blocking edges to avoid deadlock risks.

It is also important to retain the good property that Flink needs as many slots as the max task parallelism to execute a job, regardless of the job graph topology. This is well retained for batch jobs, where different pipelined regions can always run sequentially, reusing the same slots. However, there are bad cases for streaming jobs when the job graph contains multiple connected components. While tasks in different connected components belong to different pipelined regions, tasks in all the connected components need to run concurrently. As a result, the minimum slots needed for executing the job becomes the sum of max task parallelisms in each connected component.

...

To solve this problem, we need to put different connected components into the same slot sharing group for streaming jobs, while keep them in different slot sharing groups to avoid having large slots with tasks not necessarily scheduled together. We need a parameter scheduleAllSourcesTogether/allVerticesInSameSlotSharingGroupByDefault indicating whether to identify all the sources as in the same pipelined region (imagine a virtual source connected to all the real sources) or not, and passed it into StreamGraphGeneratorStreamingJobGraphGenerator differently for streaming and batch jobs.

...

  • For tasks with specified resource requirements, we add up resource requirements of all the tasks in the slot sharing group, and request a slot with the sum resources.
  • For tasks with unknown resource requirements, we request a slot with default resources.

Dynamic Slot Allocation

Dynamic Slot Model

Image Removed

Currently (Flink 1.9), a task executor contains a fixed number of slots, whose resource are predefined with total task executor resource and number of slots per task executor. These slots share the same life span as the task executor does. Slots are initially free, and are assigned to and freed by job masters. 

With fine grained resource requirements, we may have slot requests with different resources. The current static slot approach may not achieve satisfying resource utility. In order to fulfill all the slot requests, we have to predefine the slots to have enough resources for all the large slot requests, which is obviously a waste for other small requests. 

We propose the dynamic slot model in this FLIP, to address the problem above. They key changes are as follows.

  • Slots in the same task executor can have different resources. Ideally, to improve overall resource utility, we should allocate to a task a subset of resources that exactly matches its resource requirements. Since individual tasks may have different resource needs, the slots should also have resources.
  • Dynamically create and destroy slots. Different jobs may need to partition the task executor’s resources into slots differently, depending on the particular resource requirements of tasks. Even for the same job, later tasks that trying to reuse resources released by previous finished tasks may prefer a different partition over the resources. Thus, we propose to partition a task executor’s resources dynamically, creating slots from available resources on demand, and destroying slots when they are released.

Image Removed

Task executors are launched with total resources but no predefined slots. When making allocation, instead of requesting an particular existing slot, the resource manager requests a slot with certain requested resources from the task executor. The task executor then create a new slot with the requested resources out of its available resources and offer the slot to the job master. As soon as the slot is released by the job master, it is destroyed and the its resources are returned back to the task executor as available resources.

Unknown Resource Requirements

Resource manager should always request slots from task executors with specified resource requirements. For slot requests with unknown resource requirements that it receives from job masters, it should allocate slots with default slot resource profiles from the task executors. 

We introduce a config option defaultSlotFraction to configure what fraction of the task executor available resource a default slot should take. For compatibility, if defaultSlotFraction is not specified, we calculate it as 1 / numOfSlot, so that by default the task executor’s resources are partitioned in the same way as in the static slot model.

Given that in standalone clusters we may have different default slot resource for different task executors, we need task executors to register their default slot resource to the resource manager on registration. The default slot resource profile should only be calculated in either startup script (standalone) or resource manager (yarn / mesos / k8s), and passed into task executors as environment variables. 

Protocol Changes

TaskExecutorGateway

Replace the requestSlot interface with a new requestResource interface.

...

requestSlot

...

requestResource

...

Parameters

...

  • SlotID
  • JobID
  • AllocationID
  • TargetAddress
  • ResourceManagerID

...

  • ResourceProfile
  • JobID
  • AllocationID
  • TargetAddress
  • ResourceManagerID

...

Return Value

...

Acknowledge

...

SlotID

...

Implementation Steps

Step 1. Introduce option allVerticesInSameSlotSharingGroupByDefault in ExecutionConfig

  • Introduce option allVerticesInSameSlotSharingGroupByDefault in ExecutionConfig
  • Set it to true by default
  • Set it to false for SQL/Table API bounded batch jobs by the Blink planner

This step should not introduce any behavior changes. 

Step 2. Set slot sharing groups according to pipelined regions

StreamingJobGraphGenerator set slot sharing group for operators at compiling time.

  • Identify pipelined regions, with respect to allVerticesInSameSlotSharingGroupByDefault
  • Set slot sharing groups according to pipelined regions 
    • By default, each pipelined region should go into a separate slot sharing group
    • If the user sets operators in multiple pipelined regions into same slot sharing group, it should be respected

This step should not introduce any behavior changes, given that later scheduled pipelined regions can reuse slots from previous scheduled pipelined regions. 

Step 3. Introduce managed memory fractions to StreamConfig

Introduce fracManagedMemOnHeap and fracManagedMemOffHeap in StreamConfig, so they can be set by StreamingJobGraphGenerator and used by operators in runtime. 

This step should not introduce any behavior changes.

Step 4. Set managed memory fractions according to slot sharing groups

  • For operators with specified ResourceSpecs, calculate fractions according to operators ResourceSpecs
  • For operators with unknown ResourceSpecs, calculate fractions according to number of operators using managed memory

This step should not introduce any behavior changes.

Step 5. Operators use fractions to decide how much managed memory to allocate

  • Operators allocate memory segments with the amount returned by MemoryManager#computeNumberOfPages.
  • Operators reserve memory with the amount returned by MemoryManager#computeMemorySize

This step activates the new fraction based managed memory.

...

A slot report that task executors send to the resource manager (in registration or heartbeats) now consists of two kinds of information.

...

Compatibility, Deprecation, and Migration Plan

  • This FLIP deprecates the configuration "taskmanager.numberOfTaskSlots", but stays should be compatible with itprevious versions.

Test Plan

  • We need to update existing and add new integration tests dedicated to validate the new fine grained resource management behaviors.
  • It is also expected that other regular integration and end-to-end tests should fail if this is broken.

Rejected Alternatives

An alternative for setting slot sharing groups in compiling is that, to set tasks with specified resource requirements into individual slot sharing groups (except for tasks in colocation groups), and tasks with unknown resource requirements in the same slot sharing group. It is rejected because it separates tasks from the same pipelined region into different slot sharing group, which may lead to a situation with resource deadlocks.

An alternative for setting relative managed memory quota for operators is to set it during the scheduling stage, with the knowledge of which tasks are actually scheduled into the same slot. It is rejected because even we make set the quota at scheduling, there is no guarantee that no tasks will be deployed into the same slot in the future, and dynamically updating the fractions requires operators’ memory usage to be revocable.

Follow Ups

Operator Managed Memory Over-allocation and Revocation

For the first version, we do not allow operators to consume managed memory more than their quota.

In the future, we want to allow the operators to leverage the opportunistic available managed memory in the task executor. Operators may allocate managed memory more than their quota, as long as there are enough available managed memory in the task executor and the over-allocated memory can be revoked when needed by another task whose quota is not exceeded. 

Reference

[1] FLIP-49: Unified Memory Configuration for TaskExecutors