...
For jobs written with DataStream API, there are interfaces exposed to users allowing them to explicitly set slot sharing groups for operators. In such cases, users’ settings should be respected, and StreamGraphGenerator should turn pipelined edges connecting tasks in different slot sharing groups into blocking edges to avoid deadlock risks.
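The sketch below shows the edge conversion. It uses a simplified edge model, and the classes `CrossGroupEdgeSketch`, `Edge`, and `ExchangeMode` are hypothetical. They are not Flink's `StreamGraphGenerator` or `StreamEdge` internals. The sketch only shows the rule: a pipelined edge whose endpoints are in different slot sharing groups becomes blocking.

```java
import java.util.List;

/** Hypothetical, simplified model of stream edges; not Flink's StreamEdge/StreamGraph classes. */
class CrossGroupEdgeSketch {
    enum ExchangeMode { PIPELINED, BLOCKING }

    static final class Edge {
        final String source;
        final String target;
        final String sourceGroup; // slot sharing group of the upstream operator
        final String targetGroup; // slot sharing group of the downstream operator
        ExchangeMode mode;

        Edge(String source, String target, String sourceGroup, String targetGroup, ExchangeMode mode) {
            this.source = source;
            this.target = target;
            this.sourceGroup = sourceGroup;
            this.targetGroup = targetGroup;
            this.mode = mode;
        }
    }

    /**
     * Turns every pipelined edge whose endpoints sit in different slot sharing groups into a
     * blocking edge, so the downstream task no longer has to run concurrently with the upstream
     * one. Returns the number of edges that were changed.
     */
    static int convertCrossGroupEdges(List<Edge> edges) {
        int converted = 0;
        for (Edge edge : edges) {
            boolean crossesGroups = !edge.sourceGroup.equals(edge.targetGroup);
            if (edge.mode == ExchangeMode.PIPELINED && crossesGroups) {
                edge.mode = ExchangeMode.BLOCKING;
                converted++;
            }
        }
        return converted;
    }
}
```

Edges inside a single slot sharing group stay pipelined. Only edges that cross groups are changed.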
It is also important to retain the good property that Flink needs only as many slots as the max task parallelism to execute a job, regardless of the job graph topology. This property holds for batch jobs, where different pipelined regions can always run sequentially and reuse the same slots. However, it breaks for streaming jobs whose job graph contains multiple connected components. Tasks in different connected components belong to different pipelined regions, but the tasks in all connected components need to run concurrently. As a result, the minimum number of slots needed to execute the job becomes the sum of the max task parallelisms of the connected components.
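The slot arithmetic can be made concrete with a small sketch. It assumes that tasks within a connected component share slots among themselves, so a component needs as many slots as its max parallelism. It also assumes that different components do not share slots with each other. `MinSlotsSketch` and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical slot-count calculator; vertices are indexed 0..n-1. */
class MinSlotsSketch {
    /** Union-find lookup with path halving. */
    static int find(int[] parent, int x) {
        while (parent[x] != x) {
            parent[x] = parent[parent[x]];
            x = parent[x];
        }
        return x;
    }

    /** Max parallelism over all vertices: the slot count the good property promises. */
    static int maxTaskParallelism(int[] parallelism) {
        int max = 0;
        for (int p : parallelism) {
            max = Math.max(max, p);
        }
        return max;
    }

    /**
     * Streaming case: every connected component runs concurrently, so the job needs the sum of
     * the max parallelism of each component. edges holds {from, to} vertex pairs.
     */
    static int minSlotsForStreaming(int[] parallelism, int[][] edges) {
        int[] parent = new int[parallelism.length];
        for (int v = 0; v < parent.length; v++) {
            parent[v] = v;
        }
        // Link the roots of both endpoints so connected vertices end up in one component.
        for (int[] edge : edges) {
            parent[find(parent, edge[0])] = find(parent, edge[1]);
        }
        // Track the max parallelism seen in each component, keyed by its root.
        Map<Integer, Integer> maxPerComponent = new HashMap<>();
        for (int v = 0; v < parallelism.length; v++) {
            maxPerComponent.merge(find(parent, v), parallelism[v], Integer::max);
        }
        int sum = 0;
        for (int max : maxPerComponent.values()) {
            sum += max;
        }
        return sum;
    }
}
```

Take two components with parallelisms {4, 2} and {3, 3}. The streaming job needs 4 + 3 = 7 slots, although its max task parallelism is only 4.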
...
- For tasks with specified resource requirements, we add up the resource requirements of all the tasks in the slot sharing group and request a slot with the summed resources.
- For tasks with unknown resource requirements, we request a slot with default resources.
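The two rules can be sketched as follows. `SlotRequestSketch`, `Resources`, and the `DEFAULT_SLOT` value are hypothetical placeholders, not Flink's `ResourceSpec` or `ResourceProfile` API. The text above does not cover a group that mixes known and unknown requirements. This sketch assumes that a single unknown task makes the whole group fall back to the default slot.

```java
import java.util.List;

/** Hypothetical slot request calculator; not Flink's ResourceProfile/ResourceSpec API. */
class SlotRequestSketch {
    /** Simplified resource requirement: CPU cores and memory in megabytes. */
    static final class Resources {
        final double cpuCores;
        final long memoryMb;

        Resources(double cpuCores, long memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }
    }

    /** Placeholder default slot size; the real default depends on configuration. */
    static final Resources DEFAULT_SLOT = new Resources(1.0, 1024);

    /**
     * taskRequirements holds one entry per task in the slot sharing group; null means the
     * task's requirement is unknown. Assumption: any unknown task makes the group request
     * the default slot.
     */
    static Resources slotRequestFor(List<Resources> taskRequirements) {
        double cpuCores = 0.0;
        long memoryMb = 0;
        for (Resources r : taskRequirements) {
            if (r == null) {
                return DEFAULT_SLOT;
            }
            cpuCores += r.cpuCores;
            memoryMb += r.memoryMb;
        }
        return new Resources(cpuCores, memoryMb);
    }
}
```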
Implementation Steps
Step 1
...
StreamGraphGenerator should make this change at compile time.
This step introduces behavior changes. They do not affect functionality and are a necessary cost of preventing potential deadlocks.
...
Introduce option allSourcesInSamePipelinedRegion in ExecutionConfig
- Introduce option allSourcesInSamePipelinedRegion in ExecutionConfig
- Set it to true by default
- Set it to false for batch jobs (from DataSet API and blink planner)
This step should not introduce any behavior changes.
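The defaulting rule can be sketched with hypothetical stand-ins. Only the option name `allSourcesInSamePipelinedRegion` comes from the proposal. `ExecutionConfigSketch`, its accessors, and the `JobOrigin` enum are illustrative and are not Flink's `ExecutionConfig` API.

```java
/** Hypothetical stand-ins; only the option name comes from the proposal. */
class AllSourcesOptionSketch {
    enum JobOrigin { STREAMING_JOB, DATASET_BATCH, BLINK_PLANNER_BATCH }

    static final class ExecutionConfigSketch {
        // Defaults to true, as proposed.
        private boolean allSourcesInSamePipelinedRegion = true;

        boolean isAllSourcesInSamePipelinedRegion() {
            return allSourcesInSamePipelinedRegion;
        }

        void setAllSourcesInSamePipelinedRegion(boolean value) {
            this.allSourcesInSamePipelinedRegion = value;
        }
    }

    /** Batch jobs from the DataSet API and the blink planner switch the option off. */
    static ExecutionConfigSketch configFor(JobOrigin origin) {
        ExecutionConfigSketch config = new ExecutionConfigSketch();
        if (origin == JobOrigin.DATASET_BATCH || origin == JobOrigin.BLINK_PLANNER_BATCH) {
            config.setAllSourcesInSamePipelinedRegion(false);
        }
        return config;
    }
}
```

A true default appears to target the connected-components case for streaming jobs. Putting all sources into one pipelined region lets the components share slots. Batch jobs can opt out because their regions can run one after another.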
...
Step 2. Set slot sharing groups according to pipelined regions
StreamGraphGenerator sets slot sharing groups for operators at compile time.
- Identify pipelined regions after pipelined edges between slot sharing groups have been changed to blocking, with respect to allSourcesInSamePipelinedRegion
- Set slot sharing groups according to pipelined regions
  - By default, each pipelined region should go into a separate slot sharing group
  - If the user sets operators in multiple pipelined regions into the same slot sharing group, this setting should be respected
This step should not introduce any behavior changes, because pipelined regions scheduled later can reuse slots from previously scheduled pipelined regions.
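The sketch below shows region identification and group assignment. It assumes blocking edges have already been placed between user-specified slot sharing groups, so only the pipelined edges are passed in. `RegionSlotSharingSketch` and the generated `region-N` group names are hypothetical.

```java
/** Hypothetical sketch of region-based slot sharing group assignment; vertices are 0..n-1. */
class RegionSlotSharingSketch {
    static int find(int[] parent, int x) {
        while (parent[x] != x) {
            parent[x] = parent[parent[x]];
            x = parent[x];
        }
        return x;
    }

    static void union(int[] parent, int a, int b) {
        parent[find(parent, a)] = find(parent, b);
    }

    /**
     * isSource[v]:    whether vertex v is a source
     * pipelinedEdges: {from, to} pairs; blocking edges are left out, so they split regions
     * userGroup[v]:   slot sharing group set explicitly by the user, or null
     * Returns the slot sharing group name for each vertex.
     */
    static String[] assign(boolean[] isSource, int[][] pipelinedEdges, String[] userGroup,
                           boolean allSourcesInSamePipelinedRegion) {
        int n = isSource.length;
        int[] parent = new int[n];
        for (int v = 0; v < n; v++) {
            parent[v] = v;
        }
        // Vertices connected by pipelined edges form one pipelined region.
        for (int[] edge : pipelinedEdges) {
            union(parent, edge[0], edge[1]);
        }
        // Optionally merge the regions of all sources into a single region.
        if (allSourcesInSamePipelinedRegion) {
            int firstSource = -1;
            for (int v = 0; v < n; v++) {
                if (!isSource[v]) {
                    continue;
                }
                if (firstSource < 0) {
                    firstSource = v;
                } else {
                    union(parent, firstSource, v);
                }
            }
        }
        String[] groups = new String[n];
        for (int v = 0; v < n; v++) {
            // A user-specified group wins; otherwise each region gets its own group.
            groups[v] = userGroup[v] != null ? userGroup[v] : "region-" + find(parent, v);
        }
        return groups;
    }
}
```

With two disconnected source-to-sink chains and the option set to true, all four vertices land in one group. With the option set to false, each chain gets its own group.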
...
Step 3. Introduce managed memory fractions to StreamConfig
Introduce fracManagedMemOnHeap and fracManagedMemOffHeap in StreamConfig, so they can be set by StreamGraphGenerator and used by operators at runtime.
This step should not introduce any behavior changes.
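The sketch below shows the data flow. The two fractions are written at compile time and read back by the operator at runtime. `StreamConfigSketch` is a hypothetical stand-in modeled as a string-keyed map, and its method names are illustrative. They are not Flink's `StreamConfig` API. Returning 0.0 for an unset fraction is also an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical stand-in for StreamConfig, modeled as a string-keyed map that is filled at
 * compile time and read by the operator at runtime. Not Flink's actual StreamConfig API.
 */
class StreamConfigSketch {
    private static final String FRAC_ON_HEAP = "fracManagedMemOnHeap";
    private static final String FRAC_OFF_HEAP = "fracManagedMemOffHeap";

    private final Map<String, Double> values = new HashMap<>();

    /** Called by the graph generator (compile time). */
    void setManagedMemoryFractions(double onHeap, double offHeap) {
        values.put(FRAC_ON_HEAP, checkFraction(onHeap));
        values.put(FRAC_OFF_HEAP, checkFraction(offHeap));
    }

    /** Read by the operator (runtime); 0.0 if never set. */
    double getFracManagedMemOnHeap() {
        return values.getOrDefault(FRAC_ON_HEAP, 0.0);
    }

    double getFracManagedMemOffHeap() {
        return values.getOrDefault(FRAC_OFF_HEAP, 0.0);
    }

    /** Rejects values outside [0, 1], including NaN. */
    private static double checkFraction(double fraction) {
        if (!(fraction >= 0.0 && fraction <= 1.0)) {
            throw new IllegalArgumentException("fraction must be in [0, 1]: " + fraction);
        }
        return fraction;
    }
}
```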
...
Step 4. Set managed memory fractions according to slot sharing groups
- For operators with specified ResourceSpecs, calculate fractions according to the operators' ResourceSpecs
- For operators with unknown ResourceSpecs, calculate fractions according to the number of operators using managed memory
This step should not introduce any behavior changes.
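The two cases are sketched below for the operators of one slot sharing group. The sketch assumes that "according to the number of operators using managed memory" means an even split among those operators. It also assumes the calculation runs once per managed memory type (on-heap and off-heap), so it handles a single type. `ManagedMemoryFractionSketch` is hypothetical.

```java
/**
 * Hypothetical fraction calculator for the operators of one slot sharing group. It handles a
 * single managed memory type; run it once per type (on-heap, off-heap).
 */
class ManagedMemoryFractionSketch {
    /** Known specs: each operator's share is its requested managed memory over the group total. */
    static double[] fractionsFromSpecs(long[] requestedManagedMemory) {
        long total = 0;
        for (long m : requestedManagedMemory) {
            total += m;
        }
        double[] fractions = new double[requestedManagedMemory.length];
        for (int i = 0; i < fractions.length; i++) {
            fractions[i] = total == 0 ? 0.0 : (double) requestedManagedMemory[i] / total;
        }
        return fractions;
    }

    /** Unknown specs: operators that use managed memory split it evenly; the others get 0. */
    static double[] fractionsForUnknownSpecs(boolean[] usesManagedMemory) {
        int users = 0;
        for (boolean uses : usesManagedMemory) {
            if (uses) {
                users++;
            }
        }
        double[] fractions = new double[usesManagedMemory.length];
        for (int i = 0; i < fractions.length; i++) {
            fractions[i] = usesManagedMemory[i] ? 1.0 / users : 0.0;
        }
        return fractions;
    }
}
```

Take two operators that request 100 MB and 300 MB. They get fractions 0.25 and 0.75. Take three memory-using operators with unknown specs. Each gets one third.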
...
Step 5. Operators use fractions to decide how much managed memory to allocate
- Operators allocate memory segments with the amount returned by MemoryManager#computeNumberOfPages.
- Operators reserve memory with the amount returned by MemoryManager#computeMemorySize.
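The sketch below shows how a fraction could translate into bytes and pages. `MemoryManagerSketch` is a hypothetical re-implementation that borrows the two method names mentioned above. It is not Flink's `MemoryManager`, and the rounding-down behavior is an assumption.

```java
/**
 * Hypothetical re-implementation for illustration; it mirrors the method names mentioned in the
 * proposal but is not Flink's MemoryManager.
 */
class MemoryManagerSketch {
    private final long totalMemorySize; // managed memory available to the slot, in bytes
    private final int pageSize;         // size of one memory segment, in bytes

    MemoryManagerSketch(long totalMemorySize, int pageSize) {
        if (totalMemorySize < 0 || pageSize <= 0) {
            throw new IllegalArgumentException("invalid memory or page size");
        }
        this.totalMemorySize = totalMemorySize;
        this.pageSize = pageSize;
    }

    /** Bytes an operator with the given fraction may reserve, rounded down. */
    long computeMemorySize(double fraction) {
        if (!(fraction >= 0.0 && fraction <= 1.0)) {
            throw new IllegalArgumentException("fraction must be in [0, 1]: " + fraction);
        }
        return (long) Math.floor(totalMemorySize * fraction);
    }

    /** Whole memory segments (pages) an operator with the given fraction may allocate. */
    int computeNumberOfPages(double fraction) {
        return (int) (computeMemorySize(fraction) / pageSize);
    }
}
```

With 1 MiB of managed memory and 32 KiB pages, a fraction of 0.25 yields 262144 bytes, or 8 pages. A fraction of 0.1 yields 104857 bytes, which is only 3 whole pages.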
...