Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: update the release version

Status

Current state: "Under Discussion" Accepted

Discussion thread: https://lists.apache.org/thread/5xywxv7g43byoh0jbx1b6qo6gx6wjkcz

Vote thread: https://lists.apache.org/thread/bsgqvvs9wx1dkv7p3m9ctockh84rl11j

JIRA:

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-30650
JIRA: TBD

Released: <Flink Version>1.17.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Table of Contents

Motivation

Although SQL has been widely adopted among Flink users in recent years, we noticed that it is still not easy for users to write the "correct" and "fast" stream queries to achieve what they want, and the general familiarity with the concept of the dynamic table and the retraction might not be enough to troubleshoot the specific cases, even for experienced users. We might have posted detailed documentation, such as data correctness issues caused by NDU and performance tuning. Nevertheless, users must match the issue pattern with the documented item to have a fix/workaround. For users, this process seems like a repetitive pull-based guess-and-try. There is a considerable gap between the documented concepts/optimization techniques and the actual, applicable actions. 

...

Hence we propose

  • A new explain mode PLAN_ADVICE to the current ExplainDetails. EXPLAIN PLAN_ADVICE <query> will provide users with instant and actionable SQL advice per query, which is absent in the current explain mechanism.
  • A new enum ExplainFormat to better categorize the output format of the explained result. The current EXPLAIN result presents users with a tree-style plain text format, which is only human-readable. By categorizing classifying the format, it is flexible to extend to a more structured format (like JSON) in the future to perform visualization or other analysis.

...

  • Users can get instant advice when explaining the query with the proposed syntax EXPLAIN PLAN_ADVICE.
  • Users can further develop custom analyzers based on the proposed API.

Public Interfaces

  • This FLIP lays the groundwork for further improvement. For example, the community might implement more analyzers or provide the visualization of the physical plan with advice (by Flink Plan Visualizer) in the future.

Public Interfaces

We We propose to add/change the following interfaces/classes.

Change ListModuleAnnotation
Explainableflink-table-api-javaPublicEvolving
ExplainFormat
ExplainDetail
TableEnvironment
PlanAnalyzerflink-table-plannerInternal
PlanAdvice

Table API

Code Block
languagejava
firstline1
titleExplainable
linenumberstrue
/**
 * Represents an artifact that can be explained using a summary string.
 *
 * @see #explain(ExplainDetail...)
 */
@PublicEvolving
public interface Explainable<SELF extends Explainable<SELF>> {

    /**
     * Returns the AST of this object and the execution plan to compute the result of the given
     * statement.
     *
     * @param extraDetails The extra explain details which the result of this method should include,
     *     e.g. estimated cost, changelog mode for streaming
     * @return AST and the execution plan.
     */
    default String explain(ExplainDetail... extraDetails) {
        return explain(ExplainFormat.TEXT, extraDetails);
    }

    /**
     * Returns the AST of this object and the execution plan to compute the result of the given
     * statement.
     *
     * @param format The output format of explain plan
     * @param extraDetails The extra explain details which the result of this method should include,
     *     e.g. estimated cost, changelog mode for streaming
     * @return AST and the execution plan.
     */
    String explain(ExplainFormat format, ExplainDetail... extraDetails);

    /** Like {@link #explain(ExplainDetail...)}, but piping the result to {@link System#out}. */
    @SuppressWarnings("unchecked")
    default SELF printExplain(ExplainDetail... extraDetails) {
        return printExplain(ExplainFormat.TEXT, extraDetails);
    }

    /** Like {@link #explain(ExplainDetail...)}, but piping the result to {@link System#out}. */
    @SuppressWarnings("unchecked")
    default SELF printExplain(ExplainFormat format, ExplainDetail... extraDetails) {
        System.out.println(explain(format, extraDetails));
        return (SELF) this;
    }
}

...

