
...

For jobs written with DataStream API, there are interfaces exposed to users allowing them to explicitly set slot sharing groups for operators. In such cases, users’ settings should be respected, and StreamGraphGenerator should turn pipelined edges connecting tasks in different slot sharing groups into blocking edges to avoid deadlock risks.
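The conversion described above can be sketched as follows. This is a minimal illustration under stated assumptions, not StreamGraphGenerator's actual code. The `Edge` class, the `EdgeType` enum and the operator-name-to-group map are hypothetical stand-ins for the stream graph's own structures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model; not Flink's StreamGraph / StreamEdge classes.
class BlockingEdgeConversion {

    enum EdgeType { PIPELINED, BLOCKING }

    static final class Edge {
        final String source;
        final String target;
        final EdgeType type;

        Edge(String source, String target, EdgeType type) {
            this.source = source;
            this.target = target;
            this.type = type;
        }

        @Override
        public String toString() {
            return source + " -> " + target + " (" + type + ")";
        }
    }

    // Returns a copy of the edges in which every pipelined edge that connects
    // operators in different slot sharing groups has become a blocking edge.
    static List<Edge> convertCrossGroupEdges(List<Edge> edges, Map<String, String> slotSharingGroupOf) {
        List<Edge> result = new ArrayList<>();
        for (Edge edge : edges) {
            boolean crossesGroups =
                    !slotSharingGroupOf.get(edge.source).equals(slotSharingGroupOf.get(edge.target));
            if (edge.type == EdgeType.PIPELINED && crossesGroups) {
                result.add(new Edge(edge.source, edge.target, EdgeType.BLOCKING));
            } else {
                result.add(edge);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // "sink" was explicitly placed in its own slot sharing group by the user.
        Map<String, String> groups = Map.of("source", "default", "map", "default", "sink", "sinkGroup");
        List<Edge> edges = List.of(
                new Edge("source", "map", EdgeType.PIPELINED),
                new Edge("map", "sink", EdgeType.PIPELINED));
        convertCrossGroupEdges(edges, groups).forEach(System.out::println);
    }
}
```

Running `main` prints `source -> map (PIPELINED)` and `map -> sink (BLOCKING)`. Only the edge that crosses the user-specified group boundary changes.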

It is also important to retain the property that Flink needs only as many slots as the maximum task parallelism to execute a job, regardless of the job graph topology. This property holds for batch jobs, because different pipelined regions can always run sequentially and reuse the same slots. However, it breaks for streaming jobs whose job graph contains multiple connected components. Although tasks in different connected components belong to different pipelined regions, the tasks of all connected components must run concurrently. As a result, the minimum number of slots needed to execute the job becomes the sum of the maximum task parallelisms of the connected components.
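The slot arithmetic above can be checked with a small worked example. The sketch models a job as task parallelisms plus producer/consumer edges (a hypothetical representation, not Flink's JobGraph). It uses union-find to find connected components. It then compares the overall maximum parallelism, which suffices when regions run one after another, with the per-component sum needed when all components run at once.

```java
import java.util.HashMap;
import java.util.Map;

class MinimumSlots {

    // Union-find lookup with path halving; parent[x] == x marks a component root.
    static int find(int[] parent, int x) {
        while (parent[x] != x) {
            parent[x] = parent[parent[x]];
            x = parent[x];
        }
        return x;
    }

    // parallelism[i] is the parallelism of task i; each edge is {producer, consumer}.
    // Sums the maximum parallelism of every connected component, i.e. the slots
    // needed when all components must run at the same time.
    static int slotsForConcurrentComponents(int[] parallelism, int[][] edges) {
        int[] parent = new int[parallelism.length];
        for (int i = 0; i < parent.length; i++) {
            parent[i] = i;
        }
        for (int[] edge : edges) {
            parent[find(parent, edge[0])] = find(parent, edge[1]);
        }
        Map<Integer, Integer> maxPerComponent = new HashMap<>();
        for (int i = 0; i < parallelism.length; i++) {
            maxPerComponent.merge(find(parent, i), parallelism[i], Integer::max);
        }
        int total = 0;
        for (int max : maxPerComponent.values()) {
            total += max;
        }
        return total;
    }

    // The slots needed when regions can run one after another: the overall maximum.
    static int maxTaskParallelism(int[] parallelism) {
        int max = 0;
        for (int p : parallelism) {
            max = Math.max(max, p);
        }
        return max;
    }

    public static void main(String[] args) {
        // Two disconnected pipelines: tasks 0 -> 1 and tasks 2 -> 3.
        int[] parallelism = {4, 2, 3, 3};
        int[][] edges = {{0, 1}, {2, 3}};
        System.out.println("max task parallelism: " + maxTaskParallelism(parallelism));
        System.out.println("slots when components run concurrently: "
                + slotsForConcurrentComponents(parallelism, edges));
    }
}
```

For the two disconnected pipelines in `main`, the maximum task parallelism is 4. Because both components must run concurrently, 4 + 3 = 7 slots are needed.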

...

  • For tasks with specified resource requirements, we add up resource requirements of all the tasks in the slot sharing group, and request a slot with the sum resources.
  • For tasks with unknown resource requirements, we request a slot with default resources.
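The two rules above can be sketched as a small function that computes the slot request for one slot sharing group. `Resources` is a hypothetical two-field profile, not Flink's ResourceSpec, and `DEFAULT_SLOT` uses placeholder values. The sketch also assumes something the text does not state: if any task in the group has unknown requirements, the whole group falls back to a default slot.

```java
import java.util.List;
import java.util.Optional;

class SlotRequestCalculator {

    // Hypothetical, simplified resource profile; not Flink's ResourceSpec.
    static final class Resources {
        final double cpuCores;
        final long memoryMb;

        Resources(double cpuCores, long memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }

        Resources plus(Resources other) {
            return new Resources(cpuCores + other.cpuCores, memoryMb + other.memoryMb);
        }

        @Override
        public String toString() {
            return cpuCores + " cores, " + memoryMb + " MB";
        }
    }

    // Placeholder for the default slot resources (assumed values, for illustration only).
    static final Resources DEFAULT_SLOT = new Resources(1.0, 1024);

    // One entry per task in the slot sharing group; Optional.empty() means "unknown".
    // Assumption: a single unknown requirement makes the whole group use a default slot.
    static Resources slotRequestFor(List<Optional<Resources>> taskRequirements) {
        Resources sum = new Resources(0.0, 0);
        for (Optional<Resources> requirement : taskRequirements) {
            if (!requirement.isPresent()) {
                return DEFAULT_SLOT;
            }
            sum = sum.plus(requirement.get());
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Optional<Resources>> known = List.of(
                Optional.of(new Resources(0.5, 256)),
                Optional.of(new Resources(1.5, 512)));
        List<Optional<Resources>> mixed = List.of(
                Optional.of(new Resources(0.5, 256)),
                Optional.empty());
        System.out.println("known requirements: " + slotRequestFor(known));
        System.out.println("unknown requirement present: " + slotRequestFor(mixed));
    }
}
```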

Implementation Steps

Step 1

...

StreamGraphGenerator should make this change at compile time.

This step introduces behavior changes. These changes do not affect functionality and are a necessary cost of preventing potential deadlocks.

...

Introduce option allSourcesInSamePipelinedRegion in ExecutionConfig

  • Introduce option allSourcesInSamePipelinedRegion in ExecutionConfig
  • Set it to true by default
  • Set it to false for batch jobs (from the DataSet API and the Blink planner)

This step should not introduce any behavior changes. 

Step 2. Set slot sharing groups according to pipelined regions

StreamGraphGenerator sets slot sharing groups for operators at compile time.

  • Identify pipelined regions, with respect to allSourcesInSamePipelinedRegion, after changing pipelined edges between different slot sharing groups to blocking edges
  • Set slot sharing groups according to pipelined regions 
    • By default, each pipelined region should go into a separate slot sharing group
    • If the user puts operators from multiple pipelined regions into the same slot sharing group, this setting should be respected
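The identification and assignment steps above can be sketched as follows. The sketch makes several assumptions. A pipelined region is taken to be a connected component over the remaining pipelined edges. allSourcesInSamePipelinedRegion is taken to merge all sources into one region. Operators without a user-specified group get a generated per-region name. The integer vertex model and the `"region-"` naming are hypothetical.

```java
import java.util.Arrays;

class RegionSlotSharing {

    // Union-find lookup with path halving.
    static int find(int[] parent, int x) {
        while (parent[x] != x) {
            parent[x] = parent[parent[x]];
            x = parent[x];
        }
        return x;
    }

    static void union(int[] parent, int a, int b) {
        parent[find(parent, a)] = find(parent, b);
    }

    // Vertices connected by pipelined edges end up in the same region. Blocking edges
    // (including those converted because they cross slot sharing groups) are not passed in.
    // If allSourcesInSamePipelinedRegion is set, all sources are merged into one region.
    static int[] pipelinedRegions(int numVertices, int[][] pipelinedEdges,
                                  int[] sources, boolean allSourcesInSamePipelinedRegion) {
        int[] parent = new int[numVertices];
        for (int i = 0; i < numVertices; i++) {
            parent[i] = i;
        }
        for (int[] edge : pipelinedEdges) {
            union(parent, edge[0], edge[1]);
        }
        if (allSourcesInSamePipelinedRegion) {
            for (int i = 1; i < sources.length; i++) {
                union(parent, sources[0], sources[i]);
            }
        }
        int[] region = new int[numVertices];
        for (int i = 0; i < numVertices; i++) {
            region[i] = find(parent, i);
        }
        return region;
    }

    // userGroups[i] is the group the user set for vertex i, or null if none was set.
    // User settings are respected; otherwise each region gets its own group.
    static String[] slotSharingGroups(int[] region, String[] userGroups) {
        String[] groups = new String[region.length];
        for (int i = 0; i < region.length; i++) {
            groups[i] = userGroups[i] != null ? userGroups[i] : "region-" + region[i];
        }
        return groups;
    }

    public static void main(String[] args) {
        // Two pipelines 0 -> 1 and 2 -> 3 with sources 0 and 2.
        int[][] pipelined = {{0, 1}, {2, 3}};
        int[] sources = {0, 2};
        String[] noUserGroups = new String[4];
        for (boolean merge : new boolean[] {false, true}) {
            int[] region = pipelinedRegions(4, pipelined, sources, merge);
            System.out.println("allSourcesInSamePipelinedRegion=" + merge + ": "
                    + Arrays.toString(slotSharingGroups(region, noUserGroups)));
        }
    }
}
```

With the flag off, the two pipelines form two regions and therefore two slot sharing groups. With the flag on, merging the sources joins them into one region and one group.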

This step should not introduce any behavior changes, given that pipelined regions scheduled later can reuse slots from previously scheduled pipelined regions.

Step 3. Introduce managed memory fractions to StreamConfig

Introduce fracManagedMemOnHeap and fracManagedMemOffHeap in StreamConfig, so that StreamGraphGenerator can set them and operators can use them at runtime.

This step should not introduce any behavior changes.

Step 4. Set managed memory fractions according to slot sharing groups

  • For operators with specified ResourceSpecs, calculate fractions according to the operators' ResourceSpecs
  • For operators with unknown ResourceSpecs, calculate fractions according to the number of operators using managed memory
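The two fraction rules above can be sketched for a single slot sharing group as follows. The sketch makes two assumptions. With specified ResourceSpecs, an operator's fraction is its declared managed memory divided by the group total. With unknown ResourceSpecs, the managed memory is split evenly among the operators that use it. The method names are hypothetical. The sketch computes a single fraction per operator; fracManagedMemOnHeap and fracManagedMemOffHeap would each be derived the same way from the corresponding memory type.

```java
import java.util.Arrays;

class ManagedMemoryFractions {

    // Operators with specified ResourceSpecs: each operator's declared managed memory
    // divided by the total declared by all operators in the same slot sharing group.
    static double[] fractionsFromSpecs(long[] managedMemoryBytes) {
        long total = 0;
        for (long bytes : managedMemoryBytes) {
            total += bytes;
        }
        double[] fractions = new double[managedMemoryBytes.length];
        for (int i = 0; i < fractions.length; i++) {
            fractions[i] = total == 0 ? 0.0 : (double) managedMemoryBytes[i] / total;
        }
        return fractions;
    }

    // Operators with unknown ResourceSpecs: the slot's managed memory is split evenly
    // among the operators in the group that use managed memory; the others get 0.
    static double[] fractionsByCount(boolean[] usesManagedMemory) {
        int users = 0;
        for (boolean uses : usesManagedMemory) {
            if (uses) {
                users++;
            }
        }
        double[] fractions = new double[usesManagedMemory.length];
        for (int i = 0; i < fractions.length; i++) {
            fractions[i] = usesManagedMemory[i] ? 1.0 / users : 0.0;
        }
        return fractions;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(fractionsFromSpecs(new long[] {100, 300})));
        System.out.println(Arrays.toString(fractionsByCount(new boolean[] {true, false, true})));
    }
}
```

Running `main` prints `[0.25, 0.75]` and `[0.5, 0.0, 0.5]`. In both cases, the fractions within a group sum to at most 1.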

This step should not introduce any behavior changes.

Step 5. Operators use fractions to decide how much managed memory to allocate

  • Operators allocate as many memory segments as MemoryManager#computeNumberOfPages returns.
  • Operators reserve as much memory as MemoryManager#computeMemorySize returns.
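The standalone sketch below illustrates how a fraction can be turned into a concrete amount. It assumes MemoryManager#computeMemorySize and MemoryManager#computeNumberOfPages multiply the fraction by the slot's managed memory and round down, with the page count derived using the page size. This is not Flink's MemoryManager. The parameters, the 128 MiB budget and the 32 KiB page size are assumptions chosen for illustration.

```java
class FractionToMemory {

    // Converts a fraction of the slot's managed memory into bytes, rounding down
    // to a whole number of bytes.
    static long computeMemorySize(long totalManagedMemoryBytes, double fraction) {
        if (fraction < 0.0 || fraction > 1.0) {
            throw new IllegalArgumentException("fraction must be in [0, 1], got " + fraction);
        }
        return (long) Math.floor(totalManagedMemoryBytes * fraction);
    }

    // Number of whole pages (memory segments) that fit into the fraction's byte amount.
    static int computeNumberOfPages(long totalManagedMemoryBytes, int pageSizeBytes, double fraction) {
        return (int) (computeMemorySize(totalManagedMemoryBytes, fraction) / pageSizeBytes);
    }

    public static void main(String[] args) {
        long total = 128L * 1024 * 1024; // assumed 128 MiB of managed memory in the slot
        int pageSize = 32 * 1024;        // assumed 32 KiB pages
        double fraction = 0.25;
        System.out.println("bytes to reserve: " + computeMemorySize(total, fraction));
        System.out.println("pages to allocate: " + computeNumberOfPages(total, pageSize, fraction));
    }
}
```

With these assumed values, a fraction of 0.25 yields 33554432 bytes, or 1024 pages of 32 KiB.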

...