
Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/5xywxv7g43byoh0jbx1b6qo6gx6wjkcz

Vote thread: https://lists.apache.org/thread/bsgqvvs9wx1dkv7p3m9ctockh84rl11j

JIRA: FLINK-30650

Released: 1.17.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

Although SQL has been widely adopted among Flink users in recent years, we have noticed that it is still not easy for users to write "correct" and "fast" stream queries, and general familiarity with the concepts of dynamic tables and retraction is often not enough to troubleshoot specific cases, even for experienced users. The community has published detailed documentation on topics such as data correctness issues caused by non-deterministic updates (NDU) and performance tuning. Nevertheless, users must match their issue against the documented patterns to find a fix or workaround. For users, this process is a repetitive, pull-based cycle of guess-and-try. There is a considerable gap between the documented concepts and optimization techniques and the actual, applicable actions.

...

  • Users can get instant advice when explaining the query with the proposed syntax EXPLAIN PLAN_ADVICE.
  • This FLIP lays the groundwork for further improvement. For example, the community might implement more analyzers or provide the visualization of the physical plan with advice (by Flink Plan Visualizer) in the future.

Public Interfaces

We propose to add/change the following interfaces/classes.

Change List      | Module               | Annotation
-----------------+----------------------+---------------
Explainable      | flink-table-api-java | PublicEvolving
ExplainFormat    | flink-table-api-java | PublicEvolving
ExplainDetail    | flink-table-api-java | PublicEvolving
TableEnvironment | flink-table-api-java | PublicEvolving
PlanAnalyzer     | flink-table-planner  | Internal
PlanAdvice       | flink-table-planner  | Internal

Table API

Code Block: Explainable (Java)
/**
 * Represents an artifact that can be explained using a summary string.
 *
 * @see #explain(ExplainDetail...)
 */
@PublicEvolving
public interface Explainable<SELF extends Explainable<SELF>> {

    /**
     * Returns the AST of this object and the execution plan to compute the result of the given
     * statement.
     *
     * @param extraDetails The extra explain details which the result of this method should include,
     *     e.g. estimated cost, changelog mode for streaming
     * @return AST and the execution plan.
     */
    default String explain(ExplainDetail... extraDetails) {
        return explain(ExplainFormat.TEXT, extraDetails);
    }

    /**
     * Returns the AST of this object and the execution plan to compute the result of the given
     * statement.
     *
     * @param format The output format of explain plan
     * @param extraDetails The extra explain details which the result of this method should include,
     *     e.g. estimated cost, changelog mode for streaming
     * @return AST and the execution plan.
     */
    String explain(ExplainFormat format, ExplainDetail... extraDetails);

    /** Like {@link #explain(ExplainDetail...)}, but piping the result to {@link System#out}. */
    @SuppressWarnings("unchecked")
    default SELF printExplain(ExplainDetail... extraDetails) {
        return printExplain(ExplainFormat.TEXT, extraDetails);
    }

    /** Like {@link #explain(ExplainDetail...)}, but piping the result to {@link System#out}. */
    @SuppressWarnings("unchecked")
    default SELF printExplain(ExplainFormat format, ExplainDetail... extraDetails) {
        System.out.println(explain(format, extraDetails));
        return (SELF) this;
    }
}

...

Code Block: TableEnvironment (Java)
@PublicEvolving
public interface TableEnvironment {
    /**
     * Returns the AST of the specified statement and the execution plan to compute the result of
     * the given statement.
     *
     * @param statement The statement for which the AST and execution plan will be returned.
     * @param extraDetails The extra explain details which the explain result should include, e.g.
     *     estimated cost, changelog mode for streaming, displaying execution plan in json format
     * @return AST and the execution plan.
     */
    default String explainSql(String statement, ExplainDetail... extraDetails) {
        return explainSql(statement, ExplainFormat.TEXT, extraDetails);
    }

    /**
     * Returns the AST of the specified statement and the execution plan to compute the result of
     * the given statement.
     *
     * @param statement The statement for which the AST and execution plan will be returned.
     * @param format The output format of explain plan.
     * @param extraDetails The extra explain details which the explain result should include, e.g.
     *     estimated cost, changelog mode for streaming, displaying execution plan in json format
     * @return AST and the execution plan.
     */
    String explainSql(String statement, ExplainFormat format, ExplainDetail... extraDetails);
}
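
For illustration, here is a hedged usage sketch of the new explainSql overload. The import paths are assumed to follow the flink-table-api-java module listed in the change list, ExplainDetail.PLAN_ADVICE is the detail proposed by this FLIP, and the table and query are purely illustrative.

Code Block: ExplainSqlExample (Java, sketch)
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.ExplainFormat;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainSqlExample {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TABLE orders (order_id BIGINT, amount DOUBLE)"
                        + " WITH ('connector' = 'datagen')");

        // The TEXT format keeps today's default behavior; extra details such as the
        // proposed PLAN_ADVICE (an assumption of this sketch) are requested explicitly.
        String result =
                tEnv.explainSql(
                        "SELECT order_id, SUM(amount) FROM orders GROUP BY order_id",
                        ExplainFormat.TEXT,
                        ExplainDetail.CHANGELOG_MODE,
                        ExplainDetail.PLAN_ADVICE);
        System.out.println(result);
    }
}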

Table Planner

Introduce an interface PlanAnalyzer to analyze the optimized RelNodes generated by the planner. It encloses an inner interface, AnalyzedResult, which wraps a PlanAdvice together with a list of affected rel node IDs.
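
The full definition is part of the elided content below. As an orientation, here is a minimal sketch reconstructed from the description above and from its usage in the StateExpireRiskAnalyzer example later in this section; anything beyond those two sources is an assumption.

Code Block: PlanAnalyzer (Java, sketch)
package org.apache.flink.table.planner.analyze;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.planner.plan.nodes.FlinkRelNode;

import java.util.List;
import java.util.Optional;

/** Analyzes the optimized physical rel and gives advice (sketch reconstructed from usage). */
@Internal
public interface PlanAnalyzer {

    /** Returns an analyzed result if this analyzer has advice for the given rel. */
    Optional<AnalyzedResult> analyze(FlinkRelNode rel);

    /** Wraps a {@link PlanAdvice} with a list of affected rel node IDs. */
    @Internal
    interface AnalyzedResult {

        PlanAdvice getAdvice();

        List<Integer> getTargetIds();
    }
}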

...

Code Block: PlanAdvice (Java)
package org.apache.flink.table.planner.analyze;

import org.apache.flink.annotation.Internal;

/** Plain POJO for advice provided by {@link PlanAnalyzer}. */
@Internal
public final class PlanAdvice {

    private final Kind kind;

    private final Scope scope;

    private String content;

    public PlanAdvice(Kind kind, Scope scope, String content) {
        this.kind = kind;
        this.scope = scope;
        this.content = content;
    }

    public PlanAdvice(Kind kind, Scope scope) {
        this(kind, scope, "");
    }

    public PlanAdvice withContent(String content) {
        this.content = content;
        return this;
    }

    // getters
    ...

    /** Categorize the semantics of a {@link PlanAdvice}. */
    @Internal
    public enum Kind {
        /** Indicate the potential risk. */
        WARNING,
        /** Indicate the potential optimization. */
        ADVICE
    }

    /** Categorize the scope of a {@link PlanAdvice}. */
    @Internal
    public enum Scope {
        /**
         * Indicate the advice is not specific to a {@link
         * org.apache.calcite.rel.RelNode}.
         */
        QUERY_LEVEL,
        /**
         * Indicate the advice is specific to a {@link
         * org.apache.calcite.rel.RelNode}.
         */
        NODE_LEVEL
    }
}


Proposed Changes

Overall Design

To leverage the planner's power to give more understandable and actionable advice, closer to the user's perspective, we propose a new explain mode, PLAN_ADVICE, which analyzes the optimized physical plan and attaches the available tuning advice or data-correctness warnings to the output.
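
As a hedged sketch of how a user would trigger this mode: the EXPLAIN PLAN_ADVICE syntax is the one proposed here, the tables mirror the MyTable1/MyTable2 examples below, and the 'datagen' connector is only a stand-in.

Code Block: ExplainPlanAdviceExample (Java, sketch)
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainPlanAdviceExample {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TABLE MyTable1 (`count` BIGINT, word STRING) WITH ('connector' = 'datagen')");
        tEnv.executeSql(
                "CREATE TABLE MyTable2 (`count` BIGINT, word STRING) WITH ('connector' = 'datagen')");

        // The EXPLAIN PLAN_ADVICE statement returns the explain text as a result row,
        // following one of the output formats shown below.
        tEnv.executeSql(
                        "EXPLAIN PLAN_ADVICE "
                                + "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
                                + "UNION ALL "
                                + "SELECT `count`, word FROM MyTable2")
                .print();
    }
}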

...

Code Block: Output format with NODE_LEVEL advice
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(count=[$0], word=[$1])
:  +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
:     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+- LogicalProject(count=[$0], word=[$1])
   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])

== Optimized Physical Plan With Advice ==
Union(advice=[1], all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(advice=[2], table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

advice[1]: You might want to pay attention to this node because ...
advice[2]: You might want to pay attention to this node because ...

== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

...

Code Block: Output format with QUERY_LEVEL advice
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(count=[$0], word=[$1])
:  +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
:     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+- LogicalProject(count=[$0], word=[$1])
   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])

== Optimized Physical Plan With Advice ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

advice[1]: You might want to configure 'xxx' to improve ....
advice[2]: You might want to configure 'yyy' to improve ....

== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

...

Code Block: Output format with QUERY_LEVEL & NODE_LEVEL advice
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(count=[$0], word=[$1])
:  +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
:     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+- LogicalProject(count=[$0], word=[$1])
   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])

== Optimized Physical Plan With Advice ==
Union(advice=[1], all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(advice=[2], table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

advice[1]: You might want to pay attention to this node because ...
advice[2]: You might want to pay attention to this node because ...
advice[3]: You might want to configure 'yyy' to improve ....

== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

...

Code Block: Output format with no advice
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(count=[$0], word=[$1])
:  +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
:     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+- LogicalProject(count=[$0], word=[$1])
   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])

== Optimized Physical Plan With Advice ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

advice: No available advice.

== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])


Use Case

Case-1

SCOPE = NODE_LEVEL and KIND = WARNING

...

Code Block: Case-1 query (SQL)
set 'table.exec.state.ttl' = '36000';  

create table `order` (
  order_id bigint not null primary key not enforced,
  gmt_create timestamp(3) not null,
  buyer_id bigint not null,
  category_id bigint not null,
  amount double not null,
  ptime as proctime()
) with (
  'connector' = 'values',
  'bounded' = 'false'
);  

create table `category` (
  category_id bigint not null primary key not enforced,
  category_name string not null
) with (
  'connector' = 'values',
  'bounded' = 'false'
);

explain plan_advice
select
  b.category_name,
  sum(a.amount) as revenue
from
  `order` a left join `category` for system_time as of `a`.`ptime` as `b`
on a.category_id = b.category_id
group by b.category_name;

...

Code Block: Case-1 output format
== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], revenue=[SUM($1)])
+- LogicalProject(category_name=[$7], amount=[$4])
   +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{3, 5}])
      :- LogicalProject(order_id=[$0], gmt_create=[$1], buyer_id=[$2], category_id=[$3], amount=[$4], ptime=[PROCTIME()])
      :  +- LogicalTableScan(table=[[default_catalog, default_database, order]])
      +- LogicalFilter(condition=[=($cor0.category_id, $0)])
         +- LogicalSnapshot(period=[$cor0.ptime])
            +- LogicalTableScan(table=[[default_catalog, default_database, category]])

== Optimized Physical Plan With Advice ==
GroupAggregate(advice=[1], groupBy=[category_name], select=[category_name, SUM(amount) AS revenue])
+- Exchange(distribution=[hash[category_name]])
   +- Calc(select=[category_name, amount])
      +- LookupJoin(advice=[1], table=[default_catalog.default_database.category], joinType=[LeftOuterJoin], lookup=[category_id=category_id], select=[category_id, amount, category_id0, category_name])
         +- TableSourceScan(table=[[default_catalog, default_database, order, project=[category_id, amount], metadata=[]]], fields=[category_id, amount])

advice[1]: You might want to pay attention to this node because state expiration configuration 'table.exec.state.ttl' is enabled.

== Optimized Execution Plan ==
GroupAggregate(groupBy=[category_name], select=[category_name, SUM(amount) AS revenue])
+- Exchange(distribution=[hash[category_name]])
   +- Calc(select=[category_name, amount])
      +- LookupJoin(table=[default_catalog.default_database.category], joinType=[LeftOuterJoin], lookup=[category_id=category_id], select=[category_id, amount, category_id0, category_name])
         +- TableSourceScan(table=[[default_catalog, default_database, order, project=[category_id, amount], metadata=[]]], fields=[category_id, amount])


Case-2

SCOPE = QUERY_LEVEL and KIND = WARNING

...

Code Block: Case-2 query (SQL)
create temporary table cdc_with_meta (
 a int,
 b bigint,
 c string,
 d boolean,
 metadata_1 int metadata,
 metadata_2 string metadata,
 metadata_3 bigint metadata,
 primary key (a) not enforced
) with (
 'connector' = 'values',
 'changelog-mode' = 'I,UA,UB,D',
 'readable-metadata' = 'metadata_1:INT, metadata_2:STRING, metadata_3:BIGINT'
);

create temporary table sink_without_pk (
 a int,
 b bigint,
 c string
) with (
 'connector' = 'values',
 'sink-insert-only' = 'false'
);

explain plan_advice 
insert into sink_without_pk
select a, metadata_3, c
from cdc_with_meta;

...

Code Block: Case-2 output format
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a, metadata_3, c])
+- LogicalProject(a=[$0], metadata_3=[$6], c=[$2])
   +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])

== Optimized Physical Plan With Advice ==
Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, metadata_3, c])
+- Calc(select=[a, metadata_3, c])
   +- TableSourceScan[3](table=[[default_catalog, default_database, cdc_with_meta, project=[a, c], metadata=[metadata_3]]], fields=[a, c, metadata_3])

advice[1]: The metadata column(s): 'metadata_3' in cdc source may cause wrong result or error on downstream operators, please consider removing these columns or use a non-cdc source that only has insert messages.
source node:
TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta, project=[a, c], metadata=[metadata_3]]], fields=[a, c, metadata_3], changelogMode=[I,UB,UA,D], upsertKeys=[[a]])


== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, metadata_3, c])
+- Calc(select=[a, metadata_3, c])
   +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta, project=[a, c], metadata=[metadata_3]]], fields=[a, c, metadata_3])

...

This gives users an interactive way to iterate on their stream queries until they are both "correct" and "fast".

PlanAnalyzer

PlanAnalyzer is an internal interface that performs the analysis and provides advice. Introducing a new PlanAnalyzer is similar to adding a new RelOptRule to a RuleSet.
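
How analyzers get registered and invoked is not spelled out in this excerpt. The following is purely a hypothetical sketch (the PlanAnalyzerRunner class name, its placement, and the wiring are assumptions) of how the planner could apply a set of analyzers to the optimized rel and collect their results.

Code Block: PlanAnalyzerRunner (Java, hypothetical sketch)
package org.apache.flink.table.planner.analyze; // hypothetical placement, alongside PlanAnalyzer

import org.apache.flink.table.planner.plan.nodes.FlinkRelNode;

import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that runs all registered {@link PlanAnalyzer}s over an optimized rel. */
public final class PlanAnalyzerRunner {

    private final List<PlanAnalyzer> analyzers;

    public PlanAnalyzerRunner(List<PlanAnalyzer> analyzers) {
        this.analyzers = analyzers;
    }

    /** Collects the results of every analyzer that has advice for the given rel. */
    public List<PlanAnalyzer.AnalyzedResult> analyzeAll(FlinkRelNode optimizedRel) {
        List<PlanAnalyzer.AnalyzedResult> results = new ArrayList<>();
        for (PlanAnalyzer analyzer : analyzers) {
            analyzer.analyze(optimizedRel).ifPresent(results::add);
        }
        return results;
    }
}

Each collected result pairs a PlanAdvice with the affected rel node IDs, which map directly onto the advice[...] lines in the output formats shown earlier.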

...

Code Block: StateExpireRiskAnalyzer (Java)
package org.apache.flink.table.planner.analyze;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.plan.nodes.FlinkRelNode;
import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel;
import org.apache.flink.table.planner.utils.ShortcutUtils;

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttleImpl;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** An implementation of {@link PlanAnalyzer} to collect state-TTL-sensitive rels. */
@Experimental
public class StateExpireRiskAnalyzer implements PlanAnalyzer {

    public static final StateExpireRiskAnalyzer INSTANCE = new StateExpireRiskAnalyzer();

    private static final PlanAdvice STATE_EXPIRE =
            new PlanAdvice(
                    PlanAdvice.Kind.WARNING,
                    PlanAdvice.Scope.NODE_LEVEL,
                    String.format(
                            "You might want to pay attention to this node because state expiration configuration '%s' is enabled.",
                            ExecutionConfigOptions.IDLE_STATE_RETENTION.key()));

    private static final Set<String> STATE_TTL_INSENSITIVE_REL = new HashSet<>();

    static {
        // excludes the state TTL insensitive rel
    }

    private StateExpireRiskAnalyzer() {}

    @Override
    public Optional<AnalyzedResult> analyze(FlinkRelNode rel) {
        List<Integer> targetRelIds = new ArrayList<>();
        boolean enableStateTTL =
                ShortcutUtils.unwrapTableConfig(rel)
                                .get(ExecutionConfigOptions.IDLE_STATE_RETENTION)
                                .toMillis()
                        > 0;
        if (rel instanceof FlinkPhysicalRel && enableStateTTL) {
            rel.accept(
                    new RelShuttleImpl() {
                        @Override
                        public RelNode visit(RelNode other) {
                            if (!STATE_TTL_INSENSITIVE_REL.contains(
                                    other.getClass().getCanonicalName())) {
                                targetRelIds.add(other.getId());
                            }
                            return super.visit(other);
                        }
                    });
            if (!targetRelIds.isEmpty()) {
                return Optional.of(
                        new AnalyzedResult() {
                            @Override
                            public PlanAdvice getAdvice() {
                                return STATE_EXPIRE;
                            }

                            @Override
                            public List<Integer> getTargetIds() {
                                return targetRelIds;
                            }
                        });
            }
        }
        return Optional.empty();
    }
}

ExplainFormat

The problem with the current output format is that it is a mixture of plain text (AST, Optimized Physical Plan, and Optimized Execution Plan) and JSON (Physical Execution Plan, via EXPLAIN JSON_EXECUTION_PLAN), which is neither structured nor categorized.
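
For reference, here is a minimal sketch of what the proposed ExplainFormat could look like, assuming it lives in flink-table-api-java as listed in the change list; only TEXT is covered by this FLIP, while a JSON format is left to Future Work.

Code Block: ExplainFormat (Java, sketch)
package org.apache.flink.table.api;

import org.apache.flink.annotation.PublicEvolving;

/** The output format of an explain result. */
@PublicEvolving
public enum ExplainFormat {
    /** Plain-text format, matching today's default explain output. */
    TEXT
    // A structured JSON format is a candidate follow-up (see Future Work).
}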

...

In the future, we might introduce more formats along with a syntax such as EXPLAIN [ExplainDetails...] FORMAT [TEXT | JSON] <query> (similar to MySQL's EXPLAIN).

Note: this is just an illustration. The exact syntax is beyond the scope of this FLIP.

Open Questions

Q: What is the relationship between PLAN_ADVICE, CHANGELOG_MODE, and ESTIMATED_COST? Can I explain all of them together? What will the result look like?

A: PLAN_ADVICE is another kind of explain detail, alongside CHANGELOG_MODE and ESTIMATED_COST. Users can choose to explain multiple details at the same time.

Code Block: User SQL Syntax (SQL)
EXPLAIN PLAN_ADVICE, CHANGELOG_MODE, ESTIMATED_COST <query>


Q: Why propose this feature in the community? It sounds more suitable for the platform side to implement.

A: User experience matters a lot. Users benefit if the community provides a basic implementation out of the box. This feature also lays a foundation for custom analyzers, which can build on the clarified API and behavior.

Future Work

There are mainly three follow-ups that can be pursued in the future:

  • Introduce ExplainFormat#JSON to output structured results, facilitating the visualization of plans with tagged advice and giving users a more intuitive picture of their queries.
  • Introduce more analyzers to perform additional analyses.
  • Extend the analysis to cover batch mode.

Compatibility, Deprecation, and Migration Plan

This FLIP does not break any compatibility, and no migration is needed.

Test Plan

We will test this FLIP in four ways:

  1. Make sure that the existing tests can pass.

  2. Test that the explain functionality works as expected for different statements, such as a single query, a statement set, and queries with sub-plan reuse.

  3. Test that the plan analyzer works as expected under different scenarios, such as local/global vs. advice/warning.

  4. Add new ITCases and E2E tests (a hedged test sketch follows this list).
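
The sketch below shows what such an ITCase could look like. ExplainDetail.PLAN_ADVICE and the expected section header are taken from this proposal, while the table definition, query, and class name are illustrative only.

Code Block: PlanAdviceExplainITCase (Java, sketch)
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableEnvironment;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class PlanAdviceExplainITCase {

    @Test
    void testExplainSqlWithPlanAdvice() {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TABLE src (a INT, b STRING) WITH ('connector' = 'datagen')");

        // PLAN_ADVICE is the explain detail proposed by this FLIP (assumption of this sketch).
        String result =
                tEnv.explainSql(
                        "SELECT a, COUNT(*) FROM src GROUP BY a", ExplainDetail.PLAN_ADVICE);

        // The output should contain the new section and at least one advice line.
        assertThat(result).contains("== Optimized Physical Plan With Advice ==");
        assertThat(result).contains("advice");
    }
}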

Rejected Alternatives

Rejected User Syntax

Code Block: Rejected User Syntax (SQL)
EXPLAIN JSON_ANALYZED_PLAN <query>

At first, we wanted to align the syntax with EXPLAIN JSON_EXECUTION_PLAN. However, that syntax does not make clear which plan needs to be analyzed, which brings up the second question: which plan should be investigated?

Analyze JSON Execution Plan vs. Optimized Physical Plan

Q: Why not enhance ExplainDetail#JSON_EXECUTION_PLAN instead of choosing the optimized rel as the analysis target?

...

The relationship between them and the corresponding SQL-processing phases is illustrated below. The Stream Graph is excluded first because we want to limit the analysis to compile time. From the figure, we can tell that the optimized rel is the closest to the original query.


[Figure: the relationship between the plans and the corresponding SQL-processing phases, from the original query to the Stream Graph]