As the steps above show, the whole optimization process is executed three times. The main reason is that the mini-batch and changelog-inference rules are tightly coupled within the optimization process. For example, at each of the optimization iterations stated above, FlinkChangelogModeInferenceProgram makes some changes to the query plan, and the optimization process restarts.

In my small prototype, I ran the following tests:


As a result of the optimization process described above, the code complexity increases. For example, isUpdateBeforeRequired and getMiniBatchInterval are included in StreamOptimizeContext and propagated through the whole optimization process.


// class StreamCommonSubGraphBasedOptimizer

override protected def doOptimize(roots: Seq[RelNode]): Seq[RelNodeBlock] = {
  // Build the RelNodeBlock plan.
  val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, tableConfig)

  // Operate on ProgramSubsetD. Executed only once.
  val optimizedSB = optimizeSinkBlocks(sinkBlocks)

  // Operate on ProgramSubsetA. Iterated 3 times, but only over ProgramSubsetA
  // (not over the whole FlinkStreamProgram).
  // For this to work, the changelog inference logic must be modified to work
  // purely on the physical query plan.
  val withChangelogInferenceSB = addChangelogInference(optimizedSB)

  // Operate on ProgramSubsetB. Executed only once.
  val withMinibatchAndChangelogSB = addMiniBatchIntervalInference(withChangelogInferenceSB)

  // Operate on ProgramSubsetC. Executed only once.
  val withQueryRewriteSB = finalizeWithQueryRewrite(withMinibatchAndChangelogSB)

  // Return the fully optimized sink blocks
  // (assuming each stage above returns Seq[RelNodeBlock]).
  withQueryRewriteSB
}


As a result, we can simplify StreamOptimizeContext, since isUpdateBeforeRequired and getMiniBatchInterval no longer need to be propagated through the optimization process.
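To make the intended simplification concrete, here is a minimal, self-contained sketch of how the context might look before and after the change. It is only an illustration under assumptions: CurrentContextSketch, SimplifiedContextSketch, ContextSketch and the local MiniBatchInterval case class are hypothetical stand-ins, not the actual planner types, and isBatchMode merely represents whatever members remain in the context.

```scala
// Hypothetical stand-in for the planner's mini-batch interval type.
final case class MiniBatchInterval(intervalMs: Long)

// Assumed shape of the context today: the two inference inputs named in the
// text travel with the context through every optimization pass.
trait CurrentContextSketch {
  def isBatchMode: Boolean                      // placeholder for the remaining members
  def isUpdateBeforeRequired: Boolean           // input to changelog inference
  def getMiniBatchInterval: MiniBatchInterval   // input to mini-batch inference
}

// Assumed shape after the proposal: the inference stages run as separate
// steps on the physical plan, so the context no longer carries their inputs.
trait SimplifiedContextSketch {
  def isBatchMode: Boolean
}

object ContextSketch {
  // Builds a simplified context for a streaming (non-batch) job.
  def simplified(): SimplifiedContextSketch = new SimplifiedContextSketch {
    override def isBatchMode: Boolean = false
  }

  def main(args: Array[String]): Unit = {
    val ctx = simplified()
    println(s"batch mode: ${ctx.isBatchMode}")
  }
}
```

In this sketch, callers that build the context no longer need to know whether update-before messages are required or which mini-batch interval applies; those decisions would move into the dedicated inference steps.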

Compatibility, Deprecation, and Migration Plan
