...

As the steps above show, the whole optimization process is executed three times. The main reason is that the rules related to mini-batch and changelog-mode inference are tightly coupled with the rest of the optimization process. For example, in each optimization iteration described above, FlinkChangelogModeInferenceProgram changes the query plan, and the optimization process then starts again.
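The restart behaviour described above can be illustrated with a toy model. This is a minimal Python sketch, not Flink's planner code: Plan, base_optimize, infer_changelog_mode, infer_mini_batch and optimize_until_stable are hypothetical names. The sketch assumes that any change made by an inference step forces the whole optimization to run again. With two inference steps that each modify the plan once, the driver ends up running the full optimization three times.

```python
from dataclasses import dataclass, replace
from typing import Callable, List, Optional, Tuple


@dataclass(frozen=True)
class Plan:
    """Hypothetical stand-in for a query plan; not a Flink class."""
    optimized: bool = False
    changelog_mode: Optional[str] = None
    mini_batch_interval_ms: Optional[int] = None


def base_optimize(plan: Plan) -> Plan:
    # Stands in for the full rule-based optimization phases.
    return replace(plan, optimized=True)


def infer_changelog_mode(plan: Plan) -> Plan:
    # Hypothetical inference step: the first time it runs it rewrites the
    # plan, which invalidates the earlier optimization result.
    if plan.changelog_mode is None:
        return replace(plan, changelog_mode="retract", optimized=False)
    return plan


def infer_mini_batch(plan: Plan) -> Plan:
    # Hypothetical inference step with the same restart-triggering behaviour.
    if plan.mini_batch_interval_ms is None:
        return replace(plan, mini_batch_interval_ms=1000, optimized=False)
    return plan


def optimize_until_stable(
    plan: Plan,
    inference_steps: List[Callable[[Plan], Plan]],
    max_passes: int = 10,
) -> Tuple[Plan, int]:
    """Re-run the whole optimization whenever an inference step changes the plan."""
    passes = 0
    while True:
        plan = base_optimize(plan)
        passes += 1
        changed = False
        for step in inference_steps:
            new_plan = step(plan)
            if new_plan != plan:
                plan = new_plan
                changed = True
                break  # restart the full optimization after any change
        if not changed:
            return plan, passes
        if passes >= max_passes:
            raise RuntimeError("plan did not stabilize")


final_plan, passes = optimize_until_stable(Plan(), [infer_changelog_mode, infer_mini_batch])
print(passes)  # 3
```

In this toy model, decoupling the inference steps from the base optimization (running them once, after the plan is final) would reduce the number of full passes to one.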


In my small prototype, I ran the following tests:

  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks1
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks2
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks3
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks4
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks5
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksWithUDTF
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion1
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion2
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion3
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion4


These tests cover end-to-end DAG optimization, including query parsing, validation, optimization, and result checking. In these end-to-end tests, my prototype was 15-20% faster than the existing Flink optimization process.

Code complexity

The optimization process described above also increases code complexity. For example, isUpdateBeforeRequired and getMiniBatchInterval are part of StreamOptimizeContext and are propagated through the whole optimization process.
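To make this coupling concrete, the following minimal Python sketch threads a shared context object through every program in a chain. StreamOptimizeContextSketch, the four program functions and run_programs are hypothetical; only the two context values are taken from the text above, and mapping is_update_before_required to a "retract" or "upsert" label is a simplification for illustration. All four programs must accept the context even though only two of them read it.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass(frozen=True)
class StreamOptimizeContextSketch:
    # Hypothetical Python mirror of the two values named in the text;
    # not Flink's actual StreamOptimizeContext interface.
    is_update_before_required: bool
    mini_batch_interval_ms: int


# Every program shares one signature, so every program receives the context.
Program = Callable[[List[str], StreamOptimizeContextSketch], List[str]]


def logical_rewrite(plan: List[str], ctx: StreamOptimizeContextSketch) -> List[str]:
    # Does not read ctx, but must accept it because of the shared signature.
    return plan + ["logical_rewrite"]


def physical_rewrite(plan: List[str], ctx: StreamOptimizeContextSketch) -> List[str]:
    # Also ignores ctx.
    return plan + ["physical_rewrite"]


def changelog_inference(plan: List[str], ctx: StreamOptimizeContextSketch) -> List[str]:
    # Simplified: UPDATE_BEFORE required -> retract-style changelog, else upsert.
    mode = "retract" if ctx.is_update_before_required else "upsert"
    return plan + [f"changelog:{mode}"]


def mini_batch_assigner(plan: List[str], ctx: StreamOptimizeContextSketch) -> List[str]:
    # Only adds a mini-batch step when an interval is configured.
    if ctx.mini_batch_interval_ms > 0:
        return plan + [f"mini_batch:{ctx.mini_batch_interval_ms}ms"]
    return plan


def run_programs(
    plan: List[str], programs: List[Program], ctx: StreamOptimizeContextSketch
) -> List[str]:
    for program in programs:
        plan = program(plan, ctx)  # ctx is threaded through every program
    return plan
```

In this sketch, any new piece of state that one program needs has to be added to the shared context and accepted by every other program, which is the kind of coupling the paragraph above describes.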

...