Motivation
Flink supports query plans with multiple sinks. To optimize such plans, the query plan is broken into sub-trees that represent common sub-graphs (RelNodeBlocks) of the plan. The optimization process for these queries differs between the Stream and Batch optimizers, mainly because the Stream optimizer must infer the ChangelogMode of every physical node. There are two main issues with the current optimization process of these queries:
Computationally suboptimal
The optimization process (in StreamCommonSubGraphBasedOptimizer) is triggered three times (also stated in ) in order to correctly infer the changelog mode of each sub-graph (RelNodeBlock):
- First, each block is optimized [1].
- Second, each block's updateKind and miniBatchInterval are propagated, and then each block is optimized again (StreamCommonSubGraphBasedOptimizer::propagateUpdateKindAndMiniBatchInterval).
- Third, all the intermediate results are cleared [2] and each block is optimized again [3].
As the steps above show, the whole optimization process is executed three times. The main reason is that the rules related to mini-batch and changelog inference are tightly coupled with the rest of the optimization process. For example, in each of the iterations above, FlinkChangelogModeInferenceProgram changes the query plan, and the optimization process starts again.
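The cost of the current flow can be illustrated with a toy model. This is a minimal sketch, not Flink code: `Block`, `CurrentOptimizerModel` and `runAllPrograms` are hypothetical stand-ins for RelNodeBlock, StreamCommonSubGraphBasedOptimizer and a run of the full FlinkStreamProgram chain. The model only counts how often the full chain runs on each block.

```scala
import scala.collection.mutable

// Hypothetical stand-in for RelNodeBlock (not the Flink class).
final case class Block(name: String)

// Toy model of the current three-pass flow. runAllPrograms stands in for
// running the full program chain on one block; here it only counts runs.
final class CurrentOptimizerModel(blocks: Seq[Block]) {
  private val runs = mutable.Map.empty[String, Int].withDefaultValue(0)

  private def runAllPrograms(block: Block): Unit =
    runs(block.name) += 1

  def optimize(): Map[String, Int] = {
    // Pass 1: optimize each block.
    blocks.foreach(runAllPrograms)
    // Pass 2: propagate updateKind and miniBatchInterval (omitted in this
    // toy), then optimize each block again.
    blocks.foreach(runAllPrograms)
    // Pass 3: clear intermediate results (omitted), then optimize again.
    blocks.foreach(runAllPrograms)
    runs.toMap
  }
}
```

For two sink blocks, `new CurrentOptimizerModel(Seq(Block("sink1"), Block("sink2"))).optimize()` reports three full-chain runs per block, even though only the changelog and mini-batch inference actually needs the extra passes.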
Code complexity
This optimization process also increases code complexity. For example, isUpdateBeforeRequired and getMiniBatchInterval are included in StreamOptimizeContext and propagated through the whole optimization process.
Also, low-level logic related to changelog inference (see the steps above) is embedded in high-level optimization classes (e.g., StreamCommonSubGraphBasedOptimizer).
Public Interfaces
This is not expected to be a change in the programming interface, but rather a change in the Flink query optimization process.
Proposed Changes
The proposed changes aim to decouple query optimization from the changelog inference and mini-batch-related rules. Concretely, we propose the following changes:
Decouple Stream Programs
Currently, FlinkStreamProgram contains a sequence of programs that optimize the stream table plan. We will split FlinkStreamProgram into four disjoint subsets (these subsets do not necessarily correspond to separate classes/files):
- ProgramSubsetA: contains all the programs for changelog inference
- ProgramSubsetB: contains all the programs related to mini-batch interval inference rules
- ProgramSubsetC: contains the PHYSICAL_REWRITE rule set
- ProgramSubsetD: contains everything else
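One way to make the split explicit is to tag each program with exactly one subset. The sketch below assumes that approach. The program names are hypothetical placeholders, not the actual FlinkStreamProgram constants, and the `Subset` tags and `ProgramSubsets` object are illustrative only.

```scala
// Exactly one tag per program, so the four subsets are disjoint by construction.
sealed trait Subset
case object SubsetA extends Subset // changelog inference
case object SubsetB extends Subset // mini-batch interval inference
case object SubsetC extends Subset // PHYSICAL_REWRITE rule set
case object SubsetD extends Subset // everything else

// Hypothetical program descriptor; names below are placeholders.
final case class Program(name: String, subset: Subset)

object ProgramSubsets {
  val allPrograms: Seq[Program] = Seq(
    Program("logical-rewrite", SubsetD),
    Program("physical", SubsetD),
    Program("changelog-mode-inference", SubsetA),
    Program("mini-batch-interval-inference", SubsetB),
    Program("physical-rewrite", SubsetC)
  )

  // Group programs by their tag. Every program lands in exactly one group,
  // and the original order is preserved within each group.
  def bySubset(programs: Seq[Program]): Map[Subset, Seq[Program]] =
    programs.groupBy(_.subset)
}
```

Because the tag is a single field, the subsets cannot overlap. Together they always cover the whole program sequence, which matches the "disjoint subsets" requirement without forcing separate classes or files.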
```scala
// class StreamCommonSubGraphBasedOptimizer
override protected def doOptimize(roots: Seq[RelNode]): Seq[RelNodeBlock] = {
  // ...
  // build RelNodeBlock plan
  val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, tableConfig)

  // Operate on ProgramSubsetD. Executed only once.
  val optimizedSB = optimizeSinkBlocks(sinkBlocks)

  // Operate on ProgramSubsetA. Iterated 3 times, but only over ProgramSubsetA.
  // For this to work, we need to modify the changelog inference logic to work
  // purely on the physical query plan.
  val withChangelogInferenceSB = addChangelogInference(optimizedSB)

  // Operate on ProgramSubsetB. Executed only once.
  val withMinibatchAndChangelogSB = addMiniBatchInterval(withChangelogInferenceSB)

  // Operate on ProgramSubsetC. Executed only once.
  val withQueryRewriteSB = finalizeWithQueryRewrite(withMinibatchAndChangelogSB)

  withQueryRewriteSB
}
```
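The ordering of the proposed doOptimize can also be checked with a toy trace. This is a minimal sketch, not Flink code: `ProposedPipelineModel` and `run` are hypothetical, and each phase only records the name of the program subset it would execute. The default of three changelog iterations follows the comment in the snippet above.

```scala
import scala.collection.mutable.ListBuffer

// Toy model of the proposed doOptimize ordering. Each phase appends the
// subset it would run, so order and run counts can be inspected.
object ProposedPipelineModel {
  def doOptimize(changelogIterations: Int = 3): List[String] = {
    val trace = ListBuffer.empty[String]
    def run(subset: String): Unit = trace += subset

    run("D") // optimizeSinkBlocks: ProgramSubsetD, once
    // addChangelogInference: only ProgramSubsetA is iterated
    (1 to changelogIterations).foreach(_ => run("A"))
    run("B") // addMiniBatchInterval: ProgramSubsetB, once
    run("C") // finalizeWithQueryRewrite: ProgramSubsetC, once
    trace.toList
  }
}
```

The trace comes out as `D, A, A, A, B, C`. Compared with the current flow, where the whole program chain runs three times, only the changelog inference subset is repeated, and ProgramSubsetD (the bulk of the optimization) runs once.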
As a result, we can remove StreamOptimizeContext, since isUpdateBeforeRequired and getMiniBatchInterval will no longer need to be propagated through the optimization process.
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
As a result of this change, query plan optimization time is expected to decrease, and the optimization logic will become simpler.
- If we are changing behavior how will we phase out the older behavior?
N/A
- If we need special migration tools, describe them here.
N/A
- When will we remove the existing behavior?
N/A
Test Plan
The goal is for all existing tests to pass with the proposed architecture.
We will also add new tests for each separate optimization phase inside StreamCommonSubGraphBasedOptimizer.
Rejected Alternatives
No rejected alternatives yet.