Code Block
languagejava
firstline1
titleTableEnvironment
linenumberstrue
@PublicEvolving
public interface TableEnvironment {
    /**
     * Returns the AST of the specified statement and the execution plan to compute the result of
     * the given statement.
     *
     * @param statement The statement for which the AST and execution plan will be returned.
     * @param extraDetails The extra explain details which the explain result should include, e.g.
     *     estimated cost, changelog mode for streaming, displaying execution plan in json format
     * @return AST and the execution plan.
     */
    default String explainSql(String statement, ExplainDetail... extraDetails) {
        return explainSql(statement, ExplainFormat.TEXT, extraDetails);
    }

    /**
     * Returns the AST of the specified statement and the execution plan to compute the result of
     * the given statement.
     *
     * @param statement The statement for which the AST and execution plan will be returned.
     * @param format The output format of explain plan.
     * @param extraDetails The extra explain details which the explain result should include, e.g.
     *     estimated cost, changelog mode for streaming, displaying execution plan in json format
     * @return AST and the execution plan.
     */
    String explainSql(String statement, ExplainFormat format, ExplainDetail... extraDetails);
}

Table Planner

Introduce an interface PlanAnalyzer to analyze the optimized rel nodes generated by Planner. It encloses an inner interface, i.e., AnalyzedResult, to wrap a PlanAdvice with a list of affected rel node IDs.

Code Block
languagejava
firstline1
titlePlanAnalyzer
linenumberstrue
/**
 * Plan analyzer analyzes the optimized physical plan and gives feedback on potential risk warnings
 * and optimization advice.
 */
@Internal
public interface PlanAnalyzer {
    /** Analyze the optimized {@link RelNode} and return {@link AnalyzedResult}. */
    Optional<AnalyzedResult> analyze(FlinkRelNode rel);

    /** The analyzed {@link PlanAdvice} with a list of applicable {@link RelNode} ids. */
    interface AnalyzedResult {

        PlanAdvice getAdvice();

        List<Integer> getTargetIds();
    }
}


Introduce a POJO classPlanAdvicewith the enumKindandScopeto encapsulate the advice given byPlanAnalyzer.

Code Block
languagejava
firstline1
titlePlanAdvice
linenumberstrue
package org.apache.flink.table.planner.analyze;

import org.apache.flink.annotation.Experimental;

/** Plain POJO for advice provided by {@link PlanAnalyzer}. */
@Internal
public final class PlanAdvice {

    private final Kind kind;

    private final Scope scope;

    private String content;

    public PlanAdvice(Kind kind, Scope scope, String content) {
        this.kind = kind;
        this.scope = scope;
        this.content = content;
    }

    public PlanAdvice(Kind kind, Scope scope) {
        this(kind, scope, "");
    }

    public PlanAdvice withContent(String content) {
        this.content = content;
        return this;
    }

    // publicgetters
  String getContent() { ...

    /** Categorize the  return content;semantics of a {@link PlanAdvice}. */
    }@Internal

    public Scopeenum getScope()Kind {
        return scope;
    }

/** Indicate the potential risk. */
     public Kind getKind() {WARNING,
        return kind;/** Indicate the potential optimization. */
        ADVICE
    }

    /** Categorize the semanticsscope of a {@link PlanAdvice}. */
    @Internal
    public enum KindScope {
        /**
     Indicate the potential risk. */
 Indicate the advice is not specific  WARNING,to a {@link
        /** Indicate the potential optimization. * org.apache.calcite.rel.RelNode}.
         */
        ADVICEQUERY_LEVEL,
    }

    /**
 Categorize the scope of a {@link PlanAdvice}. */
    @Internal
    public enum Scope {
        /**
         * Indicate a globalthe advice, which is not specific to a {@link
         * org.apache.calcite.rel.RelNode}.
         */
        GLOBAL,NODE_LEVEL
        /**
         * Indicate a local advice, which could be located to a specific {@link
         * org.apache.calcite.rel.RelNode}.
         */
        LOCAL
    }
}

Proposed Changes

Overall Design

}
}


