
...

Page properties

...


Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-53-Fine-Grained-Resource-Management-td31831.html

JIRA:

...


Vote thread
JIRA: FLINK-14058

Release: 1.10


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

The current approach is easy to set up, but may not have optimal performance and resource utilisation:

  • Tasks may have different parallelisms, thus not all of the slots contain an entire pipeline of tasks. For slots with fewer tasks, the slot resource predefined for an entire pipeline might be wasteful.
  • It could be hard to align slot resources with task requirements in all resource aspects (heap, network, managed, etc.).

We propose fine-grained resource management, which optimises resource utilisation in conditions where resource requirements of individual tasks are known or can be tuned.

...

  • The approach proposed by this FLIP should only apply to jobs of the DataStream API and the SQL/Table API with the Blink planner (unbounded streaming and bounded batch jobs). It should not affect jobs of the DataSet API.
    • For DataSet jobs, there is already a fraction-based approach (in TaskConfig and ChainedDriver), and we do not make any change to the existing approach.
  • This FLIP assumes that for jobs with known operators' resource requirements, the requirements are already properly described by ResourceSpecs in PhysicalTransformations.
    • This FLIP does not discuss how to set operators' resource requirements for a job.
    • Current status (including plans for Flink 1.10) of how to set operators' resource requirements for jobs can be described as follows:
      • SQL/Table API - Blink optimizer can set operator resources for the users, according to their configurations (default: unknown)
      • DataStream API - There is no method / interface to set operator resources at the moment. One can be added in the future.
      • DataSet API - There are existing user interfaces to set operator resources.

...

  • PhysicalTransformations contain ResourceSpecs, unknown (by default) or specified (e.g., by the Blink planner), that describe the resource requirements of the transformation.
  • While generating the job graph, StreamingJobGraphGenerator calculates fractions (of the slot managed memory) for operators and sets them in the StreamConfigs.
  • While scheduling, operators' ResourceSpecs are converted to tasks' ResourceProfiles (ResourceSpecs of chained operators + network memory). Tasks are deployed to slots / TMs according to the ResourceProfiles.
  • While starting tasks in TMs, each operator gets its fraction of the slot managed memory, which is either the originally requested absolute value or a fair share for an unknown requirement.
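The fraction calculation in the steps above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the StreamingJobGraphGenerator code: the class name, the method name, and the use of -1 as the "unknown" marker are all hypothetical, and the sketch assumes that the operators sharing a slot either all have known managed memory requirements or all have unknown ones.

```java
import java.util.Arrays;

/** Hypothetical illustration only; not part of Flink. */
final class ManagedMemoryFractionSketch {

    /** Assumed marker for an operator whose managed memory requirement is unknown. */
    static final long UNKNOWN = -1L;

    /**
     * Returns one fraction per operator sharing a slot.
     * Known requirements get a share proportional to the request;
     * if every requirement is unknown, each operator gets an equal share.
     */
    static double[] fractions(long[] managedMemoryBytes) {
        boolean allUnknown = Arrays.stream(managedMemoryBytes).allMatch(b -> b == UNKNOWN);
        boolean allKnown = Arrays.stream(managedMemoryBytes).allMatch(b -> b >= 0);
        if (!allUnknown && !allKnown) {
            // Assumption of this sketch: mixed groups are not supported.
            throw new IllegalArgumentException("mixed known and unknown requirements");
        }
        double[] result = new double[managedMemoryBytes.length];
        if (allUnknown) {
            Arrays.fill(result, 1.0 / managedMemoryBytes.length);
            return result;
        }
        long total = Arrays.stream(managedMemoryBytes).sum();
        for (int i = 0; i < result.length; i++) {
            // If nobody requests managed memory, every fraction is zero.
            result[i] = total == 0 ? 0.0 : (double) managedMemoryBytes[i] / total;
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(fractions(new long[] {100L, 300L})));
        System.out.println(Arrays.toString(fractions(new long[] {UNKNOWN, UNKNOWN})));
    }
}
```

When the slot is sized as the sum of the known requirements, multiplying a fraction by the slot managed memory gives back the operator's originally requested amount, which matches the last step of the workflow.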

...

When the task is deployed to the task executor, operators should register their fractions with the memory manager before consuming any managed memory. The registration should return the absolute quota for the given relative fraction. In this way, an operator can either consume managed memory within its quota and assume the memory is guaranteed, or leave it to the memory manager to limit its memory consumption and live with the possibility that allocating new memory may not always succeed.

Release notes: Since on-heap managed memory is removed in the final implementation of FLIP-49, fracManagedMemOnHeap and its calculation are also removed from the final implementation of this FLIP.

Slot Sharing

During the compilation stage, the StreamingJobGraphGenerator first identifies pipelined regions in the job graph. A pipelined region is defined as a subset of vertices connected by pipelined edges in the job graph, which should always be scheduled together. Otherwise there might be a deadlock: downstream tasks cannot be scheduled due to lack of resources, while the upstream tasks cannot finish and release their resources because no downstream tasks read their outputs.

The StreamingJobGraphGenerator puts tasks of different pipelined regions into different slot sharing groups. In this way, when the StreamingJobGraphGenerator sets relative managed memory quotas for operators, it calculates the fractions considering only operators that might run at the same time. This improves resource utilisation for bounded batch jobs, where usually not all tasks run concurrently.
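Identifying pipelined regions amounts to finding connected components when only pipelined edges are followed. The sketch below illustrates that idea with a union-find structure. It is not the actual StreamingJobGraphGenerator logic: the class, the Edge type, and the use of integer vertex ids are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; not part of Flink. */
final class PipelinedRegionSketch {

    /** A job graph edge; blocking edges separate pipelined regions. */
    static final class Edge {
        final int source;
        final int target;
        final boolean pipelined;

        Edge(int source, int target, boolean pipelined) {
            this.source = source;
            this.target = target;
            this.pipelined = pipelined;
        }
    }

    /** Returns a region id per vertex; vertices joined by pipelined edges share an id. */
    static int[] regionIds(int numVertices, List<Edge> edges) {
        int[] parent = new int[numVertices];
        for (int i = 0; i < numVertices; i++) {
            parent[i] = i;
        }
        for (Edge e : edges) {
            if (e.pipelined) {
                parent[find(parent, e.source)] = find(parent, e.target);
            }
        }
        // Map union-find roots to dense ids 0, 1, 2, ... in order of first appearance.
        Map<Integer, Integer> dense = new HashMap<>();
        int[] region = new int[numVertices];
        for (int v = 0; v < numVertices; v++) {
            int root = find(parent, v);
            Integer id = dense.get(root);
            if (id == null) {
                id = dense.size();
                dense.put(root, id);
            }
            region[v] = id;
        }
        return region;
    }

    /** Finds the representative of x, halving the path as it goes. */
    private static int find(int[] parent, int x) {
        while (parent[x] != x) {
            parent[x] = parent[parent[x]];
            x = parent[x];
        }
        return x;
    }

    public static void main(String[] args) {
        List<Edge> edges = Arrays.asList(
                new Edge(0, 1, true), new Edge(1, 2, false), new Edge(2, 3, true));
        System.out.println(Arrays.toString(regionIds(4, edges)));
    }
}
```

Each resulting region id could then stand for one slot sharing group, so that fractions are computed only among vertices in the same region.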

...

To solve this problem, we need to put different connected components into the same slot sharing group for streaming jobs, while keeping them in different slot sharing groups for batch jobs, to avoid having large slots with tasks that are not necessarily scheduled together. We need a parameter allVerticesInSameSlotSharingGroupByDefault indicating whether to identify all the sources as being in the same pipelined region (imagine a virtual source connected to all the real sources) or not, and pass it into the StreamingJobGraphGenerator differently for streaming and batch jobs.

...

  • For tasks with specified resource requirements, we add up the resource requirements of all the tasks in the slot sharing group, and request a slot with the summed resources.
  • For tasks with unknown resource requirements, we request a slot with default resources.
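The two rules above can be sketched as follows. This is a simplified model, not Flink's ResourceSpec or ResourceProfile: the Resources class, its two dimensions, the UNKNOWN sentinel, and the default slot size are hypothetical. The sketch also assumes that a slot sharing group containing any task with unknown requirements is treated as unknown as a whole.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration only; not part of Flink. */
final class SlotRequestSketch {

    /** Simplified resource description with just two dimensions. */
    static final class Resources {
        final double cpuCores;
        final long memoryBytes;

        Resources(double cpuCores, long memoryBytes) {
            this.cpuCores = cpuCores;
            this.memoryBytes = memoryBytes;
        }

        Resources plus(Resources other) {
            return new Resources(cpuCores + other.cpuCores, memoryBytes + other.memoryBytes);
        }
    }

    /** Sentinel for "requirement not specified"; compared by identity. */
    static final Resources UNKNOWN = new Resources(-1.0, -1L);

    /** Assumed default slot size, chosen only for this example. */
    static final Resources DEFAULT_SLOT = new Resources(1.0, 1024L * 1024L * 1024L);

    /** Computes the resources to request for one slot sharing group. */
    static Resources slotRequest(List<Resources> taskRequirements) {
        Resources sum = new Resources(0.0, 0L);
        for (Resources r : taskRequirements) {
            if (r == UNKNOWN) {
                // Unknown requirements: fall back to the default slot resources.
                return DEFAULT_SLOT;
            }
            sum = sum.plus(r);
        }
        return sum;
    }

    public static void main(String[] args) {
        Resources request = slotRequest(Arrays.asList(
                new Resources(1.0, 100L), new Resources(0.5, 200L)));
        System.out.println(request.cpuCores + " cores, " + request.memoryBytes + " bytes");
    }
}
```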

Implementation Steps

Step 1. Introduce option allVerticesInSameSlotSharingGroupByDefault in ExecutionConfig

  • Introduce option allVerticesInSameSlotSharingGroupByDefault in ExecutionConfig
  • Set it to true by default
  • Set it to false for SQL/Table API bounded batch jobs by the Blink planner

This step should not introduce any behavior changes. 

...

  • Identify pipelined regions, with respect to allVerticesInSameSlotSharingGroupByDefault
  • Set slot sharing groups according to pipelined regions 
    • By default, each pipelined region should go into a separate slot sharing group
    • If the user sets operators in multiple pipelined regions into the same slot sharing group, it should be respected

...

This step should not introduce any behavior changes.

Step 5. Operators use fractions to decide how much managed memory to allocate

  • Operators allocate memory segments with the amount returned by MemoryManager#computeNumberOfPages.
  • Operators reserve memory with the amount returned by MemoryManager#computeMemorySize.
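To make the conversion from a fraction to an absolute amount concrete, here is a standalone sketch of the arithmetic such methods might perform. It does not reproduce Flink's MemoryManager: the class is hypothetical, the method names are borrowed from the text only for readability, and the rounding behaviour (rounding down) and the page size used in the example are assumptions.

```java
/** Hypothetical illustration only; not Flink's MemoryManager. */
final class FractionToMemorySketch {

    private final long totalBytes;
    private final int pageSizeBytes;

    FractionToMemorySketch(long totalBytes, int pageSizeBytes) {
        if (totalBytes < 0 || pageSizeBytes <= 0) {
            throw new IllegalArgumentException("invalid memory size or page size");
        }
        this.totalBytes = totalBytes;
        this.pageSizeBytes = pageSizeBytes;
    }

    /** Bytes an operator may reserve for the given fraction, rounded down. */
    long computeMemorySize(double fraction) {
        checkFraction(fraction);
        return (long) Math.floor(totalBytes * fraction);
    }

    /** Whole pages (memory segments) an operator may allocate, rounded down. */
    int computeNumberOfPages(double fraction) {
        checkFraction(fraction);
        long totalPages = totalBytes / pageSizeBytes;
        return (int) (totalPages * fraction);
    }

    private static void checkFraction(double fraction) {
        if (!(fraction > 0.0 && fraction <= 1.0)) {
            throw new IllegalArgumentException("fraction must be in (0, 1]");
        }
    }

    public static void main(String[] args) {
        // 64 MiB of managed memory split into 32 KiB pages (sizes assumed for the example).
        FractionToMemorySketch memory = new FractionToMemorySketch(64L * 1024 * 1024, 32 * 1024);
        System.out.println(memory.computeNumberOfPages(0.25));
        System.out.println(memory.computeMemorySize(0.25));
    }
}
```

An operator that works with memory segments would use the page count, while one that only reserves memory would use the byte size.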

...