Status

Current state: "Under Discussion"

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Although SQL has been widely adopted among Flink users in recent years, it is still not easy for users to write "correct" and "fast" streaming queries that achieve what they want. General familiarity with dynamic tables and retraction is often not enough to troubleshoot specific cases, even for experienced users. Detailed documentation is available for some of these cases, such as data correctness issues caused by non-deterministic updates (NDU) and performance tuning. Nevertheless, users must match their issue against a documented pattern to find a fix or workaround, which amounts to a repetitive, pull-based guess-and-try process. There is a considerable gap between the documented concepts and optimization techniques and the actions users can actually apply.

...

  • Users can get instant advice when explaining the query with the proposed syntax EXPLAIN PLAN_ADVICE.
  • This FLIP lays the groundwork for further improvement. For example, the community might implement more analyzers or provide the visualization of the physical plan with advice (by Flink Plan Visualizer) in the future.

Public Interfaces

We propose to add/change the following interfaces/classes.

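Change List | Module | Annotation
Explainable | flink-table-api-java | PublicEvolving
ExplainFormat | flink-table-api-java | PublicEvolving
ExplainDetail | flink-table-api-java | PublicEvolving
TableEnvironment | flink-table-api-java | PublicEvolving
PlanAnalyzer | flink-table-planner | Internal
PlanAdvice | flink-table-planner | Internal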

Table API

Code Block: Explainable (java)
/**
 * Represents an artifact that can be explained using a summary string.
 *
 * @see #explain(ExplainDetail...)
 */
@PublicEvolving
public interface Explainable<SELF extends Explainable<SELF>> {

    /**
     * Returns the AST of this object and the execution plan to compute the result of the given
     * statement.
     *
     * @param extraDetails The extra explain details which the result of this method should include,
     *     e.g. estimated cost, changelog mode for streaming
     * @return AST and the execution plan.
     */
    default String explain(ExplainDetail... extraDetails) {
        return explain(ExplainFormat.TEXT, extraDetails);
    }

    /**
     * Returns the AST of this object and the execution plan to compute the result of the given
     * statement.
     *
     * @param format The output format of explain plan
     * @param extraDetails The extra explain details which the result of this method should include,
     *     e.g. estimated cost, changelog mode for streaming
     * @return AST and the execution plan.
     */
    String explain(ExplainFormat format, ExplainDetail... extraDetails);

    /** Like {@link #explain(ExplainDetail...)}, but piping the result to {@link System#out}. */
    @SuppressWarnings("unchecked")
    default SELF printExplain(ExplainDetail... extraDetails) {
        return printExplain(ExplainFormat.TEXT, extraDetails);
    }

    /**
     * Like {@link #explain(ExplainFormat, ExplainDetail...)}, but piping the result to {@link
     * System#out}.
     */
    @SuppressWarnings("unchecked")
    default SELF printExplain(ExplainFormat format, ExplainDetail... extraDetails) {
        System.out.println(explain(format, extraDetails));
        return (SELF) this;
    }
}
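
The overload pattern in this interface can be tried out in isolation. The sketch below is a minimal, self-contained model and does not depend on Flink: ExplainFormat and ExplainDetail are simplified stand-ins for the proposed enums, PLAN_ADVICE is assumed to be a constant of ExplainDetail, and ToyQuery is a hypothetical implementer. It is meant to show that an implementer only needs to provide the two-argument explain method, while the other three methods come from the defaults.

```java
import java.util.Arrays;

/** Simplified stand-in for the proposed enum; only TEXT is modeled here. */
enum ExplainFormat { TEXT }

/** Simplified stand-in; PLAN_ADVICE as a constant here is an assumption. */
enum ExplainDetail { ESTIMATED_COST, CHANGELOG_MODE, PLAN_ADVICE }

/** Same shape as the proposed interface: one abstract method, three defaults. */
interface Explainable<SELF extends Explainable<SELF>> {

    default String explain(ExplainDetail... extraDetails) {
        return explain(ExplainFormat.TEXT, extraDetails);
    }

    String explain(ExplainFormat format, ExplainDetail... extraDetails);

    default SELF printExplain(ExplainDetail... extraDetails) {
        return printExplain(ExplainFormat.TEXT, extraDetails);
    }

    @SuppressWarnings("unchecked")
    default SELF printExplain(ExplainFormat format, ExplainDetail... extraDetails) {
        System.out.println(explain(format, extraDetails));
        return (SELF) this;
    }
}

/** Hypothetical implementer, not a Flink class. */
final class ToyQuery implements Explainable<ToyQuery> {
    private final String sql;

    ToyQuery(String sql) {
        this.sql = sql;
    }

    @Override
    public String explain(ExplainFormat format, ExplainDetail... extraDetails) {
        return format + " plan for [" + sql + "] with details " + Arrays.toString(extraDetails);
    }
}

class ExplainableDemo {
    public static void main(String[] args) {
        ToyQuery query = new ToyQuery("SELECT 1");
        // printExplain returns the receiver, so calls can be chained fluently.
        ToyQuery same = query.printExplain(ExplainDetail.PLAN_ADVICE);
        System.out.println(same == query);
    }
}
```

Because the format-less overloads delegate to ExplainFormat.TEXT, existing callers of explain(ExplainDetail...) keep their behavior under this shape.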

...

Code Block: TableEnvironment (java)
@PublicEvolving
public interface TableEnvironment {
    /**
     * Returns the AST of the specified statement and the execution plan to compute the result of
     * the given statement.
     *
     * @param statement The statement for which the AST and execution plan will be returned.
     * @param extraDetails The extra explain details which the explain result should include, e.g.
     *     estimated cost, changelog mode for streaming, displaying execution plan in json format
     * @return AST and the execution plan.
     */
    default String explainSql(String statement, ExplainDetail... extraDetails) {
        return explainSql(statement, ExplainFormat.TEXT, extraDetails);
    }

    /**
     * Returns the AST of the specified statement and the execution plan to compute the result of
     * the given statement.
     *
     * @param statement The statement for which the AST and execution plan will be returned.
     * @param format The output format of explain plan.
     * @param extraDetails The extra explain details which the explain result should include, e.g.
     *     estimated cost, changelog mode for streaming, displaying execution plan in json format
     * @return AST and the execution plan.
     */
    String explainSql(String statement, ExplainFormat format, ExplainDetail... extraDetails);
}

Table Planner

Introduce an interface PlanAnalyzer to analyze the optimized rel nodes generated by Planner. It encloses an inner interface, i.e., AnalyzedResult, to wrap a PlanAdvice with a list of affected rel node IDs.
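
As a rough illustration of the shape described above, the following self-contained sketch models an analyzer that inspects a plan and optionally returns a result pairing one piece of advice with the IDs of the affected nodes. All names here (SketchNode, SketchAnalyzer, AggregateAnalyzer) are hypothetical stand-ins, the advice is reduced to a plain string, and the advice text is invented for the example. It is not the proposed interface itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for an optimized rel node: an ID, a name, and inputs. */
final class SketchNode {
    final int id;
    final String name;
    final List<SketchNode> inputs;

    SketchNode(int id, String name, SketchNode... inputs) {
        this.id = id;
        this.name = name;
        this.inputs = Arrays.asList(inputs);
    }
}

/** Assumed analyzer shape: inspect a plan root and optionally return a result. */
interface SketchAnalyzer {
    Optional<Result> analyze(SketchNode root);

    /** Inner interface pairing one piece of advice with the affected node IDs. */
    interface Result {
        String getAdvice();

        List<Integer> getTargetIds();
    }
}

/** Toy analyzer that flags every node named "GroupAggregate". */
final class AggregateAnalyzer implements SketchAnalyzer {
    @Override
    public Optional<Result> analyze(SketchNode root) {
        List<Integer> ids = new ArrayList<>();
        collect(root, ids);
        if (ids.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(
                new Result() {
                    @Override
                    public String getAdvice() {
                        return "Check the aggregate state size.";
                    }

                    @Override
                    public List<Integer> getTargetIds() {
                        return ids;
                    }
                });
    }

    /** Pre-order walk: record the node itself, then descend into its inputs. */
    private static void collect(SketchNode node, List<Integer> ids) {
        if ("GroupAggregate".equals(node.name)) {
            ids.add(node.id);
        }
        for (SketchNode input : node.inputs) {
            collect(input, ids);
        }
    }
}

class AnalyzerDemo {
    public static void main(String[] args) {
        SketchNode plan =
                new SketchNode(1, "GroupAggregate",
                        new SketchNode(2, "Exchange", new SketchNode(3, "TableSourceScan")));
        new AggregateAnalyzer()
                .analyze(plan)
                .ifPresent(r -> System.out.println(r.getTargetIds() + ": " + r.getAdvice()));
    }
}
```

Returning Optional.empty() when nothing matches lets the caller distinguish "no advice" from an advice with an empty target list.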

...

Code Block: PlanAdvice (java)
package org.apache.flink.table.planner.analyze;

import org.apache.flink.annotation.Internal;

/** Plain POJO for advice provided by {@link PlanAnalyzer}. */
@Internal
public final class PlanAdvice {

    private final Kind kind;

    private final Scope scope;

    private String content;

    public PlanAdvice(Kind kind, Scope scope, String content) {
        this.kind = kind;
        this.scope = scope;
        this.content = content;
    }

    public PlanAdvice(Kind kind, Scope scope) {
        this(kind, scope, "");
    }

    public PlanAdvice withContent(String content) {
        this.content = content;
        return this;
    }

    // getters
    ...

    /** Categorize the semantics of a {@link PlanAdvice}. */
    @Internal
    public enum Kind {
        /** Indicate the potential risk. */
        WARNING,
        /** Indicate the potential optimization. */
        ADVICE
    }

    /** Categorize the scope of a {@link PlanAdvice}. */
    @Internal
    public enum Scope {
        /**
         * Indicate the advice is not specific to a {@link
         * org.apache.calcite.rel.RelNode}.
         */
        QUERY_LEVEL,
        /**
         * Indicate the advice is specific to a {@link
         * org.apache.calcite.rel.RelNode}.
         */
        NODE_LEVEL
    }
}


Proposed Changes

Overall Design

To leverage the planner's power to give more understandable, actionable advice closer to the user's perspective, we propose a new explain mode PLAN_ADVICE, which analyzes the optimized physical plan and attaches available tuning advice or data correctness warnings to the output. 

...

Code Block: Output format with no advice (text)
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(count=[$0], word=[$1])
:  +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
:     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+- LogicalProject(count=[$0], word=[$1])
   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])

== Optimized Physical Plan With Advice ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

advice: No available advice.

== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
:  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])


Use Case

Case-1

SCOPE = NODE_LEVEL and KIND = WARNING

...

Code Block: Case-1 output format (text)
== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], revenue=[SUM($1)])
+- LogicalProject(category_name=[$7], amount=[$4])
   +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{3, 5}])
      :- LogicalProject(order_id=[$0], gmt_create=[$1], buyer_id=[$2], category_id=[$3], amount=[$4], ptime=[PROCTIME()])
      :  +- LogicalTableScan(table=[[default_catalog, default_database, order]])
      +- LogicalFilter(condition=[=($cor0.category_id, $0)])
         +- LogicalSnapshot(period=[$cor0.ptime])
            +- LogicalTableScan(table=[[default_catalog, default_database, category]])

== Optimized Physical Plan With Advice ==
GroupAggregate(advice=[1], groupBy=[category_name], select=[category_name, SUM(amount) AS revenue])
+- Exchange(distribution=[hash[category_name]])
   +- Calc(select=[category_name, amount])
      +- LookupJoin(advice=[1], table=[default_catalog.default_database.category], joinType=[LeftOuterJoin], lookup=[category_id=category_id], select=[category_id, amount, category_id0, category_name])
         +- TableSourceScan(table=[[default_catalog, default_database, order, project=[category_id, amount], metadata=[]]], fields=[category_id, amount])

advice[1]: You might want to pay attention to this node because state expiration configuration 'table.exec.state.ttl' is enabled.

== Optimized Execution Plan ==
GroupAggregate(groupBy=[category_name], select=[category_name, SUM(amount) AS revenue])
+- Exchange(distribution=[hash[category_name]])
   +- Calc(select=[category_name, amount])
      +- LookupJoin(table=[default_catalog.default_database.category], joinType=[LeftOuterJoin], lookup=[category_id=category_id], select=[category_id, amount, category_id0, category_name])
         +- TableSourceScan(table=[[default_catalog, default_database, order, project=[category_id, amount], metadata=[]]], fields=[category_id, amount])
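
The tagging scheme in the output above (an advice=[n] attribute on each affected node, followed by a numbered advice list) can be sketched as a small renderer. This is only an illustration under stated assumptions: AdviceRenderer and its Node class are hypothetical, advice is supplied as a map from node ID to message, and the tree connectors are simplified to the single-input "+- " form.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative renderer; class and method names are hypothetical, not Flink APIs. */
final class AdviceRenderer {

    /** Minimal plan node: an ID, an operator name, its attributes, and its inputs. */
    static final class Node {
        final int id;
        final String name;
        final String attrs;
        final List<Node> inputs = new ArrayList<>();

        Node(int id, String name, String attrs) {
            this.id = id;
            this.name = name;
            this.attrs = attrs;
        }

        Node input(Node child) {
            inputs.add(child);
            return this;
        }
    }

    /** Renders the tagged plan followed by the numbered advice list. */
    static String render(Node root, Map<Integer, String> adviceByNodeId) {
        // Each distinct message gets one number, assigned in order of first appearance.
        Map<String, Integer> numbering = new LinkedHashMap<>();
        StringBuilder out = new StringBuilder("== Optimized Physical Plan With Advice ==\n");
        renderNode(root, 0, adviceByNodeId, numbering, out);
        out.append('\n');
        if (numbering.isEmpty()) {
            out.append("advice: No available advice.\n");
        }
        for (Map.Entry<String, Integer> e : numbering.entrySet()) {
            out.append("advice[").append(e.getValue()).append("]: ").append(e.getKey()).append('\n');
        }
        return out.toString();
    }

    private static void renderNode(
            Node node,
            int depth,
            Map<Integer, String> adviceByNodeId,
            Map<String, Integer> numbering,
            StringBuilder out) {
        for (int i = 1; i < depth; i++) {
            out.append("   ");
        }
        if (depth > 0) {
            out.append("+- ");
        }
        out.append(node.name).append('(');
        String advice = adviceByNodeId.get(node.id);
        if (advice != null) {
            Integer number = numbering.get(advice);
            if (number == null) {
                number = numbering.size() + 1;
                numbering.put(advice, number);
            }
            out.append("advice=[").append(number).append("], ");
        }
        out.append(node.attrs).append(")\n");
        for (Node input : node.inputs) {
            renderNode(input, depth + 1, adviceByNodeId, numbering, out);
        }
    }

    public static void main(String[] args) {
        Node plan =
                new Node(1, "GroupAggregate", "groupBy=[category_name]")
                        .input(new Node(2, "Exchange", "distribution=[hash[category_name]]")
                                .input(new Node(3, "LookupJoin", "joinType=[LeftOuterJoin]")));
        Map<Integer, String> advice = new HashMap<>();
        String message = "You might want to pay attention to this node.";
        advice.put(1, message);
        advice.put(3, message);
        System.out.print(render(plan, advice));
    }
}
```

Numbering by message rather than by node is what lets two nodes share the same advice=[1] tag, as GroupAggregate and LookupJoin do in Case-1.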


Case-2

SCOPE = QUERY_LEVEL and KIND = WARNING

...

It would be an interactive experience for users to adjust the stream queries to be "correct" and "fast".

PlanAnalyzer

PlanAnalyzer is an internal interface to perform the analysis and provide advice. Introducing a new PlanAnalyzer is similar to adding a new RelOptRule to RuleSet.

...

Code Block: StateExpireRiskAnalyzer (java)
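import org.apache.flink.annotation.Experimental;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.plan.nodes.FlinkRelNode;
import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel;
import org.apache.flink.table.planner.utils.ShortcutUtils;

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttleImpl;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** An implementation of {@link PlanAnalyzer} to collect state TTL sensitive rels. */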
@Experimental
public class StateExpireRiskAnalyzer implements PlanAnalyzer {

    public static final StateExpireRiskAnalyzer INSTANCE = new StateExpireRiskAnalyzer();

    private static final PlanAdvice STATE_EXPIRE =
            new PlanAdvice(
                    PlanAdvice.Kind.WARNING,
                    PlanAdvice.Scope.NODE_LEVEL,
                    String.format(
                            "You might want to pay attention to this node because state expiration configuration '%s' is enabled.",
                            ExecutionConfigOptions.IDLE_STATE_RETENTION.key()));

    private static final Set<String> STATE_TTL_INSENSITIVE_REL = new HashSet<>();

    static {
        // excludes the state TTL insensitive rel
    }

    private StateExpireRiskAnalyzer() {}

    @Override
    public Optional<AnalyzedResult> analyze(FlinkRelNode rel) {
        List<Integer> targetRelIds = new ArrayList<>();
        boolean enableStateTTL =
                ShortcutUtils.unwrapTableConfig(rel)
                                .get(ExecutionConfigOptions.IDLE_STATE_RETENTION)
                                .toMillis()
                        > 0;
        if (rel instanceof FlinkPhysicalRel && enableStateTTL) {
            rel.accept(
                    new RelShuttleImpl() {
                        @Override
                        public RelNode visit(RelNode other) {
                            if (!STATE_TTL_INSENSITIVE_REL.contains(
                                    other.getClass().getCanonicalName())) {
                                targetRelIds.add(other.getId());
                            }
                            return super.visit(other);
                        }
                    });
            if (!targetRelIds.isEmpty()) {
                return Optional.of(
                        new AnalyzedResult() {
                            @Override
                            public PlanAdvice getAdvice() {
                                return STATE_EXPIRE;
                            }

                            @Override
                            public List<Integer> getTargetIds() {
                                return targetRelIds;
                            }
                        });
            }
        }
        return Optional.empty();
    }
}
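
The core of the analyzer above is a gated pre-order walk with an exclusion set. The following self-contained sketch reproduces just that logic without Flink or Calcite. The Rel hierarchy is hypothetical, and treating Scan and Calc as TTL-insensitive is purely an assumption for illustration, since the proposal leaves the exclusion set unspecified.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative model of the traversal; none of these types are Flink classes. */
final class TtlRiskSketch {

    /** Hypothetical rel node with an ID and inputs. */
    static class Rel {
        final int id;
        final List<Rel> inputs;

        Rel(int id, Rel... inputs) {
            this.id = id;
            this.inputs = Arrays.asList(inputs);
        }
    }

    static final class Scan extends Rel {
        Scan(int id, Rel... inputs) {
            super(id, inputs);
        }
    }

    static final class Calc extends Rel {
        Calc(int id, Rel... inputs) {
            super(id, inputs);
        }
    }

    static final class GroupAggregate extends Rel {
        GroupAggregate(int id, Rel... inputs) {
            super(id, inputs);
        }
    }

    // Assumption for this sketch only: stateless operators are excluded by class name.
    private static final Set<String> TTL_INSENSITIVE =
            new HashSet<>(
                    Arrays.asList(
                            Scan.class.getCanonicalName(), Calc.class.getCanonicalName()));

    /** Returns the IDs of TTL-sensitive nodes, or an empty list when TTL is disabled. */
    static List<Integer> collectTargets(Rel root, Duration stateTtl) {
        List<Integer> ids = new ArrayList<>();
        if (stateTtl.toMillis() > 0) {
            visit(root, ids);
        }
        return ids;
    }

    /** Pre-order walk: record the node itself, then descend into its inputs. */
    private static void visit(Rel rel, List<Integer> ids) {
        if (!TTL_INSENSITIVE.contains(rel.getClass().getCanonicalName())) {
            ids.add(rel.id);
        }
        for (Rel input : rel.inputs) {
            visit(input, ids);
        }
    }

    public static void main(String[] args) {
        Rel plan = new GroupAggregate(1, new Calc(2, new Scan(3)));
        System.out.println(collectTargets(plan, Duration.ofHours(1)));
        System.out.println(collectTargets(plan, Duration.ZERO));
    }
}
```

Checking the TTL before walking the tree means the traversal is skipped entirely when state expiration is disabled, which mirrors the enableStateTTL guard in the analyzer.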

ExplainFormat

The problem with the current output format is that it mixes plain text (AST, Optimized Physical Plan, and Optimized Execution Plan) with JSON (Physical Execution Plan, via EXPLAIN JSON_EXECUTION_PLAN), so the output is neither structured nor categorized.

...

In the future, we might introduce more formats along with the syntax EXPLAIN [ExplainDetails...] WITH FORMAT [TEXT | JSON] <query> (similar to MySQL's EXPLAIN).

Note: this is just an illustration. The exact syntax is beyond the scope of this FLIP.

Open Questions

Q: What is the relationship between PLAN_ADVICE, CHANGELOG_MODE, and ESTIMATED_COST? Can I explain all of those together? What will the result look like?

...

A: User experience means a lot. Users benefit if the community provides a basic implementation. This feature also lays a foundation for custom analyzers, which can refer to the clarified API and behavior.

Future Work

There are three main follow-ups:

  • Introduce ExplainFormat#JSON to output structured results, to facilitate the visualization of plans with tagged advice, which provides a more intuitive illustration for users to understand their queries.
  • Introduce more analyzers to perform analysis.
  • Extend the analysis to cover batch mode.

Compatibility, Deprecation, and Migration Plan

This change does not break compatibility, and no migration is needed.

Test Plan

We will test this FLIP in four ways:

  1. Make sure that the existing tests pass.

  2. Test that the explain functionality works as expected for different statements, such as a single query, a statement set, and queries under sub-plan reuse.

  3. Test that the plan analyzer works as expected under different scenarios, such as local/global scope combined with advice/warning kind.

  4. Add new ITCase and E2E tests.

Rejected Alternatives

Rejected User Syntax

Code Block: Rejected User Syntax (sql)
EXPLAIN JSON_ANALYZED_PLAN <query>

At first, we wanted to align the syntax with EXPLAIN JSON_EXECUTION_PLAN. However, that syntax does not make clear which plan is analyzed. This raises a second question: which plan should be investigated?

Analyze JSON Execution Plan vs. Optimized Physical Plan

Q: Why analyze the optimized rel nodes instead of enhancing ExplainDetail#JSON_EXECUTION_PLAN?

...