Proposed Changes

Overall Design

To leverage the planner'To leverage the planner's power to give more understandable, actionable advice closer to the user's perspective, we propose a new explain mode PLAN_ADVICE, which analyzes the optimized physical plan and attaches available tuning advice or data correctness warnings to the output. We also propose to categorize the output format by introducing the enum ExplainFormat, and with ExplainFormat#TEXT as default, which corresponds to the current output format. 

The syntax exposed to users will be

Code Block
languagesql
firstline1
titleUser SQL Syntax
linenumberstrue
EXPLAIN [([CHANGELOG_MODE PLAN_ADVICE <query>| ESTIMATED_COST | PLAN_A
DVICE | JSON_EXECUTION_PLAN]) | PLAN FOR] <query_statement_or_insert_statement_or_statement_set>


When users execute EXPLAIN statement, the SQL is first parsed as an ExplainOperation and sent to TableEnvironment#explainInternal with the specified details and format.

Then Planner#explain will be invoked to generate the optimized physical plan.  The The PlanAnalyzer will loop through the optimized physical plan (i.e., relNodes) to perform analysis.

If the physical rel's pattern matches the analyzer, a piece of PlanAdvice will be tagged to the physical plan. There might be a sequence of advice provided by different analyzers targeting the same query. 

SCOPE and KIND are two orthogonal enums to categorize a PlanAdvice.

KINDNOTE
WARNINGIt reveals potential data correctness risks, such as state expiration, and NDU issues
ADVICEIt suggests potential SQL optimizer tuning configuration, such as enabling mini-batch to optimize to two_phase aggregation.


SCOPENOTE
GLOBALQUERY_LEVELIt provides advice from a global view., targeting the entire query. 
LOCALNODE_LEVELIt can link provides advice to the a specific rel node, such as a table scan or a rank. It will be appended to the target rel node.

For SCOPE as LOCAL, the returned result will be similar to CHANGELOG_MODE and ESTIMATED_COSTS, which appends the info to the target rel node. For SCOPE AS GLOBAL,  "Physical Plan Advice" will be appended after "Optimized Physical Plan", followed by the global advice. 

Use Case

Case-1

SCOPE = LOCAL and KIND = WARNING

Suppose we implement an analyzer to collect all the rel nodes sensitive to state TTL. Then for the following query, the result will attach a warning on the join and aggregate node.

Code Block
languagesql
firstline1
titleQuery Example
linenumberstrue
set 'table.exec.state.ttl' = '36000';  

create table `order` (
  order_id bigint not null primary key not enforced,
  gmt_create timestamp(3) not null,
  buyer_id bigint not null,
  category_id bigint not null,
  amount double not null,
  ptime as proctime()
) with (
  'connector' = 'values',
  'bounded' = 'false'
);  

create table `category` (
  category_id bigint not null primary key not enforced,
  category_name string not null
) with (
  'connector' = 'values',
  'bounded' = 'false'
);

explain plan_advice
select
  b.category_name,
  sum(a.amount) as revenue
from
  `order` a left join `category` for system_time as of `a`.`ptime` as `b`
on a.category_id = b.category_id
group by b.category_name;
.


The current explain result is illustrated in the SQL Explain Result. When ExplainDetails contain PLAN_ADVICE, the output format will be like


Condition 1. The optimized physical plan pattern matches the PlanAnalyzers. The analyzers generate some advice with SCOPE#NODE_LEVEL, then the attributes of relNode will contain a new entry named "advice" with id. Then the advice content will be appended after the plan.

Code Block
languagetext
firstline1
titleOutput format with NODE_LEVEL advice
linenumberstrue
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(count=[$0], word=[$1])
:  +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
:     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+- LogicalProject(count=[$0], word=[$1])
   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])

