...

As the steps above show, the whole optimization process is executed three times. The main reason is that the mini-batch and changelog inference rules are tightly coupled within the optimization process. For example, at each optimization iteration (stated above), the FlinkChangelogModeInferenceProgram makes some changes to the query plan, and the optimization process starts again.


In my small prototype, I ran the following tests (mostly non-complex queries):

  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks1
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks2
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks3
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks4
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks5
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksWithUDTF
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion1
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion2
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion3
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion4

...

These tests exercise end-to-end (e2e) DAG optimization, including query parsing, validation, optimization, and checking the results. In these e2e optimization tests, my prototype was 15-20% faster than the existing Flink optimization structure, with the "cost" of simplifying the codebase.

Code complexity

As a result of the optimization process described above, the code complexity increases. For example, isUpdateBeforeRequired and getMiniBatchInterval are included in StreamOptimizeContext and propagated through the whole optimization process.

...

Currently, FlinkStreamProgram contains a sequence of (potentially dependent) programs that optimize the stream table plan. The idea is to clearly separate query optimization, changelog inference, and mini-batch interval inference, so that they become independent of each other. We will create 4 disjoint subsets of FlinkStreamProgram (these disjoint subsets do not necessarily mean different classes/files):

...

Code Block
// class StreamCommonSubGraphBasedOptimizer

override protected def doOptimize(roots: Seq[RelNode]): Seq[RelNodeBlock] = {
  ...

  // Build the RelNodeBlock plan
  val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, tableConfig)

  // Operate on ProgramSubsetD. Executed only once.
  val optimizedSB = optimizeSinkBlocks(sinkBlocks)

  // Operate on ProgramSubsetA. Iterated 3 times, but only over ProgramSubsetA
  // (not over the whole FlinkStreamProgram).
  // For this to work, the changelog inference logic must be modified to work
  // purely on the physical query plan.
  val withChangelogInferenceSB = addChangelogInference(optimizedSB)

  // Operate on ProgramSubsetB. Executed only once.
  val withMinibatchAndChangelogSB = addMiniBatchIntervalInference(withChangelogInferenceSB)

  // Operate on ProgramSubsetC. Executed only once.
  val withQueryRewriteSB = finalizeWithQueryRewrite(withMinibatchAndChangelogSB)

  withQueryRewriteSB
}

As a result, we can simplify StreamOptimizeContext, since isUpdateBeforeRequired and getMiniBatchInterval will no longer need to be propagated.
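
To make the difference in executed work concrete, here is a minimal toy sketch. It does not use Flink's planner API: a "plan" is simply the list of program names applied so far, and the program names inside each subset (for example "logical" and "physical") are hypothetical placeholders, since the actual contents of the four subsets are defined elsewhere in this proposal. It only counts how many program runs happen when the whole chain is iterated three times versus when only the changelog inference subset is iterated.

```scala
// Toy model only: "programs" are plain functions on a list of strings,
// not Flink's real optimize-program API. All names are hypothetical.
object StagedOptimizationSketch {
  // A "plan" is just the list of program names applied so far.
  type Plan = List[String]
  type Program = Plan => Plan

  // Build a named program that records its own application in the plan.
  def program(name: String): Program = plan => plan :+ name

  // Hypothetical stand-ins for the four disjoint subsets.
  val subsetD: Seq[Program] = Seq(program("logical"), program("physical"))
  val subsetA: Seq[Program] = Seq(program("changelog-inference"))
  val subsetB: Seq[Program] = Seq(program("minibatch-inference"))
  val subsetC: Seq[Program] = Seq(program("physical-rewrite"))

  // Apply the given programs to the plan, in order.
  def run(programs: Seq[Program], plan: Plan): Plan =
    programs.foldLeft(plan)((p, prog) => prog(p))

  // Coupled structure: the whole chain is rerun on every iteration.
  def coupled(iterations: Int): Plan = {
    val all = subsetD ++ subsetA ++ subsetB ++ subsetC
    (1 to iterations).foldLeft(List.empty[String])((p, _) => run(all, p))
  }

  // Staged structure: only subset A iterates; D, B, and C run once.
  def staged(iterations: Int): Plan = {
    val optimized = run(subsetD, Nil)
    val withChangelog =
      (1 to iterations).foldLeft(optimized)((p, _) => run(subsetA, p))
    val withMiniBatch = run(subsetB, withChangelog)
    run(subsetC, withMiniBatch)
  }

  def main(args: Array[String]): Unit = {
    println(s"coupled: ${coupled(3).size} program runs")
    println(s"staged:  ${staged(3).size} program runs")
  }
}
```

With these placeholder subsets, three iterations give 15 program runs in the coupled structure and 7 in the staged one. The numbers only illustrate the shape of the saving; they are not a measurement, and the real ratio depends on how many programs end up in each subset and how expensive each one is.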


Compatibility, Deprecation, and Migration Plan

...