...
As the steps above show, the whole optimization process is executed three times. The main reason is that the rules related to mini-batch and changelog inference are tightly coupled within the optimization process. For example, at each of the optimization iterations listed above, FlinkChangelogModeInferenceProgram makes some changes to the query plan, and the optimization process starts again.
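The restart behaviour described above can be pictured with a toy model. The sketch below is not Flink code: plans are plain strings, and every name in it (RestartingOptimizerSketch, optimize, inferTraits) is hypothetical. It only illustrates how an inference step that modifies the plan forces the full rule set to run again.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Toy model only: plans are strings and "rules" are string rewrites.
// None of these names exist in Flink.
final class RestartingOptimizerSketch {

    /** Final plan plus the number of full optimization runs it took. */
    static final class Result {
        final String plan;
        final int runs;

        Result(String plan, int runs) {
            this.plan = plan;
            this.runs = runs;
        }
    }

    /**
     * Applies every rule, then the inference step. If inference changed the
     * plan, the whole rule set is run again on the changed plan. The loop
     * only terminates if inference eventually stops changing the plan.
     */
    static Result optimize(String plan,
                           List<UnaryOperator<String>> rules,
                           UnaryOperator<String> inference) {
        int runs = 0;
        while (true) {
            runs++;
            for (UnaryOperator<String> rule : rules) {
                plan = rule.apply(plan);
            }
            String inferred = inference.apply(plan);
            if (inferred.equals(plan)) {
                return new Result(plan, runs);
            }
            plan = inferred;
        }
    }

    /** Hypothetical inference: adds at most one missing annotation per call. */
    static String inferTraits(String plan) {
        if (!plan.contains("[miniBatch]")) {
            return plan + "[miniBatch]";
        }
        if (!plan.contains("[changelog]")) {
            return plan + "[changelog]";
        }
        return plan;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> rules =
                Arrays.asList(p -> p.replace("Filter(true)->", ""));
        Result r = optimize("Scan->Filter(true)->Sink", rules,
                RestartingOptimizerSketch::inferTraits);
        // Prints: 3 runs: Scan->Sink[miniBatch][changelog]
        System.out.println(r.runs + " runs: " + r.plan);
    }
}
```

In this toy setup the rules run three times even though they only have work to do in the first run. The later runs are triggered purely by the inference step.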
In my small prototype, I ran the following tests:
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks1
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks2
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks3
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks4
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks5
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksWithUDTF
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion1
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion2
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion3
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion4
These tests exercise DAG optimization end to end (e2e), including query parsing, validation, optimization, and checking the results. In these e2e optimization tests, my prototype was 15-20% faster than the existing Flink optimization structure.
Code complexity
As a result of the optimization process described above, code complexity increases. For example, isUpdateBeforeRequired and getMiniBatchInterval are included in StreamOptimizeContext and propagated through the whole optimization process.
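To make the propagation cost concrete, here is a minimal sketch. The two method names are taken from the text above, but the OptimizeContext interface, the Duration return type, and both step methods are simplified, hypothetical stand-ins and not Flink's actual signatures.

```java
import java.time.Duration;

// Hypothetical stand-ins; these are not Flink's real classes or signatures.
final class ContextPropagationSketch {

    /** Query-wide flags that every optimization step receives. */
    interface OptimizeContext {
        boolean isUpdateBeforeRequired();

        Duration getMiniBatchInterval();
    }

    /** A step that never reads the flags still has to accept and forward them. */
    static String rewriteStep(String plan, OptimizeContext ctx) {
        return inferenceStep(plan.trim(), ctx);
    }

    /** The only step that actually reads the flags. */
    static String inferenceStep(String plan, OptimizeContext ctx) {
        return plan
                + " updateBefore=" + ctx.isUpdateBeforeRequired()
                + " miniBatch=" + ctx.getMiniBatchInterval().toMillis() + "ms";
    }

    /** Builds a context with fixed values (illustrative helper). */
    static OptimizeContext context(boolean updateBefore, Duration interval) {
        return new OptimizeContext() {
            @Override
            public boolean isUpdateBeforeRequired() {
                return updateBefore;
            }

            @Override
            public Duration getMiniBatchInterval() {
                return interval;
            }
        };
    }

    public static void main(String[] args) {
        // Prints: Scan->Sink updateBefore=true miniBatch=5000ms
        System.out.println(
                rewriteStep(" Scan->Sink ", context(true, Duration.ofSeconds(5))));
    }
}
```

Every intermediate step carries the context in its signature even when it only passes it along, which is the kind of coupling described above.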
...