...
As the steps above show, the whole optimization process is executed three times. The main reason is that the mini-batch and changelog inference rules are tightly coupled in the optimization process. For example, in each optimization iteration described above, FlinkChangelogModeInferenceProgram changes the query plan, and the optimization process then restarts from the beginning.
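Here is a minimal toy model of this restart behaviour, written in Scala to match the planner code. A plan is reduced to a vector of labels, and every optimization program becomes a Plan => Plan function. RestartingOptimizerModel, optimize, the pass bound, and the toy inference step are all hypothetical. The sketch only illustrates why a plan change made by changelog inference forces every program to run again. It is not how FlinkChangelogModeInferenceProgram is implemented.

```scala
// Hypothetical toy model of the current restart behaviour; not Flink code.
object RestartingOptimizerModel {
  type Plan = Vector[String]
  type Program = Plan => Plan

  final case class Result(plan: Plan, fullPasses: Int, programRuns: Int)

  /** Runs the whole program chain, then changelog inference. If inference
    * changed the plan, the whole chain restarts on the new plan. */
  def optimize(input: Plan, wholeChain: Seq[Program], changelogInference: Program,
               maxPasses: Int): Result = {
    var plan = input
    var passes = 0
    var runs = 0
    var changed = true
    while (changed && passes < maxPasses) {
      // Every pass re-executes every program in the chain, including programs
      // whose output does not depend on the changelog information.
      val optimized = wholeChain.foldLeft(plan) { (p, program) =>
        runs += 1
        program(p)
      }
      val inferred = changelogInference(optimized)
      runs += 1
      changed = inferred != optimized
      plan = inferred
      passes += 1
    }
    Result(plan, passes, runs)
  }
}

object RestartingOptimizerExample {
  import RestartingOptimizerModel._

  // Stand-ins for the logical and physical rewrite programs.
  val chain: Seq[Program] = Seq[Program](p => p, p => p)
  // Toy inference step that changes the plan on the first two passes only.
  val inference: Program = p =>
    if (p.count(_ == "changelog-trait") < 2) p :+ "changelog-trait" else p

  val result: Result = optimize(Vector("scan", "sink"), chain, inference, maxPasses = 3)
}
```

The example uses two stand-in programs and an inference step that changes the plan twice. It performs three full passes and nine program invocations, even though only the inference step produces anything new.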
In my small prototype, I ran the following tests:
...
This optimization process also increases code complexity. For example, isUpdateBeforeRequired and getMiniBatchInterval are included in StreamOptimizeContext and propagated through the whole optimization process.
...
```scala
// class StreamCommonSubGraphBasedOptimizer
override protected def doOptimize(roots: Seq[RelNode]): Seq[RelNodeBlock] = {
  // ...
  // Build the RelNodeBlock plan
  val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, tableConfig)
  // Operates on ProgramSubsetD; executed only once
  val optimizedSB = optimizeSinkBlocks(sinkBlocks)
  // Operates on ProgramSubsetA; iterated three times, but only over ProgramSubsetA
  // rather than the whole FlinkStreamProgram. For this to work, the changelog
  // inference logic must be modified to operate purely on the physical query plan.
  val withChangelogInferenceSB = addChangelogInference(optimizedSB)
  // Operates on ProgramSubsetB; executed only once
  val withMinibatchAndChangelogSB = addMiniBatchIntervalInference(withChangelogInferenceSB)
  // Operates on ProgramSubsetC; executed only once
  val withQueryRewriteSB = finalizeWithQueryRewrite(withMinibatchAndChangelogSB)
  withQueryRewriteSB
}
```
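The proposed staging can be modelled with the same toy representation. This assumes, as the doOptimize comments state, that changelog inference can run purely on the physical plan. StagedOptimizerModel, RunCounter, and the stand-in subsets are hypothetical. Only the names ProgramSubsetA to ProgramSubsetD come from the proposal. The counter makes the intended effect visible: only subset A repeats.

```scala
// Hypothetical toy model of the proposed staged pipeline; not Flink code.
object StagedOptimizerModel {
  type Plan = Vector[String]
  type Program = Plan => Plan

  /** Counts how many times each program subset runs. */
  final class RunCounter {
    private val counts = scala.collection.mutable.Map.empty[String, Int]
    def run(subset: String, program: Program, plan: Plan): Plan = {
      counts(subset) = counts.getOrElse(subset, 0) + 1
      program(plan)
    }
    def count(subset: String): Int = counts.getOrElse(subset, 0)
  }

  def optimize(input: Plan, subsetD: Program, subsetA: Program, subsetB: Program,
               subsetC: Program, maxIterationsOfA: Int, counter: RunCounter): Plan = {
    // ProgramSubsetD: executed once.
    val optimized = counter.run("D", subsetD, input)

    // ProgramSubsetA: iterated until the plan stops changing (bounded),
    // without re-running ProgramSubsetD.
    var plan = optimized
    var iterations = 0
    var changed = true
    while (changed && iterations < maxIterationsOfA) {
      val next = counter.run("A", subsetA, plan)
      changed = next != plan
      plan = next
      iterations += 1
    }

    // ProgramSubsetB and ProgramSubsetC: executed once each.
    val withMiniBatch = counter.run("B", subsetB, plan)
    counter.run("C", subsetC, withMiniBatch)
  }
}

object StagedOptimizerExample {
  import StagedOptimizerModel._

  val runCounter = new RunCounter
  val result: Plan = optimize(
    input = Vector("scan", "sink"),
    // Stand-in for the main rewrite programs.
    subsetD = p => p,
    // Toy changelog inference that becomes stable after two changes.
    subsetA = p => if (p.count(_ == "changelog-trait") < 2) p :+ "changelog-trait" else p,
    subsetB = p => p :+ "mini-batch-interval",
    subsetC = p => p :+ "query-rewrite",
    maxIterationsOfA = 3,
    counter = runCounter)
}
```

With a toy inference step that stabilizes after two changes, subset A runs three times while subsets D, B, and C each run once. The iteration bound maxIterationsOfA is an assumption added here as a guard against an inference step that never converges.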
As a result, we can simplify StreamOptimizeContext, because the isUpdateBeforeRequired and getMiniBatchInterval information no longer needs to be propagated.
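Here is a toy before/after comparison of this simplification, again treating a plan as a vector of labels. ContextBefore, the marker strings, and all functions below are hypothetical. They are much simpler than the real StreamOptimizeContext, whose actual members and types are not reproduced here. In the "after" shape, changelog inference reads its requirement from the physical plan, as the doOptimize comments call for. Only the mini-batch phase receives the interval.

```scala
// Hypothetical before/after sketch; not the real StreamOptimizeContext.
object ContextSimplificationSketch {
  type Plan = Vector[String]

  // Before: the shared context carries both flags, so every program receives
  // them even when it never reads them.
  final case class ContextBefore(isUpdateBeforeRequired: Boolean, miniBatchIntervalMs: Long)

  def logicalRewriteBefore(plan: Plan, ctx: ContextBefore): Plan = plan // ctx unused
  def changelogInferenceBefore(plan: Plan, ctx: ContextBefore): Plan =
    if (ctx.isUpdateBeforeRequired) plan :+ "emit-update-before" else plan
  def miniBatchBefore(plan: Plan, ctx: ContextBefore): Plan =
    if (ctx.miniBatchIntervalMs > 0) plan :+ s"mini-batch ${ctx.miniBatchIntervalMs}ms" else plan

  def pipelineBefore(plan: Plan, ctx: ContextBefore): Plan =
    miniBatchBefore(changelogInferenceBefore(logicalRewriteBefore(plan, ctx), ctx), ctx)

  // After: no flags are threaded through a shared context. Changelog inference
  // derives the requirement from the (physical) plan itself, and only the
  // mini-batch phase sees the configured interval.
  def logicalRewrite(plan: Plan): Plan = plan
  def changelogInference(plan: Plan): Plan =
    if (plan.contains("sink-requires-update-before")) plan :+ "emit-update-before" else plan
  def miniBatchInference(plan: Plan, intervalMs: Long): Plan =
    if (intervalMs > 0) plan :+ s"mini-batch ${intervalMs}ms" else plan

  def pipelineAfter(plan: Plan, configuredIntervalMs: Long): Plan =
    miniBatchInference(changelogInference(logicalRewrite(plan)), configuredIntervalMs)
}

object ContextSimplificationExample {
  import ContextSimplificationSketch._

  val input: Plan = Vector("scan", "sink-requires-update-before")
  val before: Plan =
    pipelineBefore(input, ContextBefore(isUpdateBeforeRequired = true, miniBatchIntervalMs = 1000L))
  val after: Plan = pipelineAfter(input, configuredIntervalMs = 1000L)
}
```

Consider a plan whose sink requires update-before messages. Both pipelines produce the same result, but the second one no longer passes either flag through programs that ignore it.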
Compatibility, Deprecation, and Migration Plan
...