...

This FLIP focuses on the operator resource management aspect of fine-grained resource management.

  • We propose a fraction-based managed memory quota to unify memory management for operators with both specified and unknown resource requirements.
  • We propose to narrow slot sharing groups down to pipelined regions, within which tasks may run concurrently, so that we can derive reasonable but not overly conservative fractions for tasks / operators sharing memory in the same slot.

Scope

  • The approach proposed by this FLIP applies only to DataStream API and SQL/Table API jobs. It should not affect DataSet API jobs.
    • For DataSet jobs, there is already a fraction-based approach (in TaskConfig and ChainedDriver), and we do not make any change to the existing approach.
  • This FLIP assumes that, for jobs whose operator resource requirements are known, the requirements are already properly described by ResourceSpecs in PhysicalTransformations.
    • This FLIP does not discuss how to set operators' resource requirements for a job.
    • Current status (including plans for Flink 1.10) of how to set operators' resource requirements for jobs can be described as follows:
      • SQL/Table API - Blink optimizer can set operator resources for the users, according to their configurations (default: unknown)
      • DataStream API - There is currently no method / interface to set operator resources. One can be added in the future.
      • DataSet API - There are existing user interfaces to set operator resources.
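
For illustration only, attaching a ResourceSpec to a transformation could look like the following sketch. The builder methods and the setResources call shown here are assumptions for this example and may not match the exact Flink API.

    // Illustrative sketch: describing an operator's resource requirements.
    // Builder method names are assumptions, not the exact ResourceSpec API.
    ResourceSpec spec = ResourceSpec.newBuilder()
            .setCpuCores(1.0)
            .setHeapMemoryInMB(128)
            .setManagedMemoryInMB(256)   // e.g. for a batch operator or the RocksDB state backend
            .build();

    // The blink planner (or a future DataStream-level API) would attach the spec to the
    // PhysicalTransformation; ResourceSpec.UNKNOWN remains the default.
    transformation.setResources(spec, spec);   // minimum and preferred resources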

Public Interfaces

Proposed Changes

General Workflow

A general workflow of fine-grained operator resource management can be described as follows.

  • PhysicalTransformations contain ResourceSpecs, unknown (by default) or specified (e.g., by the blink planner), that describe the resource requirements of the transformation.
  • While generating the job graph, StreamingJobGraphGenerator calculates fractions (of the slot's managed memory) for operators and sets them in the StreamConfigs.
  • While scheduling, operators' ResourceSpecs are converted into tasks' ResourceProfiles (ResourceSpecs of chained operators + network memory). Tasks are deployed to slots / TMs according to their ResourceProfiles.
  • While starting tasks in TMs, each operator obtains its fraction of the slot's managed memory, which corresponds either to the originally requested absolute value or to a fair share for unknown requirements.
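
As a rough sketch of the scheduling-time conversion, the ResourceSpecs of all operators chained into one task would be merged and the task's network memory added on top. The merge and conversion methods below are assumptions for this example.

    // Illustrative sketch: converting chained operators' ResourceSpecs into a task ResourceProfile.
    ResourceSpec taskSpec = ResourceSpec.ZERO;
    for (ResourceSpec operatorSpec : chainedOperatorSpecs) {
        taskSpec = taskSpec.merge(operatorSpec);   // sums cpu / heap / managed memory of chained operators
    }

    // Network memory is a per-task quantity derived from the task's input / output channels,
    // so it is added on top of the merged operator requirements.
    ResourceProfile taskProfile = ResourceProfile.fromResourceSpec(taskSpec, networkMemoryInMB);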

Operator Resource Requirements

...

For the first version, we do not support mixing operators with specified / unknown resource requirements in the same job. Either all or none of the operators of a job should specify their resource requirements. StreamingJobGraphGenerator should check this during the compilation stage and throw an error when a mix of specified / unknown resource requirements is detected.
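
A minimal sketch of such a check, assuming accessors for a stream node's ResourceSpec (the names below are illustrative):

    // Illustrative sketch of the mixed-requirements validation during compilation.
    boolean hasSpecified = false;
    boolean hasUnknown = false;
    for (StreamNode node : streamGraph.getStreamNodes()) {
        if (ResourceSpec.UNKNOWN.equals(node.getMinResources())) {   // assumed accessor
            hasUnknown = true;
        } else {
            hasSpecified = true;
        }
    }
    if (hasSpecified && hasUnknown) {
        throw new IllegalStateException(
            "Mixing operators with specified and unknown resource requirements in one job is not supported.");
    }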

...

During the compiling stage, StreamingJobGraphGenerator should set two fractions for each operator.

...

fracManagedMemOnHeap = 1 / numOpsUseOnHeapManagedMemoryInSlotSharingGroup
fracManagedMemOffHeap = 1 / numOpsUseOffHeapManagedMemoryInSlotSharingGroup

    • Runtime can also expose interfaces to support setting fractions for operators with different weights.
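
A sketch of how the generator could derive these default fractions per slot sharing group. The counting helpers and StreamConfig setters are hypothetical names used only for illustration.

    // Illustrative sketch: split a slot's managed memory evenly among the operators in the
    // same slot sharing group that actually use on-heap / off-heap managed memory.
    int numOnHeapUsers = countOpsUsingOnHeapManagedMemory(slotSharingGroup);    // hypothetical helper
    int numOffHeapUsers = countOpsUsingOffHeapManagedMemory(slotSharingGroup);  // hypothetical helper

    for (StreamConfig operatorConfig : operatorConfigsInGroup) {
        operatorConfig.setManagedMemoryFractionOnHeap(
            usesOnHeapManagedMemory(operatorConfig) ? 1.0 / numOnHeapUsers : 0.0);
        operatorConfig.setManagedMemoryFractionOffHeap(
            usesOffHeapManagedMemory(operatorConfig) ? 1.0 / numOffHeapUsers : 0.0);
    }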

When a task is deployed to the task executor, its operators should register their fractions with the memory manager before consuming any managed memory. The registration should return the absolute quota corresponding to the relative fraction. In this way, an operator can either consume managed memory within its quota and assume that memory is guaranteed, or leave it to the memory manager to limit its memory consumption and accept that allocating new memory may not always succeed.
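
From the operator's perspective, the registration could look like the following sketch. The quota-related methods are assumptions for this example and may differ from the actual MemoryManager interface.

    // Illustrative sketch: turning the compile-time fraction into an absolute quota
    // before consuming managed memory. Method names are assumptions.
    double fraction = streamConfig.getManagedMemoryFractionOffHeap();   // set during compilation
    long quotaInBytes = memoryManager.computeMemorySize(fraction);      // relative fraction -> absolute quota

    // Option A: the operator tracks its own usage, stays within quotaInBytes, and assumes
    // that amount of managed memory is guaranteed.
    // Option B: the operator lets the memory manager enforce the quota and handles the case
    // where a further allocation does not succeed.
    List<MemorySegment> segments = memoryManager.allocatePages(
        this, (int) (quotaInBytes / memoryManager.getPageSize()));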

Slot Sharing

During the compiling stage, the StreamingJobGraphGenerator first identifies pipelined regions in the job graph. A pipelined region is defined as a subset of vertices connected by pipelined edges in the job graph, which should always be scheduled together. Otherwise, there might be a deadlock when downstream tasks cannot be scheduled due to a lack of resources, while the upstream tasks cannot finish (and release their resources) because no downstream tasks are reading their outputs.

The StreamingJobGraphGenerator puts tasks of different pipelined regions into different slot sharing groups. In this way, when it sets the relative managed memory quota for operators, it calculates the fractions considering only the operators that might run at the same time. This improves resource utilization for batch jobs, where usually not all tasks run concurrently.
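
A sketch of the region identification using a union-find over pipelined edges, assigning one slot sharing group per region. The graph accessors and helper methods are illustrative names, not the actual generator code.

    // Illustrative sketch: vertices connected by pipelined edges form one region; each
    // region gets its own slot sharing group. Blocking edges separate regions.
    private void setSlotSharingGroupsByRegion(StreamGraph streamGraph) {
        Map<Integer, Integer> parent = new HashMap<>();                // vertexId -> union-find parent
        for (StreamNode node : streamGraph.getStreamNodes()) {
            parent.put(node.getId(), node.getId());
        }
        for (StreamEdge edge : allEdges(streamGraph)) {                // hypothetical helper collecting all edges
            if (isPipelined(edge)) {                                   // hypothetical check on the edge's exchange mode
                union(parent, edge.getSourceId(), edge.getTargetId());
            }
        }
        for (StreamNode node : streamGraph.getStreamNodes()) {
            node.setSlotSharingGroup("region-" + find(parent, node.getId()));
        }
    }

    private static int find(Map<Integer, Integer> parent, int v) {
        while (parent.get(v) != v) {
            v = parent.get(v);
        }
        return v;
    }

    private static void union(Map<Integer, Integer> parent, int a, int b) {
        parent.put(find(parent, a), find(parent, b));
    }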

...

To solve this problem, we need to put the different connected components into the same slot sharing group for streaming jobs, while keeping them in different slot sharing groups for batch jobs, to avoid large slots containing tasks that are not necessarily scheduled together. We need a parameter scheduleAllSourcesTogether indicating whether to treat all sources as being in the same pipelined region (imagine a virtual source connected to all the real sources), and pass it into StreamingJobGraphGenerator differently for streaming and batch jobs.

...

  • Introduce option allSourcesInSamePipelinedRegion in ExecutionConfig
  • Set it to true by default
  • Set it to false for batch jobs (SQL/Table API jobs using the blink planner)
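
Conceptually, the option only changes how the region identification treats the sources, as in the following sketch (the option getter and the union-find helpers mirror the sketch above and are not a final API):

    // Illustrative sketch: when allSourcesInSamePipelinedRegion is true, behave as if a
    // virtual source were pipelined to every real source, merging their regions.
    if (executionConfig.isAllSourcesInSamePipelinedRegion()) {   // assumed getter for the proposed option
        Integer firstSourceId = null;
        for (Integer sourceId : streamGraph.getSourceIDs()) {
            if (firstSourceId == null) {
                firstSourceId = sourceId;
            } else {
                union(parent, firstSourceId, sourceId);          // same union-find as in the region sketch
            }
        }
    }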

This step should not introduce any behavior changes. 

Step 2. Set slot sharing groups according to pipelined regions

StreamingJobGraphGenerator sets slot sharing groups for operators at compiling time.

...

Introduce fracManagedMemOnHeap and fracManagedMemOffHeap in StreamConfig, so they can be set by StreamingJobGraphGenerator and used by operators at runtime.
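
A sketch of what the two new StreamConfig entries could look like, stored in the underlying Configuration like other StreamConfig fields (the key names and default values are assumptions):

    // Illustrative sketch of the new StreamConfig accessors; key names are assumptions.
    private static final String MANAGED_MEMORY_FRACTION_ON_HEAP = "managedMemFractionOnHeap";
    private static final String MANAGED_MEMORY_FRACTION_OFF_HEAP = "managedMemFractionOffHeap";

    public void setManagedMemoryFractionOnHeap(double fraction) {
        config.setDouble(MANAGED_MEMORY_FRACTION_ON_HEAP, fraction);
    }

    public double getManagedMemoryFractionOnHeap() {
        // 0.0 means the operator uses no on-heap managed memory.
        return config.getDouble(MANAGED_MEMORY_FRACTION_ON_HEAP, 0.0);
    }

    // ... analogous setter / getter for the off-heap fraction.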

...

An alternative for setting the relative managed memory quota for operators is to set it during the scheduling stage, with knowledge of which tasks are actually scheduled into the same slot. This is rejected because, even if we set the quota at scheduling time, there is no guarantee that no further tasks will be deployed into the same slot later, and dynamically updating the fractions would require operators' memory usage to be revocable.

Follow Ups

Operator Managed Memory Over-allocation and Revocation

For the first version, we do not allow operators to consume more managed memory than their quota.

In the future, we want to allow operators to leverage opportunistically available managed memory in the task executor. Operators may allocate more managed memory than their quota, as long as there is enough available managed memory in the task executor and the over-allocated memory can be revoked when it is needed by another task whose quota is not exceeded.
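
One possible shape of such an over-allocation protocol, purely as a sketch of the future direction (none of these interfaces exist today):

    // Illustrative sketch: a revocation hook the memory manager could call when another
    // task needs managed memory within its own (non-exceeded) quota.
    public interface MemoryRevocationListener {
        // Asked to give back up to bytesToRevoke of memory allocated beyond the quota,
        // e.g. by spilling; returns the number of bytes actually released.
        long revokeMemory(long bytesToRevoke) throws java.io.IOException;
    }

    // An operator opting into over-allocation would register such a listener and could then
    // request memory beyond its quota as long as the task executor has free managed memory.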