== Optimized Physical Plan With Advice ==
Union(advice=[1], all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(advice=[2], table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

advice[1]: You might want to pay attention to this node because ...
advice[2]: You might want to pay attention to this node because ...

== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])


Condition 2. The optimized physical plan pattern matches the PlanAnalyzers. The analyzers generate some advice with SCOPE#QUERY_LEVEL, and the advice content will be appended after the plan.

Code Block
languagetext
firstline1
titleOutput format with QUERY_LEVEL advice
linenumberstrue
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(count=[$0], word=[$1])
:  +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
:     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+- LogicalProject(count=[$0], word=[$1])
   
Code Block
languagetext
firstline1
titleOutput Format
linenumberstrue
== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], revenue=[SUM($1)])
+- LogicalProject(category_name=[$7], amount=[$4])
   +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{3, 5}])
      :- LogicalProject(order_id=[$0], gmt_create=[$1], buyer_id=[$2], category_id=[$3], amount=[$4], ptime=[PROCTIME()])
      :  +- LogicalTableScan(table=[[default_catalog, default_database, order]])
      +- LogicalFilter(condition=[=($cor0.category_id, $0)])
         +- LogicalSnapshot(period=[$cor0.ptime])
            +- LogicalTableScan(table=[[default_catalog, default_database, categoryMyTable2]])

== Optimized Physical Plan =With Advice ==
GroupAggregateUnion(groupByall=[category_nametrue], selectunion=[category_namecount, SUM(amount) AS revenueword])
:- Calc(select=[count, word], warningwhere=[You might want to pay attention to this node because state expiration configuration 'table.exec.state.ttl' is enabled.])
+- Exchange(distribution=[hash[category_name]])
   +- Calc(select=[category_name, amount])
      +- LookupJoin(table=[default_catalog.default_database.category], joinType=[LeftOuterJoin], lookup=[category_id=category_id], select=[category_id, amount, category_id0, category_name], warning=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

advice[1]: You might want to payconfigure attention'xxx' to this node because state expiration configuration 'table.exec.state.ttl' is enabled.])
         +- TableSourceScan(table=[[default_catalog, default_database, order, project=[category_id, amount], metadata=[]]], fields=[category_id, amount])

== Optimized Execution Plan ==
GroupAggregate(groupBy=[category_name], select=[category_name, SUM(amount) AS revenue])
+- Exchange(distribution=[hash[category_name]])
   +- Calc(select=[category_name, amount])
      +- LookupJoin(table=[default_catalog.default_database.category], joinType=[LeftOuterJoin], lookup=[category_id=category_id], select=[category_id, amount, category_id0, category_name])
         +- TableSourceScan(table=[[default_improve ....
advice[2]: You might want to configure 'yyy' to improve ....

== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])


Condition 3. The optimized physical plan pattern matches the PlanAnalyzers. The analyzers generate some advice with both SCOPE#QUERY_LEVEL and SCOPE#NODE_LEVEL, then NODE_LEVEL advice will be tagged to the target relNode's attributes, followed by QUERY_LEVEL advice.

Code Block
languagetext
firstline1
titleOutput format with QUERY_LEVEL & NODE_LEVEL advice
linenumberstrue
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(count=[$0], word=[$1])
:  +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
:     +- LogicalTableScan(table=[[default_catalog, default_database, order, project=[category_id, amountMyTable1]])
+- LogicalProject(count=[$0], metadataword=[]]], fields=[category_id, amount])

Case-2

SCOPE = GLOBAL and KIND = WARNING

Suppose we implement an analyzer to detect the NDU issues. Then for the following query, the result will attach a piece of global advice after the "Optimized Physical Plan".

Code Block
languagesql
firstline1
titleQuery Example
linenumberstrue
create temporary table cdc_with_meta (
 a int,
 b bigint,
 c string,
 d boolean,
 metadata_1 int metadata,
 metadata_2 string metadata,
 metadata_3 bigint metadata,
 primary key (a) not enforced
) with (
 'connector' = 'values',
 'changelog-mode' = 'I,UA,UB,D',
 'readable-metadata' = 'metadata_1:INT, metadata_2:STRING, metadata_3:BIGINT'
);

