Status
Current state: "Under Discussion" Accepted
Discussion thread: https://lists.apache.org/thread/5xywxv7g43byoh0jbx1b6qo6gx6wjkcz
Vote thread: https://lists.apache.org/thread/bsgqvvs9wx1dkv7p3m9ctockh84rl11j
JIRA:
JIRA: TBD Jira server ASF JIRA serverId 5aa69414-a9e9-3523-82ec-879b028fb15b key FLINK-30650
Released: <Flink Version>1.17.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Table of Contents |
---|
Motivation
Although SQL has been widely adopted among Flink users in recent years, we noticed that it is still not easy for users to write the "correct" and "fast" stream queries to achieve what they want, and the general familiarity with the concept of the dynamic table and the retraction might not be enough to troubleshoot the specific cases, even for experienced users. We might have posted detailed documentation, such as data correctness issues caused by NDU and performance tuning. Nevertheless, users must match the issue pattern with the documented item to have a fix/workaround. For users, this process seems like a repetitive pull-based guess-and-try. There is a considerable gap between the documented concepts/optimization techniques and the actual, applicable actions.
...
- Users can get instant advice when explaining the query with the proposed syntax EXPLAIN PLAN_ADVICE.
- This FLIP lays the groundwork for further improvement. For example, the community might implement more analyzers or provide the visualization of the physical plan with advice (by Flink Plan Visualizer) in the future.
Public Interfaces
We propose to add/change the following interfaces/classes.
Change List | Module | Annotation |
---|---|---|
Explainable | flink-table-api-java | PublicEvolving |
ExplainFormat | ||
ExplainDetail | ||
TableEnvironment | ||
PlanAnalyzer | flink-table-planner | Internal |
PlanAdvice |
Table API
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
/** * Represents an artifact that can be explained using a summary string. * * @see #explain(ExplainDetail...) */ @PublicEvolving public interface Explainable<SELF extends Explainable<SELF>> { /** * Returns the AST of this object and the execution plan to compute the result of the given * statement. * * @param extraDetails The extra explain details which the result of this method should include, * e.g. estimated cost, changelog mode for streaming * @return AST and the execution plan. */ default String explain(ExplainDetail... extraDetails) { return explain(ExplainFormat.TEXT, extraDetails); } /** * Returns the AST of this object and the execution plan to compute the result of the given * statement. * * @param format The output format of explain plan * @param extraDetails The extra explain details which the result of this method should include, * e.g. estimated cost, changelog mode for streaming * @return AST and the execution plan. */ String explain(ExplainFormat format, ExplainDetail... extraDetails); /** Like {@link #explain(ExplainDetail...)}, but piping the result to {@link System#out}. */ @SuppressWarnings("unchecked") default SELF printExplain(ExplainDetail... extraDetails) { return printExplain(ExplainFormat.TEXT, extraDetails); } /** Like {@link #explain(ExplainDetail...)}, but piping the result to {@link System#out}. */ @SuppressWarnings("unchecked") default SELF printExplain(ExplainFormat format, ExplainDetail... extraDetails) { System.out.println(explain(format, extraDetails)); return (SELF) this; } } |
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
@PublicEvolving public interface TableEnvironment { /** * Returns the AST of the specified statement and the execution plan to compute the result of * the given statement. * * @param statement The statement for which the AST and execution plan will be returned. * @param extraDetails The extra explain details which the explain result should include, e.g. * estimated cost, changelog mode for streaming, displaying execution plan in json format * @return AST and the execution plan. */ default String explainSql(String statement, ExplainDetail... extraDetails) { return explainSql(statement, ExplainFormat.TEXT, extraDetails); } /** * Returns the AST of the specified statement and the execution plan to compute the result of * the given statement. * * @param statement The statement for which the AST and execution plan will be returned. * @param format The output format of explain plan. * @param extraDetails The extra explain details which the explain result should include, e.g. * estimated cost, changelog mode for streaming, displaying execution plan in json format * @return AST and the execution plan. */ String explainSql(String statement, ExplainFormat format, ExplainDetail... extraDetails); } |
Table Planner
Introduce an interface PlanAnalyzer to analyze the optimized rel nodes generated by Planner. It encloses an inner interface, i.e., AnalyzedResult, to wrap a PlanAdvice with a list of affected rel node IDs.
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
package org.apache.flink.table.planner.analyze; import org.apache.flink.annotation.Experimental; /** Plain POJO for advice provided by {@link PlanAnalyzer}. */ @Internal public final class PlanAdvice { private final Kind kind; private final Scope scope; private String content; public PlanAdvice(Kind kind, Scope scope, String content) { this.kind = kind; this.scope = scope; this.content = content; } public PlanAdvice(Kind kind, Scope scope) { this(kind, scope, ""); } public PlanAdvice withContent(String content) { this.content = content; return this; } // getters ... /** Categorize the semantics of a {@link PlanAdvice}. */ @Internal public enum Kind { /** Indicate the potential risk. */ WARNING, /** Indicate the potential optimization. */ ADVICE } /** Categorize the scope of a {@link PlanAdvice}. */ @Internal public enum Scope { /** * Indicate the advice is not specific to a {@link * org.apache.calcite.rel.RelNode}. */ QUERY_LEVEL, /** * Indicate the advice is specific to a {@link * org.apache.calcite.rel.RelNode}. */ NODE_LEVEL } } |
Proposed Changes
Overall Design
To leverage the planner's power to give more understandable, actionable advice closer to the user's perspective, we propose a new explain mode PLAN_ADVICE, which analyzes the optimized physical plan and attaches available tuning advice or data correctness warnings to the output.
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
== Abstract Syntax Tree == LogicalUnion(all=[true]) :- LogicalProject(count=[$0], word=[$1]) : +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')]) : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) +- LogicalProject(count=[$0], word=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) == Optimized Physical Plan With Advice == Union(all=[true], union=[count, word]) :- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')]) : +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word]) advice: No available advice. == Optimized Execution Plan == Union(all=[true], union=[count, word]) :- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')]) : +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word]) |
Use Case
Case-1
SCOPE = NODE_LEVEL and KIND = WARNING
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
== Abstract Syntax Tree == LogicalAggregate(group=[{0}], revenue=[SUM($1)]) +- LogicalProject(category_name=[$7], amount=[$4]) +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{3, 5}]) :- LogicalProject(order_id=[$0], gmt_create=[$1], buyer_id=[$2], category_id=[$3], amount=[$4], ptime=[PROCTIME()]) : +- LogicalTableScan(table=[[default_catalog, default_database, order]]) +- LogicalFilter(condition=[=($cor0.category_id, $0)]) +- LogicalSnapshot(period=[$cor0.ptime]) +- LogicalTableScan(table=[[default_catalog, default_database, category]]) == Optimized Physical Plan With Advice == GroupAggregate(advice=[1], groupBy=[category_name], select=[category_name, SUM(amount) AS revenue]) +- Exchange(distribution=[hash[category_name]]) +- Calc(select=[category_name, amount]) +- LookupJoin(advice=[1], table=[default_catalog.default_database.category], joinType=[LeftOuterJoin], lookup=[category_id=category_id], select=[category_id, amount, category_id0, category_name]) +- TableSourceScan(table=[[default_catalog, default_database, order, project=[category_id, amount], metadata=[]]], fields=[category_id, amount]) advice[1]: You might want to pay attention to this node because state expiration configuration 'table.exec.state.ttl' is enabled. == Optimized Execution Plan == GroupAggregate(groupBy=[category_name], select=[category_name, SUM(amount) AS revenue]) +- Exchange(distribution=[hash[category_name]]) +- Calc(select=[category_name, amount]) +- LookupJoin(table=[default_catalog.default_database.category], joinType=[LeftOuterJoin], lookup=[category_id=category_id], select=[category_id, amount, category_id0, category_name]) +- TableSourceScan(table=[[default_catalog, default_database, order, project=[category_id, amount], metadata=[]]], fields=[category_id, amount]) |
Case-2
SCOPE = QUERY_LEVEL and KIND = WARNING
...
It would be an interactive experience for users to adjust the stream queries to be "correct" and "fast".
PlanAnalzyer
PlanAnalyzer is an internal interface to perform the analysis and provide advice. Introducing a new PlanAnalyzer is similar to adding a new RelOptRule to RuleSet.
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
/** An implementation of {@link PlanAnalyzer} to collect state TTL sensitive rels. */ @Experimental public class StateExpireRiskAnalyzer implements PlanAnalyzer { public static final StateExpireRiskAnalyzer INSTANCE = new StateExpireRiskAnalyzer(); private static final PlanAdvice STATE_EXPIRE = new PlanAdvice( PlanAdvice.Kind.WARNING, PlanAdvice.Scope.NODE_LEVEL, String.format( "You might want to pay attention to this node because state expiration configuration '%s' is enabled.", ExecutionConfigOptions.IDLE_STATE_RETENTION.key())); private static final Set<String> STATE_TTL_INSENSITIVE_REL = new HashSet<>(); static { // excludes the state TTL insensitive rel } private StateExpireRiskAnalyzer() {} @Override public Optional<AnalyzedResult> analyze(FlinkRelNode rel) { List<Integer> targetRelIds = new ArrayList<>(); boolean enableStateTTL = ShortcutUtils.unwrapTableConfig(rel) .get(ExecutionConfigOptions.IDLE_STATE_RETENTION) .toMillis() > 0; if (rel instanceof FlinkPhysicalRel && enableStateTTL) { rel.accept( new RelShuttleImpl() { @Override public RelNode visit(RelNode other) { if (!STATE_TTL_INSENSITIVE_REL.contains( other.getClass().getCanonicalName())) { targetRelIds.add(other.getId()); } return super.visit(other); } }); if (!targetRelIds.isEmpty()) { return Optional.of( new AnalyzedResult() { @Override public PlanAdvice getAdvice() { return STATE_EXPIRE; } @Override public List<Integer> getTargetIds() { return targetRelIds; } }); } } return Optional.empty(); } } |
ExplainFormat
The problem with the current output format is that it is a mixture of plain text (AST, Optimized Physical Plan, and Optimized Execution Plan) and JSON (Physical Execution Plan, via EXPLAIN JSON_EXECUTION_PLAN ), which is not structured and categorized.
...
In the future, we might introduce more formats along with the syntax EXPLAIN [ExplainDetails...] WITH FORMAT [TEXT | JSON] FORMAT <query>. While this <query> (similar to MySQL Explain)
Note: this is just an illustration. The exact syntax is beyond the scope of this FLIP.
Open Questions
Q: What is the relationship between PLAN_ADVICE, CHANGELOG_MODE, and ESITMATED_COSTSCOST? Can I explain all of those together? What will the result look like?
A: PLAN_ADVICE is another kind of explain detail towards CHANGELOG_MODE and ESTIMATED_COSTSCOST. Users can choose to explain multiple details at the same time.
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
EXPLAIN PLAN_ADVICE, CHANGELOG_MODE, ESTIMATED_COSTSCOST <query> |
Q: Why propose this feature in the community? It sounds more suitable for the platform side to implement.
A: User experience means a lot. Users will gain benefits if the community would like to provide a basic implementation. This feature also lays a foundation for custom analyzers to refer to the clarified API and behavior.
Future Work
In the future, there are mainly three follow-ups that can be achieved
- Introduce ExplainFormat#JSON to output structured results, to facilitate the visualization of plans with tagged advice, which provides a more intuitive illustration for users to understand their queries.
- Introduce more analyzers to perform analysis.
- Extend the situation to cover batch mode.
Compatibility, Deprecation, and Migration Plan
It will not violate any compatibility, and no migration is needed.
Test Plan
We will test this FLIP in three ways:
Make sure that the existing tests can pass.
Test the explain functionality works as expected for different statements, like single query, statement set, and under sub plan reuse condition.
Test the plan analyzer works as expected under different scenarios, like local/global v.s. advice/warning.
Add new ITCase and E2E tests.
Rejected Alternatives
Rejected User Syntax
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
EXPLAIN JSON_ANALYZED_PLAN <query> |
On first thought, we want to align the syntax with EXPLAIN JSON_EXECUTION_PLAN. However, it is unclear to know which plan needs to be analyzed. This brings out the second question, which plan should be investigated?
Analyze JSON Execution Plan v.s. Optimized Physical Plan
Q: Why not enhance ExplainDetail#JSON_EXECUTION_PLAN, but choose the target to be optimized rel?
...
The relationship between them and the corresponding SQL-processing phase is illustrated below. Stream Graph is first excluded because we want to limit this analysis within the compile time. And from this Fig, we can tell that optimized rel is the closest to the original query.