...
Motivation
Flink supports query plans with multiple sinks. The idea is to break the query plan into sub-trees that represent common sub-graphs (RelNodeBlocks) in query plans. The optimization process for these queries differs between the Stream and Batch optimizers, mainly because ChangelogMode has to be inferred for every physical node in Stream workloads. There are two main issues with the current optimization process for these queries:
...
As the steps above show, the whole optimization process is executed three times. The main reason is that the mini-batch and changelog inference-related rules are tightly coupled in the optimization process. For example, at each optimization iteration (stated above), the FlinkChangelogModeInferenceProgram makes some changes to the query plan, and the optimization process [re]starts.
In my small prototype, I ran the following tests (which are mostly non-complex queries):
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks1
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks2
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks3
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks4
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks5
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksWithUDTF
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion1
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion2
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion3
- org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion4
These tests exercise end-to-end (e2e) DAG optimization, including query parsing, validation, optimization, and checking the results. In these e2e optimization tests, my prototype was 15-20% faster than the existing Flink optimization structure, and the only "cost" was a simpler codebase.
Code complexity
The optimization process described above also increases code complexity. For example, isUpdateBeforeRequired and getMiniBatchInterval are included in StreamOptimizeContext and propagated through the whole optimization process.
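The coupling can be illustrated with a toy sketch. Everything below is hypothetical: `ToyContext` and the three stage functions are stand-ins invented for illustration, not Flink's actual classes. Only the two field names echo the accessors mentioned above. The point is that every stage has to accept the context, even when it reads only one of the values or none.

```scala
object ContextCouplingSketch {
  // Hypothetical stand-in for the optimize context (not the real StreamOptimizeContext).
  final case class ToyContext(isUpdateBeforeRequired: Boolean, miniBatchIntervalMs: Long)

  // This toy stage needs neither value but still has to accept the context.
  def logicalRewrite(plan: String, ctx: ToyContext): String = s"rewrite($plan)"

  // Reads only isUpdateBeforeRequired.
  def changelogInference(plan: String, ctx: ToyContext): String =
    if (ctx.isUpdateBeforeRequired) s"withUpdateBefore($plan)" else plan

  // Reads only the mini-batch interval.
  def miniBatchInference(plan: String, ctx: ToyContext): String =
    if (ctx.miniBatchIntervalMs > 0) s"miniBatch(${ctx.miniBatchIntervalMs}, $plan)" else plan

  // The context is threaded through every stage of the toy pipeline.
  def optimize(plan: String, ctx: ToyContext): String =
    miniBatchInference(changelogInference(logicalRewrite(plan, ctx), ctx), ctx)
}
```

In this toy model, removing the two fields from the context is only possible once the changelog and mini-batch stages can derive what they need from the plan itself.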
...
Currently, FlinkStreamProgram contains a sequence of (potentially dependent) programs that optimize a stream table plan. The idea is to make a clear separation between query optimization, changelog inference, and mini-batch interval inference, so that they are independent of each other. We will create 4 disjoint subsets of FlinkStreamProgram (these disjoint subsets do not necessarily mean different classes/files):
...
```scala
// class StreamCommonSubGraphBasedOptimizer
override protected def doOptimize(roots: Seq[RelNode]): Seq[RelNodeBlock] = {
  ...
  // build RelNodeBlock plan
  val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, tableConfig)

  // Operate on ProgramSubsetD. Executed only once
  val optimizedSB = optimizeSinkBlocks(sinkBlocks)

  // Operate on ProgramSubsetA. Iterated 3 times only on the ProgramSubsetA [and not on the whole FlinkStreamProgram]
  // For this to work, we need to modify changelog inference logic to work purely on physical query plan
  val withChangelogInferenceSB = addChangelogInference(optimizedSB)

  // Operate on ProgramSubsetB. Executed only once
  val withMinibatchAndChangelogSB = addMiniBatchIntervalInference(withChangelogInferenceSB)

  // Operate on ProgramSubsetC. Executed only once
  val withQueryRewriteSB = finalizeWithQueryRewrite(withMinibatchAndChangelogSB)

  withQueryRewriteSB
}
```
As a result, we can simplify StreamOptimizeContext, since isUpdateBeforeRequired and getMiniBatchInterval will no longer need to be propagated.
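The difference in the amount of work can be sketched with a toy model. This is a simplification under stated assumptions, not the prototype. A plan is reduced to a log of the subsets applied to it. The subset letters follow the comments in doOptimize. `Plan`, `Subset`, `coupled`, and `staged` are hypothetical names. The coupled flow is assumed to re-run all four subsets on every pass.

```scala
object StagedOptimizationSketch {
  // Toy plan: records which program subsets were applied, in order.
  final case class Plan(applied: Vector[String])

  // Hypothetical program subset that only appends its name to the log.
  final case class Subset(name: String) {
    def run(plan: Plan): Plan = Plan(plan.applied :+ name)
  }

  val subsetD = Subset("D") // query optimization of sink blocks
  val subsetA = Subset("A") // changelog inference
  val subsetB = Subset("B") // mini-batch interval inference
  val subsetC = Subset("C") // final query rewrite

  // Assumed model of the current flow: each pass re-runs the whole program.
  def coupled(plan: Plan, passes: Int): Plan =
    (1 to passes).foldLeft(plan) { (p, _) =>
      Seq(subsetD, subsetA, subsetB, subsetC).foldLeft(p)((acc, s) => s.run(acc))
    }

  // Proposed flow: only subset A is iterated, the other subsets run once.
  def staged(plan: Plan, passes: Int): Plan = {
    val optimized = subsetD.run(plan)
    val withChangelog = (1 to passes).foldLeft(optimized)((p, _) => subsetA.run(p))
    subsetC.run(subsetB.run(withChangelog))
  }

  def main(args: Array[String]): Unit = {
    val empty = Plan(Vector.empty)
    println(coupled(empty, 3).applied.mkString(" ")) // D A B C D A B C D A B C
    println(staged(empty, 3).applied.mkString(" "))  // D A A A B C
  }
}
```

With three passes, the toy coupled flow performs 12 subset runs and the staged flow performs 6. The real saving depends on how expensive each subset is, which this model does not capture.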
Compatibility, Deprecation, and Migration Plan
...