...

Hence we propose

  • A new explain detail, PLAN_ADVICE, added to the existing ExplainDetail. EXPLAIN PLAN_ADVICE <query> gives users instant, actionable SQL advice for each query, which the current explain mechanism lacks.
  • A new enum, ExplainFormat, to categorize the output format of the explain result. The current EXPLAIN result is a tree-style plain-text format that is only human-readable. Categorizing the format makes it possible to add a more structured format (such as JSON) in the future for visualization or other analysis.

...

  • Users can get instant advice when explaining a query with the proposed syntax EXPLAIN PLAN_ADVICE. Users can further develop custom analyzers based on the proposed API.
  • This FLIP lays the groundwork for further improvements. For instance, the community might implement more analyzers in the future, or visualize the physical plan together with its advice in the Flink Plan Visualizer.

Public Interfaces

We propose to add/change the following interfaces/classes.

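Change List        Module                  Annotation
Explainable        flink-table-api-java    PublicEvolving
ExplainFormat      flink-table-api-java    PublicEvolving
ExplainDetail      flink-table-api-java    PublicEvolving
TableEnvironment   flink-table-api-java    PublicEvolving
PlanAnalyzer       flink-table-planner     Internal
PlanAdvice         flink-table-planner     Internal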

Table API

Explainable:
/**
 * Represents an artifact that can be explained using a summary string.
 *
 * @see #explain(ExplainDetail...)
 */
@PublicEvolving
public interface Explainable<SELF extends Explainable<SELF>> {

    /**
     * Returns the AST of this object and the execution plan to compute the result of the given
     * statement.
     *
     * @param extraDetails The extra explain details which the result of this method should include,
     *     e.g. estimated cost, changelog mode for streaming
     * @return AST and the execution plan.
     */
    default String explain(ExplainDetail... extraDetails) {
        return explain(ExplainFormat.TEXT, extraDetails);
    }

    /**
     * Returns the AST of this object and the execution plan to compute the result of the given
     * statement.
     *
     * @param format The output format of explain plan
     * @param extraDetails The extra explain details which the result of this method should include,
     *     e.g. estimated cost, changelog mode for streaming
     * @return AST and the execution plan.
     */
    String explain(ExplainFormat format, ExplainDetail... extraDetails);

    /** Like {@link #explain(ExplainDetail...)}, but piping the result to {@link System#out}. */
    default SELF printExplain(ExplainDetail... extraDetails) {
        return printExplain(ExplainFormat.TEXT, extraDetails);
    }

    /**
     * Like {@link #explain(ExplainFormat, ExplainDetail...)}, but piping the result to {@link
     * System#out}.
     */
    @SuppressWarnings("unchecked")
    default SELF printExplain(ExplainFormat format, ExplainDetail... extraDetails) {
        System.out.println(explain(format, extraDetails));
        return (SELF) this;
    }
}

...

Table Planner

Introduce an interface, PlanAnalyzer, to analyze the optimized rel nodes generated by the Planner. It contains an inner interface, AnalyzedResult, which wraps a PlanAdvice together with the list of affected rel node IDs.

PlanAnalyzer:
/**
 * Plan analyzer analyzes the optimized physical plan and gives feedback on potential risk warnings
 * and optimization advice.
 */
@Internal
public interface PlanAnalyzer {
    /** Analyze the optimized {@link RelNode} and return {@link AnalyzedResult}. */
    Optional<AnalyzedResult> analyze(FlinkRelNode rel);

    /** The analyzed {@link PlanAdvice} with a list of applicable {@link RelNode} ids. */
    interface AnalyzedResult {

        PlanAdvice getAdvice();

        List<Integer> getTargetIds();
    }
}
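The contract can be illustrated without Calcite. The sketch below is a hypothetical, self-contained model: `Node` stands in for `FlinkRelNode`, the advice is a plain `String` instead of a `PlanAdvice`, and the set of operator names treated as stateful is an assumption chosen only for illustration. It shows an analyzer walking the whole tree and returning one `AnalyzedResult` that lists every matching node id, or `Optional.empty()` when nothing matches.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Hypothetical, self-contained model of the PlanAnalyzer contract over a toy plan tree. */
final class PlanAnalyzerSketch {

    /** Stand-in for a physical rel node: an id, an operator name and its inputs. */
    static final class Node {
        final int id;
        final String type;
        final List<Node> inputs;

        Node(int id, String type, Node... inputs) {
            this.id = id;
            this.type = type;
            this.inputs = Arrays.asList(inputs);
        }
    }

    /** Mirrors PlanAnalyzer.AnalyzedResult: one piece of advice plus the ids it applies to. */
    interface AnalyzedResult {
        String getAdvice();

        List<Integer> getTargetIds();
    }

    /** Mirrors PlanAnalyzer, with the toy Node in place of FlinkRelNode. */
    interface PlanAnalyzer {
        Optional<AnalyzedResult> analyze(Node root);
    }

    /** Flags every node whose operator name is in an assumed set of stateful operators. */
    static final class StatefulNodeAnalyzer implements PlanAnalyzer {
        private static final Set<String> STATEFUL = Set.of("GroupAggregate", "Join", "Rank");

        @Override
        public Optional<AnalyzedResult> analyze(Node root) {
            List<Integer> ids = new ArrayList<>();
            collect(root, ids);
            if (ids.isEmpty()) {
                return Optional.empty();
            }
            return Optional.of(
                    new AnalyzedResult() {
                        @Override
                        public String getAdvice() {
                            return "This stateful node may be affected by state expiration.";
                        }

                        @Override
                        public List<Integer> getTargetIds() {
                            return ids;
                        }
                    });
        }

        // Pre-order walk: a parent is checked before its inputs.
        private static void collect(Node node, List<Integer> ids) {
            if (STATEFUL.contains(node.type)) {
                ids.add(node.id);
            }
            for (Node input : node.inputs) {
                collect(input, ids);
            }
        }
    }
}
```

The pre-order walk is only one choice; the proposal does not prescribe the order of the returned ids.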


Introduce a POJO class, PlanAdvice, with the enums Kind and Scope, to encapsulate the advice given by PlanAnalyzer.

PlanAdvice:
package org.apache.flink.table.planner.analyze;

import org.apache.flink.annotation.Internal;

/** Plain POJO for advice provided by {@link PlanAnalyzer}. */
@Internal
public final class PlanAdvice {

    private final Kind kind;

    private final Scope scope;

    private String content;

    public PlanAdvice(Kind kind, Scope scope, String content) {
        this.kind = kind;
        this.scope = scope;
        this.content = content;
    }

    public PlanAdvice(Kind kind, Scope scope) {
        this(kind, scope, "");
    }

    public PlanAdvice withContent(String content) {
        this.content = content;
        return this;
    }

    public String getContent() {
        return content;
    }

    public Scope getScope() {
        return scope;
    }

    public Kind getKind() {
        return kind;
    }

    /** Categorize the semantics of a {@link PlanAdvice}. */
    @Internal
    public enum Kind {
        /** Indicate the potential risk. */
        WARNING,
        /** Indicate the potential optimization. */
        ADVICE
    }

    /** Categorize the scope of a {@link PlanAdvice}. */
    @Internal
    public enum Scope {
        /** Indicate the advice is not specific to a {@link org.apache.calcite.rel.RelNode}. */
        QUERY_LEVEL,
        /** Indicate the advice is specific to a {@link org.apache.calcite.rel.RelNode}. */
        NODE_LEVEL
    }
}


Proposed Changes

Overall Design

To leverage the planner's power to give more understandable, actionable advice closer to the user's perspective, we propose a new explain mode, PLAN_ADVICE, which analyzes the optimized physical plan and attaches available tuning advice or data correctness warnings to the output. We also propose to categorize the output format by introducing the enum ExplainFormat, with ExplainFormat#TEXT, which corresponds to the current output format, as the default.

The syntax exposed to users will be

User SQL Syntax:
EXPLAIN [([CHANGELOG_MODE | ESTIMATED_COST | PLAN_ADVICE | JSON_EXECUTION_PLAN]) | PLAN FOR] <query_statement_or_insert_statement_or_statement_set>


When users execute an EXPLAIN statement, the SQL is first parsed into an ExplainOperation and passed to TableEnvironment#explainInternal with the specified details and format.

Then Planner#explain is invoked to generate the optimized physical plan, and each PlanAnalyzer traverses the optimized physical plan (i.e., the rel nodes) to perform its analysis.

If a physical rel matches an analyzer's pattern, a piece of PlanAdvice is tagged to the physical plan. Different analyzers may each provide advice for the same query, so a query can receive a sequence of advice.
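The step in which several analyzers inspect the same plan can be sketched as a simple loop. This is an illustrative assumption rather than the planner's actual code: the plan is reduced to a list of operator names, and `AnalysisDriverSketch` and its `Analyzer` type are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/**
 * Hypothetical driver: every registered analyzer inspects the same optimized plan, and each
 * non-empty result is kept in registration order. The plan is reduced to operator names here.
 */
final class AnalysisDriverSketch {

    /** Hypothetical analyzer signature: plan in, optional advice text out. */
    interface Analyzer extends Function<List<String>, Optional<String>> {}

    static List<String> analyzeAll(List<String> plan, List<Analyzer> analyzers) {
        List<String> advice = new ArrayList<>();
        for (Analyzer analyzer : analyzers) {
            // An analyzer whose pattern does not match contributes nothing.
            analyzer.apply(plan).ifPresent(advice::add);
        }
        return advice;
    }
}
```

Because results are simply collected, two analyzers can both report on the same query, which yields the sequence of advice described above.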

SCOPE and KIND are two orthogonal enums to categorize a PlanAdvice.

KIND       NOTE
WARNING    It reveals potential data correctness risks, such as state expiration and NDU (non-deterministic update) issues.
ADVICE     It suggests potential SQL optimizer tuning configurations, such as enabling mini-batch so that the aggregation can be optimized into a two-phase aggregation.


SCOPE         NOTE
QUERY_LEVEL   It provides advice from a global view, targeting the entire query.
NODE_LEVEL    It provides advice for a specific rel node, such as a table scan or a rank. It will be appended to the target rel node.

For Scope NODE_LEVEL, the result is similar to CHANGELOG_MODE and ESTIMATED_COST: the information is attached to the target rel node. For Scope QUERY_LEVEL, the advice is appended after the optimized physical plan.
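The rendering rules above, which Conditions 1 to 4 illustrate, can be approximated by a small formatter. Everything in it is hypothetical: `AdviceRendererSketch`, the pre-rendered plan lines keyed by node id, and the choice to join several advice ids on one node as `advice=[1, 2]` are assumptions. It numbers advice in result order, tags NODE_LEVEL targets with `advice=[n]`, lists every advice as `advice[n]: ...` after the plan, and prints `advice: No available advice.` when there is none.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical renderer for the "Optimized Physical Plan With Advice" section. */
final class AdviceRendererSketch {

    enum Scope {
        QUERY_LEVEL,
        NODE_LEVEL
    }

    /** One analyzer result: the advice text, its scope and, for NODE_LEVEL, the target node ids. */
    static final class Result {
        final String content;
        final Scope scope;
        final List<Integer> targetIds;

        Result(String content, Scope scope, List<Integer> targetIds) {
            this.content = content;
            this.scope = scope;
            this.targetIds = targetIds;
        }
    }

    /** planLines maps a node id to its already-indented text, in plan order. */
    static String render(Map<Integer, String> planLines, List<Result> results) {
        // Advice numbers start at 1 and follow the order of the results.
        Map<Integer, List<Integer>> adviceByNode = new LinkedHashMap<>();
        for (int i = 0; i < results.size(); i++) {
            Result result = results.get(i);
            if (result.scope == Scope.NODE_LEVEL) {
                for (int id : result.targetIds) {
                    adviceByNode.computeIfAbsent(id, k -> new ArrayList<>()).add(i + 1);
                }
            }
        }
        StringBuilder out = new StringBuilder();
        for (Map.Entry<Integer, String> node : planLines.entrySet()) {
            String line = node.getValue();
            List<Integer> ids = adviceByNode.get(node.getKey());
            int paren = line.indexOf('(');
            if (ids != null && paren >= 0) {
                // Insert the advice attribute as the node's first attribute.
                StringJoiner attr = new StringJoiner(", ", "advice=[", "], ");
                for (int id : ids) {
                    attr.add(String.valueOf(id));
                }
                line = line.substring(0, paren + 1) + attr + line.substring(paren + 1);
            }
            out.append(line).append('\n');
        }
        out.append('\n');
        if (results.isEmpty()) {
            out.append("advice: No available advice.\n");
        }
        for (int i = 0; i < results.size(); i++) {
            out.append("advice[").append(i + 1).append("]: ").append(results.get(i).content).append('\n');
        }
        return out.toString();
    }
}
```

With a single NODE_LEVEL result that targets two node ids, both nodes receive `advice=[1]`, which is the shape of the Case-1 output.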


The current explain result is illustrated in the SQL Explain Result. When the ExplainDetails contain PLAN_ADVICE, the output takes one of the following forms.


Condition 1. The optimized physical plan matches the pattern of the PlanAnalyzers, and the analyzers generate advice with Scope#NODE_LEVEL. The target RelNode's attributes will then contain a new entry named "advice" with the advice ID, and the advice content will be appended after the plan.

OutputFormat with NODE_LEVEL advice:
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(count=[$0], word=[$1])
:  +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
:     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+- LogicalProject(count=[$0], word=[$1])
   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])

== Optimized Physical Plan With Advice ==
Union(advice=[1], all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(advice=[2], table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

advice[1]: You might want to pay attention to this node because ...
advice[2]: You might want to pay attention to this node because ...

== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])


Condition 2. The optimized physical plan matches the pattern of the PlanAnalyzers, and the analyzers generate advice with Scope#QUERY_LEVEL. The advice content will be appended after the plan.

OutputFormat with QUERY_LEVEL advice:
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(count=[$0], word=[$1])
:  +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
:     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+- LogicalProject(count=[$0], word=[$1])
   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])

== Optimized Physical Plan With Advice ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

advice[1]: You might want to configure 'xxx' to improve ....
advice[2]: You might want to configure 'yyy' to improve ....

== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])


Condition 3. The optimized physical plan matches the pattern of the PlanAnalyzers, and the analyzers generate advice with both Scope#QUERY_LEVEL and Scope#NODE_LEVEL. NODE_LEVEL advice is tagged to the target rel node's attributes, and QUERY_LEVEL advice follows it in the advice list after the plan.

OutputFormat with QUERY_LEVEL & NODE_LEVEL advice:
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(count=[$0], word=[$1])
:  +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
:     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+- LogicalProject(count=[$0], word=[$1])
   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])

== Optimized Physical Plan With Advice ==
Union(advice=[1], all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(advice=[2], table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

advice[1]: You might want to pay attention to this node because ...
advice[2]: You might want to pay attention to this node because ...
advice[3]: You might want to configure 'yyy' to improve ....

== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])


Condition 4. The optimized physical plan does not match the pattern of any PlanAnalyzer, so no advice is provided.

OutputFormat with no advice:
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(count=[$0], word=[$1])
:  +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
:     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+- LogicalProject(count=[$0], word=[$1])
   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])

== Optimized Physical Plan With Advice ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

advice: No available advice.

== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])


Use Case

Case-1

SCOPE = NODE_LEVEL and KIND = WARNING

Suppose we implement an analyzer that collects all the rel nodes sensitive to state TTL. Then, for the following query, the result will attach a warning to the join and aggregate nodes.

Case-1 Query Example:
set 'table.exec.state.ttl' = '36000';  

create table `order` (
  order_id bigint not null primary key not enforced,
  gmt_create timestamp(3) not null,
  buyer_id bigint not null,
  category_id bigint not null,
  amount double not null,
  ptime as proctime()
) with (
  'connector' = 'values',
  'bounded' = 'false'
);  

create table `category` (
  category_id bigint not null primary key not enforced,
  category_name string not null
) with (
  'connector' = 'values',
  'bounded' = 'false'
);

explain plan_advice
select
  b.category_name,
  sum(a.amount) as revenue
from
  `order` a left join `category` for system_time as of `a`.`ptime` as `b`
on a.category_id = b.category_id
group by b.category_name;


Case-1 Output Format:
== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], revenue=[SUM($1)])
+- LogicalProject(category_name=[$7], amount=[$4])
   +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{3, 5}])
      :- LogicalProject(order_id=[$0], gmt_create=[$1], buyer_id=[$2], category_id=[$3], amount=[$4], ptime=[PROCTIME()])
      :  +- LogicalTableScan(table=[[default_catalog, default_database, order]])
      +- LogicalFilter(condition=[=($cor0.category_id, $0)])
         +- LogicalSnapshot(period=[$cor0.ptime])
            +- LogicalTableScan(table=[[default_catalog, default_database, category]])

== Optimized Physical Plan With Advice ==
GroupAggregate(advice=[1], groupBy=[category_name], select=[category_name, SUM(amount) AS revenue])
+- Exchange(distribution=[hash[category_name]])
   +- Calc(select=[category_name, amount])
      +- LookupJoin(advice=[1], table=[default_catalog.default_database.category], joinType=[LeftOuterJoin], lookup=[category_id=category_id], select=[category_id, amount, category_id0, category_name])
         +- TableSourceScan(table=[[default_catalog, default_database, order, project=[category_id, amount], metadata=[]]], fields=[category_id, amount])

advice[1]: You might want to pay attention to this node because state expiration configuration 'table.exec.state.ttl' is enabled.

== Optimized Execution Plan ==
GroupAggregate(groupBy=[category_name], select=[category_name, SUM(amount) AS revenue])
+- Exchange(distribution=[hash[category_name]])
   +- Calc(select=[category_name, amount])
      +- LookupJoin(table=[default_catalog.default_database.category], joinType=[LeftOuterJoin], lookup=[category_id=category_id], select=[category_id, amount, category_id0, category_name])
         +- TableSourceScan(table=[[default_catalog, default_database, order, project=[category_id, amount], metadata=[]]], fields=[category_id, amount])


Case-2

SCOPE = QUERY_LEVEL and KIND = WARNING

Suppose we implement an analyzer to detect NDU issues. Then, for the following query, the result will attach a piece of query-level advice after the optimized physical plan.

Case-2 Query Example:
create temporary table cdc_with_meta (
 a int,
 b bigint,
 c string,
 d boolean,
 metadata_1 int metadata,
 metadata_2 string metadata,
 metadata_3 bigint metadata,
 primary key (a) not enforced
) with (
 'connector' = 'values',
 'changelog-mode' = 'I,UA,UB,D',
 'readable-metadata' = 'metadata_1:INT, metadata_2:STRING, metadata_3:BIGINT'
);

create temporary table sink_without_pk (
 a int,
 b bigint,
 c string
) with (
 'connector' = 'values',
 'sink-insert-only' = 'false'
);

explain plan_advice 
insert into sink_without_pk
select a, metadata_3, c
from cdc_with_meta;


Case-2 Output Format:
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a, metadata_3, c])
+- LogicalProject(a=[$0], metadata_3=[$6], c=[$2])
   +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])

== Optimized Physical Plan With Advice ==
Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, metadata_3, c])
+- Calc(select=[a, metadata_3, c])
   +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta, project=[a, c], metadata=[metadata_3]]], fields=[a, c, metadata_3])

advice[1]: The metadata column(s): 'metadata_3' in cdc source may cause wrong result or error on downstream operators, please consider removing these columns or use a non-cdc source that only has insert messages.
source node:
TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta, project=[a, c], metadata=[metadata_3]]], fields=[a, c, metadata_3], changelogMode=[I,UB,UA,D], upsertKeys=[[a]])


== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, metadata_3, c])
+- Calc(select=[a, metadata_3, c])
   +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta, project=[a, c], metadata=[metadata_3]]], fields=[a, c, metadata_3])
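A heavily simplified, hypothetical version of the Case-2 rule is sketched below. It only captures the situation in this example: a source whose changelog mode is not insert-only exposes metadata columns to the query. `CdcMetadataCheckSketch` and its parameters are assumptions, and a real NDU analysis has to consider much more of the plan. The warning text is copied from the Case-2 output.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Hypothetical, simplified check: a source that emits update/delete messages and exposes
 * metadata columns to the query yields one QUERY_LEVEL warning.
 */
final class CdcMetadataCheckSketch {

    static Optional<String> check(Set<String> changelogMode, List<String> projectedMetadata) {
        // An insert-only source, or a query that reads no metadata, raises nothing.
        boolean insertOnly = changelogMode.equals(Set.of("I"));
        if (insertOnly || projectedMetadata.isEmpty()) {
            return Optional.empty();
        }
        String columns =
                projectedMetadata.stream().map(c -> "'" + c + "'").collect(Collectors.joining(", "));
        return Optional.of(
                "The metadata column(s): "
                        + columns
                        + " in cdc source may cause wrong result or error on downstream operators,"
                        + " please consider removing these columns or use a non-cdc source that"
                        + " only has insert messages.");
    }
}
```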


This gives users an interactive way to tune their streaming queries until they are both "correct" and "fast".


PlanAnalyzer

PlanAnalyzer is an internal interface that performs the analysis and provides advice. Introducing a new PlanAnalyzer is similar to adding a new RelOptRule to a RuleSet.

Here is an example implementation of PlanAnalyzer to detect all state TTL-sensitive rel nodes and inform users of the risk.

StateExpireRiskAnalyzer:
package org.apache.flink.table.planner.analyze;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.plan.nodes.FlinkRelNode;
import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel;
import org.apache.flink.table.planner.utils.ShortcutUtils;

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttleImpl;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** An implementation of {@link PlanAnalyzer} to collect state TTL sensitive rels. */
@Experimental
public class StateExpireRiskAnalyzer implements PlanAnalyzer {

    public static final StateExpireRiskAnalyzer INSTANCE = new StateExpireRiskAnalyzer();

    private static final PlanAdvice STATE_EXPIRE =
            new PlanAdvice(
                    PlanAdvice.Kind.WARNING,
                    PlanAdvice.Scope.NODE_LEVEL,
                    String.format(
                            "You might want to pay attention to this node because state expiration configuration '%s' is enabled.",
                            ExecutionConfigOptions.IDLE_STATE_RETENTION.key()));

    /** Canonical class names of rels whose results do not depend on state TTL. */
    private static final Set<String> STATE_TTL_INSENSITIVE_REL = new HashSet<>();

    static {
        // excludes the state TTL insensitive rel
    }

    private StateExpireRiskAnalyzer() {}

    @Override
    public Optional<AnalyzedResult> analyze(FlinkRelNode rel) {
        List<Integer> targetRelIds = new ArrayList<>();
        // State TTL counts as enabled when 'table.exec.state.ttl' is greater than zero.
        boolean enableStateTTL =
                ShortcutUtils.unwrapTableConfig(rel)
                                .get(ExecutionConfigOptions.IDLE_STATE_RETENTION)
                                .toMillis()
                        > 0;
        if (rel instanceof FlinkPhysicalRel && enableStateTTL) {
            // Visit the whole physical tree and collect every rel not known to be TTL-insensitive.
            rel.accept(
                    new RelShuttleImpl() {
                        @Override
                        public RelNode visit(RelNode other) {
                            if (!STATE_TTL_INSENSITIVE_REL.contains(
                                    other.getClass().getCanonicalName())) {
                                targetRelIds.add(other.getId());
                            }
                            return super.visit(other);
                        }
                    });
            if (!targetRelIds.isEmpty()) {
                return Optional.of(
                        new AnalyzedResult() {
                            @Override
                            public PlanAdvice getAdvice() {
                                return STATE_EXPIRE;
                            }

                            @Override
                            public List<Integer> getTargetIds() {
                                return targetRelIds;
                            }
                        });
            }
        }
        return Optional.empty();
    }
}

ExplainFormat

The problem with the current output format is that it mixes plain text (AST, Optimized Physical Plan and Optimized Execution Plan) with JSON (Physical Execution Plan, via EXPLAIN JSON_EXECUTION_PLAN), so it is neither structured nor categorized.

By introducing ExplainFormat, we can better categorize the explain output. ExplainFormat#TEXT, which corresponds to the current output format, will be the default format to preserve backward compatibility.

In the future, we might introduce more formats along with the syntax EXPLAIN [ExplainDetails...] WITH [TEXT | JSON] FORMAT <query>. However, this is beyond the scope of this FLIP.
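The definition of ExplainFormat is not reproduced in this section. As a minimal sketch, assuming it needs only a TEXT constant plus a placeholder JSON constant for the structured format discussed above, the backward-compatible delegation could look like the following. The holder class `ExplainFormatSketch` and the trimmed `Explainable` (without the SELF type parameter and printExplain) are simplifications for illustration.

```java
/** Minimal sketch of ExplainFormat and the backward-compatible Explainable overloads. */
final class ExplainFormatSketch {

    /** Assumed constants: TEXT is today's tree-style output; JSON is a placeholder. */
    enum ExplainFormat {
        TEXT,
        JSON
    }

    enum ExplainDetail {
        ESTIMATED_COST,
        CHANGELOG_MODE,
        PLAN_ADVICE,
        JSON_EXECUTION_PLAN
    }

    interface Explainable {
        /** The only method an implementation has to provide. */
        String explain(ExplainFormat format, ExplainDetail... extraDetails);

        /** Existing callers keep this signature and implicitly get TEXT output. */
        default String explain(ExplainDetail... extraDetails) {
            return explain(ExplainFormat.TEXT, extraDetails);
        }
    }
}
```

Keeping the format-less overload as a default method means existing callers compile unchanged and keep receiving the current text output.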

Open Questions

Q: What is the relationship between PLAN_ADVICE, CHANGELOG_MODE, and ESTIMATED_COST? Can I explain all of them together? What will the result look like?

A: PLAN_ADVICE is an explain detail on the same level as CHANGELOG_MODE and ESTIMATED_COST. Users can explain multiple details at the same time.
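One way to picture why the details compose is that each enabled ExplainDetail contributes its own attribute to a node line, independently of the others. This is a hypothetical sketch, not the planner's formatter: `CombinedDetailsSketch`, its parameters and the attribute order (advice first, then the regular attributes, then changelogMode, then cost) are assumptions.

```java
import java.util.Set;
import java.util.StringJoiner;

/** Hypothetical sketch: every enabled detail adds its own attribute to a node line. */
final class CombinedDetailsSketch {

    enum ExplainDetail {
        ESTIMATED_COST,
        CHANGELOG_MODE,
        PLAN_ADVICE
    }

    /** adviceId <= 0 means the node has no advice; cost is an already formatted string. */
    static String renderNode(
            String name,
            String attrs,
            Set<ExplainDetail> details,
            String changelogMode,
            String cost,
            int adviceId) {
        StringJoiner joiner = new StringJoiner(", ", name + "(", ")");
        if (details.contains(ExplainDetail.PLAN_ADVICE) && adviceId > 0) {
            joiner.add("advice=[" + adviceId + "]");
        }
        joiner.add(attrs);
        if (details.contains(ExplainDetail.CHANGELOG_MODE)) {
            joiner.add("changelogMode=[" + changelogMode + "]");
        }
        if (details.contains(ExplainDetail.ESTIMATED_COST)) {
            joiner.add(cost);
        }
        return joiner.toString();
    }
}
```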

...

At first, we wanted to align the syntax with EXPLAIN JSON_EXECUTION_PLAN. However, it is unclear which plan should be analyzed, which leads to the second question: which plan should be analyzed?

Analyze JSON Execution Plan v.s. Optimized Physical Plan

Q: Why not enhance ExplainDetail#JSON_EXECUTION_PLAN instead of targeting the optimized rel?

A: Currently, there are three ExplainDetails: CHANGELOG_MODE, ESTIMATED_COST and JSON_EXECUTION_PLAN.

The relationship between them and the corresponding SQL-processing phases is illustrated below. The Stream Graph is excluded up front because we want to limit this analysis to compile time. The figure also shows that the optimized rel is the closest to the original query.

...