create temporary table sink_without_pk (
 a int,
 b bigint,
 c string
) with (
 'connector' = 'values',
 'sink-insert-only' = 'false'
);

explain plan_advice 
insert into sink_without_pk
select a, metadata_3, c
from cdc_with_meta
Code Block
languagetext
firstline1
titleOutput Format
linenumberstrue
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a, metadata_3, c])
+- LogicalProject(a=[$0], metadata_3=[$6], c=[$2])
   +- LogicalTableScan$1])
   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])

== Optimized Physical Plan With Advice ==
Union(advice=[1], all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(advice=[2], table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

advice[1]: You might want to pay attention to this node because ...
advice[2]: You might want to pay attention to this node because ...
adivce[3]: You might want to configure 'yyy' to improve ....

== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_metaMyTable1]], fields=[count, word])

== Optimized Physical Plan ==
Sink+- TableSourceScan(table=[[default_catalog., default_database.sink_without_pk, MyTable2]], fields=[acount, metadata_3, c])
+- Calc(select=[a, metadata_3, c])
   +- TableSourceScan[3](table=[[word])


Condition 4. The optimized physical plan pattern does not match the PlanAnalyzers. No advice is provided.

Code Block
languagetext
firstline1
titleOutput format with no advice
linenumberstrue
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(count=[$0], word=[$1])
:  +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
:     +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, project=[a, cMyTable1]])
+- LogicalProject(count=[$0], metadataword=[metadata_3]]], fields=[a, c, metadata_3$1])
   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])

== Optimized Physical Plan With Advice ==
Union(all=[ADVICEtrue], The metadata column(s): 'metadata_3' in cdc source may cause wrong result or error on downstream operators, please consider removing these columns or use a non-cdc source that only has insert messages.
source node:
union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, cdc_with_metaMyTable2]], projectfields=[acount, c], metadata=[metadata_3]]], fields=[a, c, metadata_3], changelogMode=[I,UB,UA,D], upsertKeys=[[a]])


== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.sink_without_pkword])

advice: No available advice.

== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[acount, metadata_3, cword])
+- CalcTableSourceScan(selecttable=[a[default_catalog, metadata_3, c])
   +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta, project=[a, c], metadata=[metadata_3]]default_database, MyTable2]], fields=[acount, c, metadata_3])

It would be an interactive experience for users to adjust the stream queries to be "correct" and "fast".

Image Removed

PlanAnalzyer

...

word])


Use Case

Case-1

SCOPE = NODE_LEVEL and KIND = WARNING

Suppose we implement an analyzer to collect all the rel nodes sensitive to state TTL. Then for the following query, the result will attach a warning on the join and aggregate node.

Code Block
languagejavasql
firstline1
titleStateExpireRiskAnalyzerCase-1 query
linenumberstrue
set 'table.exec.state.ttl' = '36000';  

create table `order` (
  order_id bigint not null primary key not enforced,
  gmt_create timestamp(3) not null,
  buyer_id bigint not null,
  category_id bigint not null,
  amount double not null,
  ptime as proctime()
) with (
  'connector' = 'values',
  'bounded' = 'false'
);  

create table `category` (
  category_id bigint not null primary key not enforced,
  category_name string not null
) with (
  'connector' = 'values',
  'bounded' = 'false'
);

explain plan_advice
select
  b.category_name,
  sum(a.amount) as revenue
from
  `order` a left join `category` for system_time as of `a`.`ptime` as `b`
on a.category_id = b.category_id
group by b.category_name;


