Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: https://lists.apache.org/thread/whblrt641kqd0mvpxf5gjmkc56hvo0kn
Vote thread: To be created
JIRA:
Release: To be created

Motivation

Flink supports query plans with multiple sinks. The planner breaks such a plan into sub-trees that represent the common sub-graphs (RelNodeBlocks) of the query plan. The optimization of these queries differs between the stream and batch optimizers, mainly because the stream optimizer must infer the ChangelogMode of every physical node. There are two main issues with the current optimization process for these queries:


Computationally suboptimal

The optimization process in StreamCommonSubGraphBasedOptimizer is triggered three times (as also noted in the related JIRA issue) in order to correctly infer the changelog mode of each sub-graph (RelNodeBlock):


  1. First, each block is optimized [1].
  2. Second, each block's updateKind and miniBatchInterval are propagated, and each block is optimized again (StreamCommonSubGraphBasedOptimizer::propagateUpdateKindAndMiniBatchInterval).
  3. Third, all intermediate results are cleared [2] and each block is optimized once more [3].
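To make the cost of this flow concrete, the following Scala sketch counts how often the full stream program runs. It is a toy model, not planner code: ToyBlock and FullProgram are hypothetical stand-ins, each "optimization" is reduced to a counter increment, and the propagation and clearing steps are only marked in comments.

```scala
// Hypothetical stand-in for a RelNodeBlock; only its name matters here.
final case class ToyBlock(name: String)

// Counts how many times the full FlinkStreamProgram would run (modeled, not real).
final class FullProgram {
  var runs = 0
  def optimize(block: ToyBlock): ToyBlock = { runs += 1; block }
}

def currentFlow(blocks: Seq[ToyBlock], program: FullProgram): Seq[ToyBlock] = {
  // Step 1: optimize every block with the full program
  blocks.foreach(program.optimize)
  // Step 2: propagate updateKind and miniBatchInterval (not modeled), then optimize again
  blocks.foreach(program.optimize)
  // Step 3: clear intermediate results (not modeled), then optimize a third time
  blocks.map(program.optimize)
}

val program = new FullProgram
val finalBlocks = currentFlow(Seq(ToyBlock("b1"), ToyBlock("b2"), ToyBlock("b3")), program)
```

With three blocks, the full program runs nine times in this model, three times per block, even though only the changelog and mini-batch information changes between passes.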


As the steps above show, the whole optimization process is executed three times. The main reason is that the mini-batch and changelog inference rules are tightly coupled with the rest of the optimization process. For example, in each of the iterations above, FlinkChangelogModeInferenceProgram changes the query plan, and the optimization process is [re]started.


In a small prototype, I ran the following tests (mostly non-complex queries):

  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks1
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks2
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks3
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks4
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks5
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksWithUDTF
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion1
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion2
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion3
  • org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion4


These tests cover end-to-end DAG optimization, including query parsing, validation, optimization, and result checking. In these end-to-end tests, my prototype was 15-20% faster than the existing Flink optimization structure, while also simplifying the codebase.

Code complexity

The optimization process described above also increases code complexity. For example, isUpdateBeforeRequired is included in StreamOptimizeContext and propagated through the whole optimization process.

In addition, low-level changelog inference logic (see the steps above) is embedded in high-level optimization classes (e.g., StreamCommonSubGraphBasedOptimizer).


Public Interfaces

This is not expected to change the programming interface; it only changes Flink's internal query optimization process.

Proposed Changes


The proposed changes aim to decouple query optimization from changelog inference and mini-batch-related rules. Concretely, we propose the following changes:


Decouple Stream Programs

Currently, FlinkStreamProgram contains a sequence of (potentially dependent) programs for optimizing a stream table plan. The idea is to clearly separate query optimization, changelog inference, and mini-batch interval inference so that they are independent of each other. We will create four disjoint subsets of FlinkStreamProgram (these subsets do not necessarily have to be separate classes or files):

  1. ProgramSubsetA: Contains all the programs for changelog inference
  2. ProgramSubsetB: Contains all the programs related to mini-batch interval inference rules
  3. ProgramSubsetC: Contains PHYSICAL_REWRITE rule set
  4. ProgramSubsetD: Contains all remaining programs
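A minimal Scala sketch of the partitioning, under stated assumptions: the program labels are hypothetical (they are not taken from FlinkStreamProgram), and each program is represented only by its name. Since the subsets need not be separate classes, they are modeled here as tags.

```scala
// Which disjoint subset a program belongs to (ProgramSubsetA..D in the text).
sealed trait Subset
case object ChangelogInference extends Subset // ProgramSubsetA
case object MiniBatchInference extends Subset // ProgramSubsetB
case object PhysicalRewrite    extends Subset // ProgramSubsetC
case object QueryOptimization  extends Subset // ProgramSubsetD

// Hypothetical program labels, for illustration only.
val programs: Seq[String] = Seq(
  "subquery_rewrite", "decorrelate", "logical", "physical",
  "changelog_mode_inference", "mini_batch_interval_inference", "physical_rewrite")

// Total function: every program maps to exactly one subset, so the subsets are disjoint.
def classify(program: String): Subset = program match {
  case "changelog_mode_inference"      => ChangelogInference
  case "mini_batch_interval_inference" => MiniBatchInference
  case "physical_rewrite"              => PhysicalRewrite
  case _                               => QueryOptimization // everything else
}

// Programs grouped by subset; relative order inside each group is preserved.
val subsets: Map[Subset, Seq[String]] = programs.groupBy(classify)
```

Because classify is total and returns a single subset per program, disjointness holds by construction, and the catch-all case gives ProgramSubsetD its "all remaining programs" meaning.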


// class StreamCommonSubGraphBasedOptimizer

override protected def doOptimize(roots: Seq[RelNode]): Seq[RelNodeBlock] = {
...

 // build RelNodeBlock plan
 val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, tableConfig)
 
 // Operate on ProgramSubsetD. Executed only once
 val optimizedSB = optimizeSinkBlocks(sinkBlocks)
 
 // Operate on ProgramSubsetA. Only ProgramSubsetA is iterated 3 times, not the whole FlinkStreamProgram
 // For this to work, the changelog inference logic must be modified to work purely on the physical query plan
 val withChangelogInferenceSB = addChangelogInference(optimizedSB)
 
 // Operate on ProgramSubsetB. Executed only once
 val withMinibatchAndChangelogSB = addMiniBatchIntervalInference(withChangelogInferenceSB)

 // Operate on ProgramSubsetC. Executed only once
 val withQueryRewriteSB = finalizeWithQueryRewrite(withMinibatchAndChangelogSB)

 withQueryRewriteSB

}
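The pseudocode above can be turned into a runnable toy model to check the pass counts it implies. This is a sketch under assumptions, not a planner implementation: PlanBlock and phase are hypothetical, each program subset is reduced to a call counter, and the fixed three iterations of ProgramSubsetA follow the comment in doOptimize. The phase methods reuse the names from the pseudocode.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a RelNodeBlock that records which subsets ran on it.
final case class PlanBlock(name: String, applied: Vector[String] = Vector.empty)

// How many times each program subset was invoked (toy bookkeeping).
val calls: mutable.Map[String, Int] = mutable.Map.empty[String, Int].withDefaultValue(0)

// A phase runs one program subset on a block, once.
def phase(subset: String): PlanBlock => PlanBlock = { block =>
  calls(subset) += 1
  block.copy(applied = block.applied :+ subset)
}

def optimizeSinkBlocks(blocks: Seq[PlanBlock]): Seq[PlanBlock] =
  blocks.map(phase("D")) // ProgramSubsetD, executed once

def addChangelogInference(blocks: Seq[PlanBlock]): Seq[PlanBlock] =
  (1 to 3).foldLeft(blocks)((acc, _) => acc.map(phase("A"))) // only ProgramSubsetA iterates

def addMiniBatchIntervalInference(blocks: Seq[PlanBlock]): Seq[PlanBlock] =
  blocks.map(phase("B")) // ProgramSubsetB, executed once

def finalizeWithQueryRewrite(blocks: Seq[PlanBlock]): Seq[PlanBlock] =
  blocks.map(phase("C")) // ProgramSubsetC, executed once

def doOptimize(sinkBlocks: Seq[PlanBlock]): Seq[PlanBlock] = {
  val optimizedSB = optimizeSinkBlocks(sinkBlocks)
  val withChangelogInferenceSB = addChangelogInference(optimizedSB)
  val withMinibatchAndChangelogSB = addMiniBatchIntervalInference(withChangelogInferenceSB)
  finalizeWithQueryRewrite(withMinibatchAndChangelogSB)
}

val result = doOptimize(Seq(PlanBlock("sink1"), PlanBlock("sink2")))
```

With two sink blocks, ProgramSubsetD, which contains all the remaining programs, runs once per block, and only the changelog inference subset is repeated.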

As a result, we can simplify StreamOptimizeContext, since isUpdateBeforeRequired will no longer need to be propagated.
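Dropping isUpdateBeforeRequired from the context relies on changelog inference working purely on the physical plan. The Scala sketch below is a heavily simplified illustration of that idea, not FlinkChangelogModeInferenceProgram itself: the node types, the rule that a group aggregate needs retractions from an updating input, and the row-kind sets (+I, -U, +U, -D) are assumptions. One check decides which nodes emit updates, and a top-down pass decides whether they must also emit UPDATE_BEFORE (-U); the requirement starts at the sink node in the plan instead of an optimizer context.

```scala
// Simplified physical plan nodes (hypothetical, not Flink's physical RelNodes).
sealed trait Node
final case class Source(name: String) extends Node
final case class Calc(input: Node) extends Node
final case class GroupAgg(input: Node) extends Node
final case class Sink(input: Node, needsUpdateBefore: Boolean) extends Node

// A node annotated with the row kinds it emits: +I, -U, +U, -D.
final case class Annotated(node: String, mode: Set[String], children: Seq[Annotated])

// Derived from the inputs: does this node emit updates at all?
def producesUpdates(n: Node): Boolean = n match {
  case Source(_)   => false
  case GroupAgg(_) => true
  case Calc(in)    => producesUpdates(in)
  case Sink(in, _) => producesUpdates(in)
}

// Top-down: decide whether UPDATE_BEFORE (-U) must be emitted.
def infer(n: Node, requireUpdateBefore: Boolean): Annotated = {
  val mode =
    if (!producesUpdates(n)) Set("+I")
    else if (requireUpdateBefore) Set("+I", "-U", "+U", "-D")
    else Set("+I", "+U", "-D")
  n match {
    case Source(name) => Annotated(s"Source($name)", mode, Seq.empty)
    case Calc(in)     => Annotated("Calc", mode, Seq(infer(in, requireUpdateBefore)))
    // Assumption: the aggregate needs retractions from an updating input.
    case GroupAgg(in) => Annotated("GroupAgg", mode, Seq(infer(in, requireUpdateBefore = true)))
    case Sink(in, _)  => Annotated("Sink", mode, Seq(infer(in, requireUpdateBefore)))
  }
}

// Entry point: the requirement comes from the sink node, not from a context flag.
def inferChangelog(sink: Sink): Annotated = infer(sink, sink.needsUpdateBefore)

val plan = Sink(Calc(GroupAgg(Source("orders"))), needsUpdateBefore = false)
val upsertPlan = inferChangelog(plan)
val retractPlan = inferChangelog(plan.copy(needsUpdateBefore = true))
```

In this model, an upsert-style sink leaves GroupAgg without -U, and changing only the flag on the Sink node is enough to make the aggregate emit -U; no extra state has to be threaded through the optimizer.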


Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?

As a result of this change, query plan optimization time is expected to decrease and the optimization logic will become simpler.

  • If we are changing behavior how will we phase out the older behavior?

N/A

  • If we need special migration tools, describe them here.

N/A

  • When will we remove the existing behavior?

N/A

Test Plan

The goal is for all existing tests to pass with the proposed architecture.

In addition, we will add new tests for each separate optimization phase in StreamCommonSubGraphBasedOptimizer.


Rejected Alternatives

No rejected alternatives yet. 


[1] https://github.com/apache/flink/blob/534df6490e0fc179173efabd882ff10a749d508f/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala#L90

[2] https://github.com/apache/flink/blob/534df6490e0fc179173efabd882ff10a749d508f/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala#L101

[3] https://github.com/apache/flink/blob/534df6490e0fc179173efabd882ff10a749d508f/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala#L103C2-L103C2