Although SQL has been widely adopted among Flink users in recent years, we have noticed that it is still not easy to write "correct" and "fast" streaming queries that achieve what users want. A general familiarity with the concepts of dynamic tables and retraction may not be enough to troubleshoot specific cases, even for experienced users. We have published detailed documentation on topics such as data correctness issues caused by non-deterministic updates (NDU) and performance tuning. Nevertheless, users must still match their issue against a documented pattern to find a fix or workaround. For users, this feels like a repetitive, pull-based guess-and-try process. There is a considerable gap between the documented concepts and optimization techniques and the concrete actions users can apply. 

Hence, we propose:

  • A new explain mode, PLAN_ADVICE, added to the existing ExplainDetail enum. EXPLAIN PLAN_ADVICE <query> will give users instant, actionable SQL advice for each query. The current explain mechanism has no such capability.
  • A new enum, ExplainFormat, to categorize the output format of the explain result. The current EXPLAIN result is tree-style plain text, which is only human-readable. Categorizing the format makes it easy to add a more structured format (such as JSON) in the future for visualization or other analysis.

How do users benefit from this FLIP?

  • Users can get instant advice when explaining the query with the proposed syntax EXPLAIN PLAN_ADVICE.
  • Users can further develop custom analyzers based on the proposed API.

Public Interfaces

We propose to add/change the following interfaces/classes.


Table API

/**
 * Represents an artifact that can be explained using a summary string.
 *
 * @see #explain(ExplainDetail...)
 */
public interface Explainable<SELF extends Explainable<SELF>> {

    /**
     * Returns the AST of this object and the execution plan to compute the result of the given
     * statement.
     *
     * @param extraDetails The extra explain details which the result of this method should include,
     *     e.g. estimated cost, changelog mode for streaming
     * @return AST and the execution plan.
     */
    default String explain(ExplainDetail... extraDetails) {
        return explain(ExplainFormat.PLAIN_TEXT, extraDetails);
    }

    /**
     * Returns the AST of this object and the execution plan to compute the result of the given
     * statement.
     *
     * @param format The output format of explain plan
     * @param extraDetails The extra explain details which the result of this method should include,
     *     e.g. estimated cost, changelog mode for streaming
     * @return AST and the execution plan.
     */
    String explain(ExplainFormat format, ExplainDetail... extraDetails);

    /** Like {@link #explain(ExplainDetail...)}, but piping the result to {@link System#out}. */
    default SELF printExplain(ExplainDetail... extraDetails) {
        return printExplain(ExplainFormat.PLAIN_TEXT, extraDetails);
    }

    /**
     * Like {@link #explain(ExplainFormat, ExplainDetail...)}, but piping the result to {@link
     * System#out}.
     */
    @SuppressWarnings("unchecked")
    default SELF printExplain(ExplainFormat format, ExplainDetail... extraDetails) {
        System.out.println(explain(format, extraDetails));
        return (SELF) this;
    }
}

/** Explain format categorizes the output format of explain result. */
public enum ExplainFormat {
    /** Explain a {@link Explainable} with plain text format. */
    PLAIN_TEXT
}

/** ExplainDetail defines the types of details for explain result. */
public enum ExplainDetail {
    /**
     * The cost information on physical rel node estimated by optimizer. e.g. TableSourceScan(...,
     * cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory}
     */
    ESTIMATED_COST,

    /**
     * The changelog mode produced by a physical rel node. e.g. GroupAggregate(...,
     * changelogMode=[I,UA,D])
     */
    CHANGELOG_MODE,

    /** The execution plan in json format of the program. */
    JSON_EXECUTION_PLAN,

    /**
     * The potential risk warnings and SQL optimizer tuning advice analyzed from the physical plan.
     */
    PLAN_ADVICE
}
public interface TableEnvironment {

    /**
     * Returns the AST of the specified statement and the execution plan to compute the result of
     * the given statement.
     *
     * @param statement The statement for which the AST and execution plan will be returned.
     * @param extraDetails The extra explain details which the explain result should include, e.g.
     *     estimated cost, changelog mode for streaming, displaying execution plan in json format
     * @return AST and the execution plan.
     */
    default String explainSql(String statement, ExplainDetail... extraDetails) {
        return explainSql(statement, ExplainFormat.PLAIN_TEXT, extraDetails);
    }

    /**
     * Returns the AST of the specified statement and the execution plan to compute the result of
     * the given statement.
     *
     * @param statement The statement for which the AST and execution plan will be returned.
     * @param format The output format of explain plan.
     * @param extraDetails The extra explain details which the explain result should include, e.g.
     *     estimated cost, changelog mode for streaming, displaying execution plan in json format
     * @return AST and the execution plan.
     */
    String explainSql(String statement, ExplainFormat format, ExplainDetail... extraDetails);

    // ... other existing methods omitted
}
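The overloads above use a simple delegation pattern. The varargs-only method fills in ExplainFormat.PLAIN_TEXT and forwards to the format-aware method, so an implementor only has to provide one abstract method. The Flink-free sketch below mirrors that pattern with hypothetical stand-in types (MiniFormat, MiniDetail, MiniExplainable, FakeQuery). None of them are Flink classes, and the echo string returned by FakeQuery is invented purely for illustration.

```java
import java.util.Arrays;

// Hypothetical stand-ins for ExplainFormat and ExplainDetail; not Flink classes.
enum MiniFormat { PLAIN_TEXT }

enum MiniDetail { ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN, PLAN_ADVICE }

// Mirrors the shape of the proposed Explainable interface.
interface MiniExplainable<SELF extends MiniExplainable<SELF>> {

    // Convenience overload: defaults the format to PLAIN_TEXT and delegates.
    default String explain(MiniDetail... extraDetails) {
        return explain(MiniFormat.PLAIN_TEXT, extraDetails);
    }

    // The only method an implementor has to provide.
    String explain(MiniFormat format, MiniDetail... extraDetails);

    default SELF printExplain(MiniDetail... extraDetails) {
        return printExplain(MiniFormat.PLAIN_TEXT, extraDetails);
    }

    // Prints the explain result and returns this object for fluent chaining.
    @SuppressWarnings("unchecked")
    default SELF printExplain(MiniFormat format, MiniDetail... extraDetails) {
        System.out.println(explain(format, extraDetails));
        return (SELF) this;
    }
}

// Hypothetical implementor that only echoes what it was asked to explain.
final class FakeQuery implements MiniExplainable<FakeQuery> {
    @Override
    public String explain(MiniFormat format, MiniDetail... extraDetails) {
        return "format=" + format + ", details=" + Arrays.toString(extraDetails);
    }
}
```

For example, `new FakeQuery().explain(MiniDetail.CHANGELOG_MODE, MiniDetail.PLAN_ADVICE)` yields `format=PLAIN_TEXT, details=[CHANGELOG_MODE, PLAN_ADVICE]`. This shows that existing callers of the varargs-only overload implicitly get the plain-text format.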

Table Planner

Introduce an interface, PlanAnalyzer, to analyze the optimized rel nodes generated by the planner. It contains an inner interface, AnalyzedResult, which wraps a PlanAdvice together with the IDs of the affected rel nodes.

package org.apache.flink.table.planner.analyze;

import org.apache.flink.table.planner.plan.nodes.FlinkRelNode;

import org.apache.calcite.rel.RelNode;

import java.util.List;
import java.util.Optional;

/**
 * Plan analyzer analyzes the optimized physical plan and gives feedback on potential risk warnings
 * and optimization advice.
 */
public interface PlanAnalyzer {
    /** Analyze the optimized {@link RelNode} and return {@link AnalyzedResult}. */
    Optional<AnalyzedResult> analyze(FlinkRelNode rel);

    /** The analyzed {@link PlanAdvice} with a list of applicable {@link RelNode} ids. */
    interface AnalyzedResult {

        PlanAdvice getAdvice();

        List<Integer> getTargetIds();
    }
}

Introduce a POJO class, PlanAdvice, with the enums Kind and Scope, to encapsulate the advice given by a PlanAnalyzer.

package org.apache.flink.table.planner.analyze;

import org.apache.flink.annotation.Experimental;

/** Plain POJO for advice provided by {@link PlanAnalyzer}. */
@Experimental
public final class PlanAdvice {

    private final Kind kind;

    private final Scope scope;

    private String content;

    public PlanAdvice(Kind kind, Scope scope, String content) {
        this.kind = kind;
        this.scope = scope;
        this.content = content;
    }

    public PlanAdvice(Kind kind, Scope scope) {
        this(kind, scope, "");
    }

    public PlanAdvice withContent(String content) {
        this.content = content;
        return this;
    }

    public String getContent() {
        return content;
    }

    public Scope getScope() {
        return scope;
    }

    public Kind getKind() {
        return kind;
    }

    /** Categorize the semantics of a {@link PlanAdvice}. */
    public enum Kind {
        /** Indicate the potential risk. */
        WARNING,
        /** Indicate the potential optimization. */
        ADVICE
    }

    /** Categorize the scope of a {@link PlanAdvice}. */
    public enum Scope {
        /**
         * Indicate a global advice, which is not specific to a {@link
         * org.apache.calcite.rel.RelNode}.
         */
        GLOBAL,
        /**
         * Indicate a local advice, which can be attributed to a specific {@link
         * org.apache.calcite.rel.RelNode}.
         */
        LOCAL
    }
}

Proposed Changes

Overall Design

To leverage the planner's power to give more understandable, actionable advice closer to the user's perspective, we propose a new explain mode, which analyzes the optimized physical plan and attaches available tuning advice or data correctness warnings to the output.

The syntax exposed to users will be:

User SQL Syntax
EXPLAIN PLAN_ADVICE <query>

We also propose to categorize the output format by introducing the enum ExplainFormat, with ExplainFormat#PLAIN_TEXT as the default, which corresponds to the current output format.

Each PlanAnalyzer traverses the optimized physical plan (i.e., the rel nodes) to perform its analysis. If a physical rel matches an analyzer's pattern, the analyzed result is attached to the physical plan. Several analyzers may provide advice for the same query. 
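The collection step can be sketched in a few lines. This minimal sketch assumes that each analyzer is invoked independently on the same optimized plan and that results are simply accumulated in analyzer order. The types Advice, Result, Node, Analyzer and AdviceCollector are hypothetical, Flink-free simplifications (Java 16+ records), not the proposed planner classes. The plan is also flattened to a list for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for PlanAdvice.
record Advice(String kind, String scope, String content) {}

// Hypothetical stand-in for PlanAnalyzer.AnalyzedResult.
record Result(Advice advice, List<Integer> targetIds) {}

// Hypothetical plan node: just an id and a rel name.
record Node(int id, String name) {}

// Mirrors the shape of PlanAnalyzer, but over a flattened list of nodes.
interface Analyzer {
    Optional<Result> analyze(List<Node> plan);
}

final class AdviceCollector {
    private AdviceCollector() {}

    // Runs every analyzer on the same plan. Analyzers that find nothing are skipped,
    // so one query can end up with zero, one, or several pieces of advice.
    static List<Result> collect(List<Node> plan, List<Analyzer> analyzers) {
        List<Result> results = new ArrayList<>();
        for (Analyzer analyzer : analyzers) {
            analyzer.analyze(plan).ifPresent(results::add);
        }
        return results;
    }
}
```

Returning Optional from each analyzer lets "no finding" be expressed without sentinel values, which is also why the proposed PlanAnalyzer#analyze returns Optional<AnalyzedResult>.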

Kind and Scope are two orthogonal enums that categorize a PlanAdvice.

Kind
  • WARNING: It reveals potential data correctness risks, such as state expiration and NDU issues.
  • ADVICE: It suggests potential performance tuning configurations, such as enabling mini-batch so that the optimizer can apply two-phase aggregation.

Scope
  • GLOBAL: It provides advice from a global view of the query.
  • LOCAL: It can be linked to a specific rel node, such as a table scan or a rank, and is appended to that target rel node.

For the LOCAL scope, the advice is appended to the target rel node, similar to how CHANGELOG_MODE and ESTIMATED_COST append their information. For the GLOBAL scope, a "Physical Plan Advice" section is appended after "Optimized Physical Plan", followed by the global advice. 
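The two rendering rules can be illustrated with a small, self-contained sketch. AdviceRenderer and its nested types are hypothetical (Java 16+ records) and are not the planner's actual explain code. The sketch assumes that LOCAL advice becomes an extra `kind=[content]` attribute placed before the node's closing parenthesis, and that GLOBAL advice is listed as `[KIND] content` lines under the section header. Both assumptions are modelled on the example outputs in the Use Case section.

```java
import java.util.List;
import java.util.Locale;

// Hypothetical renderer; the real output is produced by the planner's explain logic.
final class AdviceRenderer {

    enum Kind { WARNING, ADVICE }

    enum Scope { GLOBAL, LOCAL }

    record Advice(Kind kind, Scope scope, String content) {}

    private AdviceRenderer() {}

    // LOCAL advice: append "warning=[...]" or "advice=[...]" as an extra attribute,
    // inserted before the node line's closing parenthesis.
    static String renderLocal(String nodeLine, Advice advice) {
        String attr = advice.kind().name().toLowerCase(Locale.ROOT) + "=[" + advice.content() + "]";
        int close = nodeLine.lastIndexOf(')');
        return nodeLine.substring(0, close) + ", " + attr + nodeLine.substring(close);
    }

    // GLOBAL advice: a dedicated section placed after the optimized physical plan.
    static String renderGlobal(List<Advice> globalAdvice) {
        StringBuilder sb = new StringBuilder("== Physical Plan Advice ==\n");
        for (Advice advice : globalAdvice) {
            sb.append('[').append(advice.kind()).append("] ").append(advice.content()).append('\n');
        }
        return sb.toString();
    }
}
```

For instance, rendering a LOCAL warning with content `state TTL` on `Calc(select=[a, b])` produces `Calc(select=[a, b], warning=[state TTL])`. This matches the shape of the GroupAggregate and LookupJoin lines in the first example output.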

Use Case


Suppose we implement an analyzer that collects all rel nodes sensitive to state TTL. For the following query, the result will then attach a warning to the join and aggregate nodes.

Query Example
set 'table.exec.state.ttl' = '36000';

create table `order` (
  order_id bigint not null primary key not enforced,
  gmt_create timestamp(3) not null,
  buyer_id bigint not null,
  category_id bigint not null,
  amount double not null,
  ptime as proctime()
) with (
  'connector' = 'values',
  'bounded' = 'false'
);

create table `category` (
  category_id bigint not null primary key not enforced,
  category_name string not null
) with (
  'connector' = 'values',
  'bounded' = 'false'
);

explain plan_advice
select
  b.category_name,
  sum(a.amount) as revenue
from
  `order` a left join `category` for system_time as of `a`.`ptime` as `b`
on a.category_id = b.category_id
group by b.category_name;
Output Format
== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], revenue=[SUM($1)])
+- LogicalProject(category_name=[$7], amount=[$4])
   +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{3, 5}])
      :- LogicalProject(order_id=[$0], gmt_create=[$1], buyer_id=[$2], category_id=[$3], amount=[$4], ptime=[PROCTIME()])
      :  +- LogicalTableScan(table=[[default_catalog, default_database, order]])
      +- LogicalFilter(condition=[=($cor0.category_id, $0)])
         +- LogicalSnapshot(period=[$cor0.ptime])
            +- LogicalTableScan(table=[[default_catalog, default_database, category]])

== Optimized Physical Plan ==
GroupAggregate(groupBy=[category_name], select=[category_name, SUM(amount) AS revenue], warning=[You might want to pay attention to this node because state expiration configuration 'table.exec.state.ttl' is enabled.])
+- Exchange(distribution=[hash[category_name]])
   +- Calc(select=[category_name, amount])
      +- LookupJoin(table=[default_catalog.default_database.category], joinType=[LeftOuterJoin], lookup=[category_id=category_id], select=[category_id, amount, category_id0, category_name], warning=[You might want to pay attention to this node because state expiration configuration 'table.exec.state.ttl' is enabled.])
         +- TableSourceScan(table=[[default_catalog, default_database, order, project=[category_id, amount], metadata=[]]], fields=[category_id, amount])

== Optimized Execution Plan ==
GroupAggregate(groupBy=[category_name], select=[category_name, SUM(amount) AS revenue])
+- Exchange(distribution=[hash[category_name]])
   +- Calc(select=[category_name, amount])
      +- LookupJoin(table=[default_catalog.default_database.category], joinType=[LeftOuterJoin], lookup=[category_id=category_id], select=[category_id, amount, category_id0, category_name])
         +- TableSourceScan(table=[[default_catalog, default_database, order, project=[category_id, amount], metadata=[]]], fields=[category_id, amount])


Suppose we implement an analyzer to detect the NDU issues. Then for the following query, the result will attach a piece of global advice after the "Optimized Physical Plan".

Query Example
create temporary table cdc_with_meta (
 a int,
 b bigint,
 c string,
 d boolean,
 metadata_1 int metadata,
 metadata_2 string metadata,
 metadata_3 bigint metadata,
 primary key (a) not enforced
) with (
 'connector' = 'values',
 'changelog-mode' = 'I,UA,UB,D',
 'readable-metadata' = 'metadata_1:INT, metadata_2:STRING, metadata_3:BIGINT'
);

create temporary table sink_without_pk (
 a int,
 b bigint,
 c string
) with (
 'connector' = 'values',
 'sink-insert-only' = 'false'
);

explain plan_advice
insert into sink_without_pk
select a, metadata_3, c
from cdc_with_meta;
Output Format
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a, metadata_3, c])
+- LogicalProject(a=[$0], metadata_3=[$6], c=[$2])
   +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta]])

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, metadata_3, c])
+- Calc(select=[a, metadata_3, c])
   +- TableSourceScan[3](table=[[default_catalog, default_database, cdc_with_meta, project=[a, c], metadata=[metadata_3]]], fields=[a, c, metadata_3])

== Physical Plan Advice ==
[ADVICE] The metadata column(s): 'metadata_3' in cdc source may cause wrong result or error on downstream operators, please consider removing these columns or use a non-cdc source that only has insert messages.
source node:
TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta, project=[a, c], metadata=[metadata_3]]], fields=[a, c, metadata_3], changelogMode=[I,UB,UA,D], upsertKeys=[[a]])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, metadata_3, c])
+- Calc(select=[a, metadata_3, c])
   +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta, project=[a, c], metadata=[metadata_3]]], fields=[a, c, metadata_3])

This gives users an interactive way to adjust their streaming queries until they are "correct" and "fast".


Here is an example implementation of PlanAnalyzer to detect all state TTL-sensitive rel nodes and inform users of the risk.

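import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.plan.nodes.FlinkRelNode;
import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttleImpl;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** An implementation of {@link PlanAnalyzer} to collect state TTL sensitive rels. */
public class StateExpireRiskAnalyzer implements PlanAnalyzer {

    public static final StateExpireRiskAnalyzer INSTANCE = new StateExpireRiskAnalyzer();

    private static final PlanAdvice STATE_EXPIRE =
            new PlanAdvice(
                    PlanAdvice.Kind.WARNING,
                    PlanAdvice.Scope.LOCAL,
                    String.format(
                            "You might want to pay attention to this node because state expiration configuration '%s' is enabled.",
                            ExecutionConfigOptions.IDLE_STATE_RETENTION.key()));

    private static final Set<String> STATE_TTL_INSENSITIVE_REL = new HashSet<>();

    static {
        // excludes the state TTL insensitive rel
    }

    private StateExpireRiskAnalyzer() {}

    @Override
    public Optional<AnalyzedResult> analyze(FlinkRelNode rel) {
        List<Integer> targetRelIds = new ArrayList<>();
        // State TTL counts as enabled when 'table.exec.state.ttl' is positive.
        boolean enableStateTTL =
                ShortcutUtils.unwrapTableConfig(rel)
                                .get(ExecutionConfigOptions.IDLE_STATE_RETENTION)
                                .toMillis()
                        > 0;
        if (rel instanceof FlinkPhysicalRel && enableStateTTL) {
            // Walk the whole plan and collect the IDs of all state TTL sensitive rels.
            rel.accept(
                    new RelShuttleImpl() {
                        @Override
                        public RelNode visit(RelNode other) {
                            if (!STATE_TTL_INSENSITIVE_REL.contains(
                                    other.getClass().getCanonicalName())) {
                                targetRelIds.add(other.getId());
                            }
                            return super.visit(other);
                        }
                    });
            if (!targetRelIds.isEmpty()) {
                return Optional.of(
                        new AnalyzedResult() {
                            @Override
                            public PlanAdvice getAdvice() {
                                return STATE_EXPIRE;
                            }

                            @Override
                            public List<Integer> getTargetIds() {
                                return targetRelIds;
                            }
                        });
            }
        }
        return Optional.empty();
    }
}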
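The traversal logic of the analyzer above can be reduced to a plain depth-first walk, which is easier to try without a Flink build. The sketch below is a Flink-free approximation under several assumptions. PhysNode and ToyStateTtlAnalyzer are hypothetical types. Node kinds are matched by simple name rather than canonical class name. The TTL value is passed in directly instead of being read from the table configuration. The exclusion set in the usage check is chosen only so that the result matches the first example output above, where warnings appear on GroupAggregate and LookupJoin.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical toy plan node standing in for a FlinkPhysicalRel.
record PhysNode(int id, String kind, List<PhysNode> inputs) {
    PhysNode(int id, String kind, PhysNode... inputs) {
        this(id, kind, List.of(inputs));
    }
}

final class ToyStateTtlAnalyzer {

    // Hypothetical exclusion list of state TTL insensitive node kinds.
    private final Set<String> ttlInsensitive;

    ToyStateTtlAnalyzer(Set<String> ttlInsensitive) {
        this.ttlInsensitive = ttlInsensitive;
    }

    // Returns the ids of all nodes not on the exclusion list (in pre-order),
    // or an empty list when state TTL is disabled.
    List<Integer> sensitiveIds(PhysNode root, long stateTtlMillis) {
        List<Integer> ids = new ArrayList<>();
        if (stateTtlMillis > 0) {
            visit(root, ids);
        }
        return ids;
    }

    private void visit(PhysNode node, List<Integer> ids) {
        if (!ttlInsensitive.contains(node.kind())) {
            ids.add(node.id());
        }
        for (PhysNode input : node.inputs()) {
            visit(input, ids);
        }
    }
}
```

With the plan GroupAggregate, Exchange, Calc, LookupJoin, TableSourceScan (ids 1 to 5) and Exchange, Calc and TableSourceScan excluded, a positive TTL yields the ids [1, 4]. A TTL of 0 yields no advice at all, mirroring the `enableStateTTL` guard in the analyzer above.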

Open Questions

Q: What is the relationship between PLAN_ADVICE, CHANGELOG_MODE, and ESTIMATED_COST? Can I explain all of them together? What will the result look like?

A: PLAN_ADVICE is an explain detail of the same kind as CHANGELOG_MODE and ESTIMATED_COST. Users can choose to explain multiple details at the same time.

User SQL Syntax
EXPLAIN [ExplainDetail[, ExplainDetail]*] <query>

Q: Why propose this feature in the community? It sounds more suitable for the platform side to implement.

A: User experience matters a lot. Users benefit if the community provides a basic implementation. This feature also lays a foundation for custom analyzers, which can build on the clarified API and behavior.

Future Work

In the future, there are three main follow-ups:

  • Introduce ExplainFormat#JSON to output structured results, to facilitate the visualization of plans with tagged advice, which provides a more intuitive illustration for users to understand their queries.
  • Introduce more analyzers to perform analysis.
  • Extend the analysis to cover batch mode.

Compatibility, Deprecation, and Migration Plan

This change does not break compatibility, and no migration is needed.

Test Plan

We will test this FLIP in four ways:

  1. Make sure that the existing tests can pass.

  2. Test that the explain functionality works as expected for different statements, such as a single query, a statement set, and plans with sub-plan reuse.

  3. Test that the plan analyzers work as expected under different scenarios, covering LOCAL/GLOBAL scope combined with ADVICE/WARNING kind.

  4. Add new ITCase and E2E tests.

Rejected Alternatives

Rejected User Syntax

At first, we wanted to align the syntax with EXPLAIN JSON_EXECUTION_PLAN. However, that syntax does not make clear which plan is analyzed. This raises a second question: which plan should be analyzed?

Analyze JSON Execution Plan vs. Optimized Physical Plan

Q: Why not enhance ExplainDetail#JSON_EXECUTION_PLAN instead of choosing the optimized rel nodes as the analysis target?

A: Currently, there are three ExplainDetail values: CHANGELOG_MODE, ESTIMATED_COST, and JSON_EXECUTION_PLAN.

The figure below illustrates how they relate to the corresponding SQL processing phases. The StreamGraph is excluded first because we want to limit the analysis to compile time. The figure also shows that the optimized rel is the representation closest to the original query.