Code Block
languagetext
firstline1
titleCase-1 output format
linenumberstrue
== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], revenue=[SUM($1)])
+- LogicalProject(category_name=[$7], amount=[$4])
   +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{3, 5}])
      :- LogicalProject(order_id=[$0], gmt_create=[$1], buyer_id=[$2], category_id=[$3], amount=[$4], ptime=[PROCTIME()])
      :  +- LogicalTableScan(table=[[default_catalog, default_database, order]])
      +- LogicalFilter(condition=[=($cor0.category_id, $0)])
/** An implementation of {@link PlanAnalyzer} to collect state TTL sensitive rels. */ 
@Experimental
public class StateExpireRiskAnalyzer implements PlanAnalyzer {

    public static final StateExpireRiskAnalyzer INSTANCE = new StateExpireRiskAnalyzer();

    private static final PlanAdvice STATE_EXPIRE =
            new PlanAdvice(
                    PlanAdvice.Kind.WARNING,
                    PlanAdvice.Scope.LOCAL,
                    String.format(
                            "You might want to pay attention to this node because state expiration configuration '%s' is enabled.",
                        +- LogicalSnapshot(period=[$cor0.ptime])
    ExecutionConfigOptions.IDLE_STATE_RETENTION.key()));

    private static final Set<String> STATE_TTL_INSENSITIVE_REL = new HashSet<>();

    static {
        // excludes the state TTL insensitive rel
    }

    private StateExpireRiskAnalyzer() {}

    @Override
    public Optional<AnalyzedResult> analyze(FlinkRelNode rel) { +- LogicalTableScan(table=[[default_catalog, default_database, category]])

== Optimized Physical Plan With Advice ==
GroupAggregate(advice=[1], groupBy=[category_name], select=[category_name, SUM(amount) AS revenue])
+- Exchange(distribution=[hash[category_name]])
   +- Calc(select=[category_name, amount])
      +- LookupJoin(advice=[1], table=[default_catalog.default_database.category], joinType=[LeftOuterJoin], lookup=[category_id=category_id], select=[category_id, amount, category_id0, category_name])
        List<Integer> targetRelIds+- TableSourceScan(table= new ArrayList<>();
        boolean enableStateTTL =
                ShortcutUtils.unwrapTableConfig(rel)
              [[default_catalog, default_database, order, project=[category_id, amount], metadata=[]]], fields=[category_id, amount])

advice[1]: You might want to pay attention to this node because state expiration configuration 'table.exec.state.ttl' is enabled.

== Optimized Execution Plan ==
GroupAggregate(groupBy=[category_name], select=[category_name, SUM(amount) AS revenue])
+- Exchange(distribution=[hash[category_name]])
   +- Calc(select=[category_name, amount])
                  .get(ExecutionConfigOptions.IDLE_STATE_RETENTION)
        +- LookupJoin(table=[default_catalog.default_database.category], joinType=[LeftOuterJoin], lookup=[category_id=category_id], select=[category_id, amount, category_id0, category_name])
         +- TableSourceScan(table=[[default_catalog, default_database, order, project=[category_id, amount], metadata=[]]], fields=[category_id, amount])


Case-2

SCOPE = QUERY_LEVEL and KIND = WARNING

Suppose we implement an analyzer to detect the NDU issues. Then for the following query, the result will attach a piece of global advice after the "Optimized Physical Plan".

Code Block
languagesql
firstline1
titleCase-2 query
linenumberstrue
create temporary table cdc_with_meta (
 a int,
 b bigint,
 c string,
 d boolean,
 metadata_1 int metadata,
 metadata_2 string metadata,
 metadata_3 bigint metadata,
 primary key (a) not enforced
) with (
 'connector' = 'values',
 'changelog-mode' = 'I,UA,UB,D',
 'readable-metadata' = 'metadata_1:INT, metadata_2:STRING, metadata_3:BIGINT'
);

create temporary table sink_without_pk (
 a int,
 b bigint,
 c string
) with (
 'connector' = 'values',
 'sink-insert-only' = 'false'
);

explain plan_advice 
insert into sink_without_pk
select a, metadata_3, c
from cdc_with_meta


Code Block
languagetext
firstline1
titleCase-2 output format
linenumberstrue
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a, metadata_3, c])
+- LogicalProject(a=[$0], metadata_3=[$6], c=[$2])
   +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])

== Optimized Physical Plan With Advice ==
Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, metadata_3, c])
+- Calc(select=[a, metadata_3, c])
   +- TableSourceScan[3](table=[[default_catalog, default_database, cdc_with_meta, project=[a, c], metadata=[metadata_3]]], fields=[a, c, metadata_3])

advice[1]: The metadata column(s): 'metadata_3' in cdc source may cause wrong result or error on downstream operators, please consider removing these columns or use a non-cdc source that only has insert messages.
source node:
TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta, project=[a, c], metadata=[metadata_3]]], fields=[a, c, metadata_3], changelogMode=[I,UB,UA,D], upsertKeys=[[a]])


== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, metadata_3, c])
+- Calc(select=[a, metadata_3, c])
   +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta, project=[a, c], metadata=[metadata_3]]], fields=[a, c, metadata_3])


It would be an interactive experience for users to adjust the stream queries to be "correct" and "fast".

Image Added

PlanAnalzyer

PlanAnalyzer is an internal interface to perform the analysis and provide advice. Introducing a new PlanAnalyzer is similar to adding a new RelOptRule to RuleSet.

Here is an example implementation of PlanAnalyzer to detect all state TTL-sensitive rel nodes and inform users of the risk.

Code Block
languagejava
firstline1
titleStateExpireRiskAnalyzer
linenumberstrue
/** An implementation of {@link PlanAnalyzer} to collect state TTL sensitive rels. */ 
@Experimental
public class StateExpireRiskAnalyzer implements PlanAnalyzer {

    public static final StateExpireRiskAnalyzer INSTANCE = new StateExpireRiskAnalyzer();

    private static final PlanAdvice STATE_EXPIRE =        .toMillis()
                        > 0;
        if (rel instanceof FlinkPhysicalRel && enableStateTTL) {
            rel.accept(
                    new RelShuttleImpl() {
                        @Override
                        public RelNode visit(RelNode other) {
                            if (!STATE_TTL_INSENSITIVE_REL.contains(
                                    other.getClass().getCanonicalName())) {
                                targetRelIds.add(other.getId());
                            }
                            return super.visit(other);
                        }
                    });
            ifnew (!targetRelIds.isEmpty()) {PlanAdvice(
                return Optional.of(
   PlanAdvice.Kind.WARNING,
                     new AnalyzedResult() {
PlanAdvice.Scope.NODE_LEVEL,
                    String.format(
           @Override
                 "You might want to pay attention to this node because state expiration publicconfiguration PlanAdvice getAdvice() {'%s' is enabled.",
                            ExecutionConfigOptions.IDLE_STATE_RETENTION.key()));

    private static returnfinal Set<String> STATE_EXPIRETTL_INSENSITIVE_REL = new HashSet<>();

    static {
        // excludes the state TTL insensitive rel
    }

    private StateExpireRiskAnalyzer() {}

    @Override
    public Optional<AnalyzedResult> analyze(FlinkRelNode rel) {
        List<Integer> targetRelIds = new ArrayList<>();
        boolean enableStateTTL @Override=
                ShortcutUtils.unwrapTableConfig(rel)
            public List<Integer> getTargetIds() {
                 .get(ExecutionConfigOptions.IDLE_STATE_RETENTION)
               return   targetRelIds;
              .toMillis()
              }
          > 0;
        if (rel instanceof FlinkPhysicalRel && }enableStateTTL); {
            }
        }
        return Optional.empty();
    }
}rel.accept(
                    new RelShuttleImpl() {
                        @Override
                        public RelNode visit(RelNode other) {
                            if (!STATE_TTL_INSENSITIVE_REL.contains(
                                    other.getClass().getCanonicalName())) {
                                targetRelIds.add(other.getId());
                            }
                            return super.visit(other);
                        }
                    });
            if (!targetRelIds.isEmpty()) {
                return Optional.of(
                        new AnalyzedResult() {
                            @Override
                            public PlanAdvice getAdvice() {
                                return STATE_EXPIRE;
                            }

                            @Override
                            public List<Integer> getTargetIds() {
                                return targetRelIds;
                            }
                        });
            }
        }
        return Optional.empty();
    }
}

ExplainFormat

The problem with the current output format is that it is a mixture of plain text (AST, Optimized Physical Plan, and Optimized Execution Plan) and JSON (Physical Execution Plan,  via EXPLAIN JSON_EXECUTION_PLAN ), which is not structured and categorized.

By introducing ExplainFormat, we can better categorize the explain format. ExplainFormat#TEXT,  which corresponds to the current output format, will be the default format to achieve backward compatibility.

In the future, we might introduce more formats along with the syntax EXPLAIN [ExplainDetails...] WITH FORMAT [TEXT | JSON] <query> (similar to MySQL Explain)

Note: this is just an illustration. The exact syntax is beyond the scope of this FLIP.

Open Questions

Q: What is the relationship between PLAN_ADVICE, CHANGELOG_MODE, and ESITMATED_COSTSCOST? Can I explain all of those together? What will the result look like?

A: PLAN_ADVICE is another kind of explain detail towards CHANGELOG_MODE and ESTIMATED_COSTSCOST. Users can choose to explain multiple details at the same time.

Code Block
languagesql
firstline1
titleUser SQL Syntax
linenumberstrue
EXPLAIN PLAN_ADVICE, CHANGELOG_MODE, ESTIMATED_COSTSCOST <query>


Q: Why propose this feature in the community? It sounds more suitable for the platform side to implement.

A: User experience means a lot. Users will gain benefits if the community would like to provide a basic implementation. This feature also lays a foundation for custom analyzers to refer to the clarified API and behavior.

Future Work

In the future, there are mainly three follow-ups that can be achieved

  • Introduce ExplainFormat#JSON to output structured results, to facilitate the visualization of plans with tagged advice, which provides a more intuitive illustration for users to understand their queries.
  • Introduce more analyzers to perform analysis.
  • Extend the situation to cover batch mode.

Compatibility, Deprecation, and Migration Plan

It will not violate any compatibility, and no migration is needed.

Test Plan

We will test this FLIP in three ways:

  1. Make sure that the existing tests can pass.

  2. Test the explain functionality works as expected for different statements, like single query, statement set, and under sub plan reuse condition.

  3. Test the plan analyzer works as expected under different scenarios, like local/global v.s. advice/warning.

  4. Add new ITCase and E2E tests.

Rejected Alternatives

Rejected User Syntax

Code Block
languagesql
firstline1
titleRejected User Syntax
linenumberstrue
EXPLAIN JSON_ANALYZED_PLAN <query>

On first thought, we want to align the syntax with EXPLAIN JSON_EXECUTION_PLAN. However, it is unclear to know which plan needs to be analyzed. This brings out the second question, which plan should be analyzedinvestigated

Analyze JSON Execution Plan v.s. Optimized Physical Plan

Q: Why not enhance ExplainDetail#JSONExplainDetail#JSON_EXECUTION_PLAN, but choose the target to be optimized rel?

A: Currently, we have three ExplainDetail: CHANGLOG_MODE, ESTIMATED_COSTS COST, and JSON_EXECUTION_PLAN.

The relationship between them and the corresponding SQL-processing phase is illustrated below. Stream Graph is first excluded because we want to limit this analysis within the compile time. And from this Fig, we can tell that optimized rel is the closest to the original query.


Image RemovedImage Added