Status
Current state: Accepted
Discussion thread:
here (http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-84-Improve-amp-Refactor-API-of-Table-Module-td34537.html)
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-84-Feedback-Summary-td39261.html
JIRA:
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In Flink 1.9, TableEnvironment introduces the `void execute(String jobName)` interface to trigger Flink table program execution, and extends the `void sqlUpdate(String sql)` interface to evaluate not only INSERT statements but also DDL statements and USE statements. But as more use cases come up, serious shortcomings in the current API design have become apparent.
...
- Users are confused about which kinds of SQL statements are executed immediately and which are buffered until triggered by the `execute` method.
- Buffering SQL statements/Tables causes undefined behavior. For example, we may want to insert data into the `test` table with the `/tmp1` path, but the data wrongly ends up in `/tmp2`.
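The buffering problem described above can be illustrated with a toy model. This is a minimal sketch, not Flink code: the class `BufferedEnvSketch` and its methods are hypothetical, and it assumes that DDL takes effect immediately while a buffered INSERT resolves its target table only when `execute()` runs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Flink code) of "DDL runs at once, DML is buffered". */
public class BufferedEnvSketch {
    // Catalog: table name -> storage path. DDL updates it immediately.
    private final Map<String, String> catalog = new HashMap<>();
    // Buffered DML: only the target table name is kept until execute().
    private final List<String> bufferedInserts = new ArrayList<>();

    /** Models a CREATE TABLE statement: applied at once. */
    public void createTable(String name, String path) {
        catalog.put(name, path);
    }

    /** Models a DROP TABLE statement: applied at once. */
    public void dropTable(String name) {
        catalog.remove(name);
    }

    /** Models an INSERT statement: buffered, not run. */
    public void insertInto(String name) {
        bufferedInserts.add(name);
    }

    /** Models execute(): resolves every buffered target against the catalog as it is now. */
    public List<String> execute() {
        List<String> writtenPaths = new ArrayList<>();
        for (String table : bufferedInserts) {
            writtenPaths.add(catalog.get(table));
        }
        bufferedInserts.clear();
        return writtenPaths;
    }

    public static void main(String[] args) {
        BufferedEnvSketch env = new BufferedEnvSketch();
        env.createTable("test", "/tmp1");
        env.insertInto("test");            // the user intends to write to /tmp1
        env.dropTable("test");
        env.createTable("test", "/tmp2");
        System.out.println(env.execute()); // prints [/tmp2]
    }
}
```

In this model the INSERT lands in `/tmp2` because the statement is interpreted long after it was issued, which is the kind of surprise the proposal aims to remove.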
Public Interfaces
We propose to deprecate the following methods:
- TableEnvironment.sqlUpdate(String)
- TableEnvironment.insertInto(String, Table)
- TableEnvironment.execute(String)
- TableEnvironment.explain(boolean)
- TableEnvironment.fromTableSource(TableSource<?>)
- Table.insertInto(String)
Meanwhile, we propose to introduce the following new methods:
New methods in TableEnvironment:

    interface TableEnvironment {
        // execute the given single statement, and return the execution result.
        TableResult executeSql(String statement) throws Exception;

        // get the AST and the execution plan for the given single statement (DQL, DML)
        String explainSql(String statement, ExplainDetail... extraDetails);

        // create a StatementSet instance which can add DML statements or Tables
        // to the set and explain or execute them as a batch.
        StatementSet createStatementSet();
    }
New methods in Table:

    interface Table {
        // write the Table to a TableSink that was registered
        // under the specified path.
        TableResult executeInsert(String tablePath);

        // write the Table to a TableSink that was registered
        // under the specified path, optionally overwriting the existing data.
        TableResult executeInsert(String tablePath, boolean overwrite);

        // return the AST and the execution plan to compute
        // the result of the current Table.
        String explain(ExplainDetail... extraDetails);

        // get the contents of the current table.
        TableResult execute() throws Exception;
    }
New class: TableResult

    interface TableResult {
        // return JobClient if a Flink job is submitted
        // (e.g. for DML statement), else return empty (e.g. for DDL).
        Optional<JobClient> getJobClient();

        // return the schema of the result
        TableSchema getTableSchema();

        // return the ResultKind which can avoid custom parsing of
        // an "OK" row in programming
        ResultKind getResultKind();

        // get the row contents as iterable rows
        Iterable<Row> collect();

        // print the result contents
        void print();
    }
New class: ResultKind

    public enum ResultKind {
        // for DDL, DCL and statements with a simple "OK"
        SUCCESS,

        // rows with important content are available (DML, DQL)
        SUCCESS_WITH_CONTENT
    }
New class: StatementSet

    interface StatementSet {
        // add single INSERT statement into the set
        StatementSet addInsertSql(String statement);

        // add Table with the given sink table name to the set
        StatementSet addInsert(String targetPath, Table table);

        // add Table with the given sink table name to the set
        StatementSet addInsert(String targetPath, Table table, boolean overwrite);

        // returns the AST and the execution plan to compute
        // the result of all statements and Tables
        String explain(ExplainDetail... extraDetails);

        // execute all statements and Tables as a batch
        TableResult execute() throws Exception;
    }
New class: ExplainDetail

    public enum ExplainDetail {
        STATE_SIZE_ESTIMATE,
        UID,
        HINTS,
        ...
    }
- To clean up the currently messy trigger points of a Flink table program, we propose the following rule: for TableEnvironment and StreamTableEnvironment, you must use `TableEnvironment.execute()` to trigger table program execution; once you convert the table program to a DataStream program (through the `toAppendStream` or `toRetractStream` method), you must use `StreamExecutionEnvironment.execute` to trigger the DataStream program.
A similar rule applies to BatchTableEnvironment: you must use `TableEnvironment.execute()` to trigger batch table program execution; once you convert the table program to a DataSet program (through the `toDataSet` method), you must use `ExecutionEnvironment.execute` to trigger the DataSet program.
Proposed Changes
`TableEnvironment.sqlUpdate(String)`
Currently, the `void sqlUpdate(String sql)` method executes DDL statements immediately, while DML statements are buffered and only triggered by `TableEnvironment.execute()`. The two behaviors should be consistent, so this method will be deprecated. We propose a new blocking method that returns an execution result:
...
| Statement | Result Schema | Result Value | Result Kind | Examples |
|---|---|---|---|---|
| DDL | field name: result; field type: VARCHAR(2) | "OK" (single row) | SUCCESS | CREATE TABLE new_table (col1 BIGINT, ...) |
| DML (INSERT/UPDATE/DELETE) | field name: affected_rowcount; field type: BIGINT | the affected row count (-1 means unknown) | SUCCESS_WITH_CONTENT | INSERT INTO sink_table SELECT … |
| SHOW xx | field name: result; field type: VARCHAR(n) (n is the max length of values) | list all objects (multiple rows) | SUCCESS_WITH_CONTENT | SHOW CATALOGS |
| DESCRIBE xx | field name: result; field type: VARCHAR(n) (n is the max length of values) | describe the detail of an object (single row) | SUCCESS_WITH_CONTENT | DESCRIBE CATALOG catalog_name |
| EXPLAIN xx | field name: result; field type: VARCHAR(n) (n is the max length of values) | explain the plan of a query (single row) | SUCCESS_WITH_CONTENT | EXPLAIN PLAN FOR SELECT … |
| USE xx | field name: result; field type: VARCHAR(2) | "OK" (single row) | SUCCESS | USE CATALOG catalog_name |
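As a rough illustration of how a caller might rely on the result kind instead of parsing an "OK" row, the table above can be read as a mapping from the leading keyword of a statement to a `ResultKind`. The sketch below is not Flink code: `ResultKindSketch` and `resultKindFor` are hypothetical names, treating `DROP` and `ALTER` as DDL is an assumption beyond the single `CREATE TABLE` example, and the kinds for `DESCRIBE` and `EXPLAIN` are assumed to match `SHOW`.

```java
import java.util.Locale;

/** Toy lookup (not Flink code) from a statement's leading keyword to its result kind. */
public class ResultKindSketch {

    /** Mirrors the ResultKind enum proposed in this document. */
    public enum ResultKind { SUCCESS, SUCCESS_WITH_CONTENT }

    /** Hypothetical helper: classifies a statement by its first keyword. */
    public static ResultKind resultKindFor(String statement) {
        String keyword = statement.trim().split("\\s+", 2)[0].toUpperCase(Locale.ROOT);
        switch (keyword) {
            case "CREATE":   // DDL
            case "DROP":     // DDL (assumption)
            case "ALTER":    // DDL (assumption)
            case "USE":
                return ResultKind.SUCCESS;              // single "OK" row
            case "INSERT":   // DML
            case "UPDATE":
            case "DELETE":
            case "SHOW":
            case "DESCRIBE":
            case "EXPLAIN":
                return ResultKind.SUCCESS_WITH_CONTENT; // rows carry real content
            default:
                throw new IllegalArgumentException("Unsupported statement: " + statement);
        }
    }

    public static void main(String[] args) {
        System.out.println(resultKindFor("USE CATALOG catalog_name")); // prints SUCCESS
        System.out.println(resultKindFor("SHOW CATALOGS"));            // prints SUCCESS_WITH_CONTENT
    }
}
```

A real implementation would decide this from the parsed operation rather than from a keyword; the keyword switch only keeps the sketch short.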
`TableEnvironment.insertInto(String, Table)` & `Table.insertInto(String)`
Like the `sqlUpdate` method, `TableEnvironment.insertInto(String, Table)` and `Table.insertInto(String)` also buffer the Tables and therefore cause the same buffer problem. So these two methods will be deprecated.
`TableEnvironment.execute(String)` & `TableEnvironment.explain(boolean)`
Since we will disable buffering of SQL statements/Tables and plans, it’s meaningless to provide `execute(String)` as the trigger entry point, and the `explain(boolean)` method should not be used anymore either. So we advise deprecating those two methods. Instead, we introduce a new method named `createStatementSet` and a new class named `StatementSet` to support optimization across multiple SQL statements/Tables. Only DML statements or Tables can be added to a StatementSet. For DML, only `INSERT` is supported now; DELETE and UPDATE can also be supported in the future.
...
| | column1 (insert into xx ... ) | column2 (stmtSet.addInsert("yy", tEnv.sqlQuery("select ..."))) |
|---|---|---|
| Schema | name: affected_rowcount_0; type: BIGINT | name: affected_rowcount_1; type: BIGINT |
| Data (single row) | -1 | -1 |
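The naming pattern in the table above (one BIGINT column per added INSERT, suffixed with its position) can be sketched as follows. This is only an illustration of the schema shape: the class and method names are hypothetical, and filling every count with -1 simply mirrors the "unknown" value shown in the table.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy helper (not Flink code) that builds the result shape of a statement set. */
public class StatementSetResultSketch {

    /** One column per added INSERT: affected_rowcount_0, affected_rowcount_1, ... */
    public static List<String> columnNames(int insertCount) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < insertCount; i++) {
            names.add("affected_rowcount_" + i);
        }
        return names;
    }

    /** The single result row, with -1 (unknown) for every INSERT. */
    public static long[] unknownRowCounts(int insertCount) {
        long[] row = new long[insertCount];
        Arrays.fill(row, -1L);
        return row;
    }

    public static void main(String[] args) {
        System.out.println(columnNames(2));                       // prints [affected_rowcount_0, affected_rowcount_1]
        System.out.println(Arrays.toString(unknownRowCounts(2))); // prints [-1, -1]
    }
}
```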
`TableEnvironment.fromTableSource(TableSource<?>)`
Since FLIP-64 provides `ConnectTableDescriptor#createTemporaryTable` to register a TableSource in TableEnvironment, this method should be deprecated too; it was an omission in that FLIP.
Other new proposed methods
Currently, we can’t explain a statement directly in TableEnvironment; we must first convert the statement to a Table through the `TableEnvironment.sqlQuery` method. Moreover, we can’t explain an INSERT statement at all, because an INSERT statement can’t be converted to a Table. We introduce the `TableEnvironment.explainSql()` method to support explaining DQL and DML statements directly. The `explainSql` method only accepts a single statement.
...
    interface Table {
        /**
         * Write the Table to a TableSink that was registered
         * under the specified path.
         *
         * @param tablePath The path of the registered TableSink to which
         *                  the Table is written.
         */
        TableResult executeInsert(String tablePath);

        /**
         * Write the Table to a TableSink that was registered
         * under the specified path.
         *
         * @param tablePath The path of the registered TableSink to which
         *                  the Table is written.
         * @param overwrite Whether to overwrite the existing data
         */
        TableResult executeInsert(String tablePath, boolean overwrite);

        /**
         * Returns the AST and the execution plan to compute the result of
         * the current Table.
         *
         * @param extraDetails the extra details which the plan should contain,
         *                     e.g. estimated cost, uid
         */
        String explain(ExplainDetail... extraDetails);

        /**
         * Get the contents of the current table.
         */
        TableResult execute() throws Exception;
    }
How to correct the execution behavior?
First, let’s discuss the buffer problem in depth. There are actually two levels of buffering: TableEnvironment buffers SQL statements/Tables, and StreamExecutionEnvironment buffers transformations to generate the StreamGraph. Each TableEnvironment instance holds a StreamExecutionEnvironment instance. Currently, when a FlinkRelNode is translated into a Flink operator, the generated transformations are added to StreamExecutionEnvironment’s buffer. The bug [2] is caused by this behavior. Let’s give another simple example to explain the problem with StreamExecutionEnvironment’s buffer.
...
After we correct the behavior of the `execute` method, users can easily and correctly write a table program even if the deprecated methods, the new methods and the `toDataStream` methods are mixed.
Examples:
In this section we list some examples using the old API and the proposed API side by side for a straightforward comparison.
`sqlUpdate` vs `executeSql`:
| Current Interface | New Interface |
|---|---|
| `tEnv.sqlUpdate("CREATE TABLE test (...) with (path = '/tmp1')");` | `TableResult result = tEnv.executeSql("CREATE TABLE test (...) with (path = '/tmp1')"); result...` |
| `tEnv.sqlUpdate("INSERT INTO test SELECT ..."); tEnv.execute("test");` | `TableResult result = tEnv.executeSql("INSERT INTO test SELECT ..."); JobClient jobClient = result.getJobClient().get(); jobClient... result.print();` |
`execute` & `explain` vs `createStatementSet`:
| Current Interface | New Interface |
|---|---|
| `tEnv.sqlUpdate("insert into xx ..."); tEnv.sqlUpdate("insert into yy ..."); tEnv.execute("test"); // tEnv.explain(false)` | `StatementSet stmtSet = tEnv.createStatementSet(); stmtSet.addInsertSql("insert into xx ..."); stmtSet.addInsertSql("insert into yy ..."); TableResult result = stmtSet.execute(); // stmtSet.explain()` |
| `Table table1 = tEnv.sqlQuery("select xx ...")... Table table2 = tEnv.sqlQuery("select yy ...")... tEnv.insertInto("sink1", table1); tEnv.insertInto("sink2", table2); tEnv.execute("test"); // tEnv.explain(false)` | `Table table1 = tEnv.sqlQuery("select xx ...")... Table table2 = tEnv.sqlQuery("select yy ...")... StatementSet stmtSet = tEnv.createStatementSet(); stmtSet.addInsert("sink1", table1); stmtSet.addInsert("sink2", table2); TableResult result = stmtSet.execute(); // stmtSet.explain()` |
Other new proposed methods
    TableEnvironment tEnv = ...
    tEnv.explainSql("insert into s1 ...");
    tEnv.explainSql("select xx ...");

    Table table1 = tEnv.sqlQuery("select xx ...")...
    String explanation = table1.explain();
    TableResult result1 = table1.executeInsert("sink1");

    Table table2 = tEnv.sqlQuery("select yy ...")...
    TableResult result2 = table2.execute();
    result2.print();
Deprecated methods and new methods work together
    TableEnvironment tEnv = ...
    StatementSet stmtSet = tEnv.createStatementSet();
    tEnv.sqlUpdate("insert into s1 ..."); // statement1
    stmtSet.addInsertSql("insert into s2 ..."); // statement2
    tEnv.insertInto("sink1", tEnv.sqlQuery("select xx...")); // statement3
    tEnv.executeSql("insert into s3 ..."); // only submit the plan of this statement
    tEnv.explain(false); // explain the plan of statement1 and statement3
    tEnv.execute("test1"); // submit the plan of statement1 and statement3
    stmtSet.addInsert("sink2", tEnv.sqlQuery("select yy...")); // statement4
    stmtSet.explain(); // explain the plan of statement2 and statement4
    TableResult result = stmtSet.execute(); // submit the plan of statement2 and statement4
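The grouping in the comments of the example above can be reproduced with a toy model in which the deprecated methods share one buffer, each statement set owns its own buffer, and `executeSql` submits at once. This is a minimal sketch, not Flink code: `ToyEnv` and `ToyStatementSet` are hypothetical classes, statements are plain strings, and clearing a statement set after `execute()` is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model (not Flink code) of which statements each trigger submits. */
public class MixedApiSketch {

    public static class ToyEnv {
        // Buffer shared by the deprecated sqlUpdate/insertInto methods.
        private final List<String> deprecatedBuffer = new ArrayList<>();
        // Every submitted job, recorded as the list of statements it contains.
        public final List<List<String>> submittedJobs = new ArrayList<>();

        public void sqlUpdate(String dml) { deprecatedBuffer.add(dml); }

        public void insertInto(String dml) { deprecatedBuffer.add(dml); }

        /** New API: submits exactly this statement, ignoring all buffers. */
        public void executeSql(String dml) {
            submittedJobs.add(Collections.singletonList(dml));
        }

        /** Deprecated trigger: submits only what the deprecated methods buffered. */
        public void execute() {
            submittedJobs.add(new ArrayList<>(deprecatedBuffer));
            deprecatedBuffer.clear();
        }

        public ToyStatementSet createStatementSet() { return new ToyStatementSet(this); }
    }

    public static class ToyStatementSet {
        private final ToyEnv env;
        private final List<String> statements = new ArrayList<>();

        ToyStatementSet(ToyEnv env) { this.env = env; }

        public ToyStatementSet addInsert(String dml) {
            statements.add(dml);
            return this;
        }

        /** Submits only the statements added to this set. */
        public void execute() {
            env.submittedJobs.add(new ArrayList<>(statements));
            statements.clear(); // assumption: the set is emptied after execution
        }
    }

    /** Replays the call order of the example and returns the submitted jobs. */
    public static List<List<String>> replay() {
        ToyEnv tEnv = new ToyEnv();
        ToyStatementSet stmtSet = tEnv.createStatementSet();
        tEnv.sqlUpdate("statement1");
        stmtSet.addInsert("statement2");
        tEnv.insertInto("statement3");
        tEnv.executeSql("insert into s3");
        tEnv.execute();
        stmtSet.addInsert("statement4");
        stmtSet.execute();
        return tEnv.submittedJobs;
    }

    public static void main(String[] args) {
        // prints [[insert into s3], [statement1, statement3], [statement2, statement4]]
        System.out.println(replay());
    }
}
```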
TableEnvironment’s methods and DataStream work together
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    tEnv.sqlUpdate("insert into s1 ..."); // statement1

    StatementSet stmtSet = tEnv.createStatementSet();
    stmtSet.addInsert("sink1", tEnv.sqlQuery("select xx...")); // statement2

    Table table = tEnv.sqlQuery("select yy...");
    DataStream dataStream = tEnv.toAppendStream(table, Row.class); // statement3
    dataStream…

    tEnv.explain(false); // explain the plan of statement1
    stmtSet.explain(); // explain the plan of statement2

    env.execute("test1"); // submit the plan of statement3
    tEnv.execute("test2"); // submit the plan of statement1
    stmtSet.execute(); // submit the plan of statement2
Summary:
Methods in TableEnvironment & Table
| | Methods | Comments |
|---|---|---|
| TableEnvironment | JobExecutionResult execute(String jobName) | deprecated |
| | String explain(boolean extended) | deprecated |
| | void sqlUpdate(String sql) | deprecated |
| | void insertInto(String, Table) | deprecated |
| | Table fromTableSource(TableSource tableSource) | deprecated |
| | TableResult executeSql(String statement) | added |
| | String explainSql(String, ExplainDetail... extraDetails) | added |
| | StatementSet createStatementSet() | added |
| Table | insertInto(String tablePath) | deprecated |
| | TableResult executeInsert(String tablePath) | added |
| | TableResult executeInsert(String tablePath, boolean overwrite) | added |
| | String explain(ExplainDetail... extraDetails) | added |
| | TableResult execute() | added |
New methods for single statement & multiple statements
| | single statement | multiple statements |
|---|---|---|
| DDL | executeSql(String) | Unsupported (supports multiple DDLs for easy testing in the future) |
| SHOW/DESCRIBE/USE | executeSql(String) | Unsupported |
| DML | executeSql(String) | createStatementSet() -> StatementSet -> execute() |
| EXPLAIN | explain(Table) explainSql(String) | createStatementSet() -> StatementSet -> explain() |
Compatibility, Deprecation, and Migration Plan
- Methods of TableEnvironment to be deprecated:
  - void sqlUpdate(String sql)
  - void insertInto(String targetPath, Table table)
  - JobExecutionResult execute(String jobName)
  - String explain(boolean extended)
  - Table fromTableSource(TableSource tableSource)
- Methods in Table to be deprecated:
  - void insertInto(String targetPath)
- You need to change your program slightly if you use `StreamExecutionEnvironment/ExecutionEnvironment.execute` to trigger a table program execution, use `StreamTableEnvironment.execute()` to trigger a DataStream program execution, or use `BatchTableEnvironment.execute()` to trigger a DataSet program execution.
Test Plan
The `StatementSet#explain` method can be tested with unit tests, and the other new methods can be tested with integration tests. We will also add some integration tests to verify that the new methods work correctly with the deprecated methods.
Rejected Alternatives
TableEnvironment#executeBatch(String... statement)
This method is consistent with the style of other methods in TableEnvironment; however, it does not support the Table API and cannot explain the plan.
References
[1] FLIP-69 Flink SQL DDL Enhancement
...
[9] Sqline deal with batch execute
Appendix - Future Plan: not in the scope of this FLIP.
SQL CLI integrates with new API
How does the SQL CLI leverage the StatementSet class to obtain optimization?
We can refer to the design of other systems, such as the Sqlline Batch Command[9], and introduce a similar command, but note that the SQL in a batch can only be `insert into`.
How does the SQL CLI parse and execute multiple statements?
We don’t want to support multiple-statement parsing in the TableEnvironment, but this feature is needed in the SQL CLI because it’s natural to execute an external script. I considered providing a parse method like `List<String> parse(String stmt)`, but it’s not intuitive to understand and this method shouldn’t belong to the TableEnvironment API. As discussed in the pull requests [5][6], Calcite provides the `SqlNodeList parseSqlStmtList()` method to parse a list of SQL statements separated by semicolons and construct a parse tree. I think the SQL CLI can use this method to parse multiple statements and execute every single statement one by one through `TableEnvironment#executeSql(String statement)`. One thing we should take care of is that there are some special commands like `help/set/quit` in the SQL CLI that control the environment’s lifecycle and change the variables of the context. IMO, there are several ways to deal with these commands in multiple statements:
- Option a: Support these special control commands in flink-sql-parser. The shortcoming is that TableEnvironment would have to take care of those noisy commands, and flink-sql-parser would lose its extensibility to other external systems. For example, the SQL CLI may need to support `source xx` to execute an external script; it’s not proper to make the TableEnvironment parser see such syntax.
  - pros:
    - unified parser
    - can handle corner cases, e.g. https://github.com/apache/flink/pull/8738
  - cons:
    - many commands are only used by sql-client, e.g. help, quit, source
    - how to meet the requirements of non-builtin commands, e.g. commands from flink-sql-gateway
    - not easy to extend; it’s more difficult to implement a client-specific command in sql-parser than in the specific client
- Option b: The SQL CLI parses those control commands on its own and pre-splits the multiple statements according to the control commands. Then the SQL CLI can pass each part of the multiple statements to SqlParser and obtain a SqlNodeList.
  - pros:
    - sql-parser is cleaner
    - easier to extend for sql-client
  - cons:
    - many parsers: SqlCommandParser (in sql client), sql-parser
    - may hit corner cases, e.g. https://github.com/apache/flink/pull/8738
- Option c: Flink already introduces a `Parser` interface which is exposed by `Planner`. We can add one more method to `Parser`, like `List<String> splitStatement(String)`, and then borrow Calcite to achieve this functionality. Special client commands (e.g. help, quit, source) are not supported in sql-parser now: because `SqlParser#parseStmtList` returns a SqlNodeList, not a string list, those special commands are not defined in SqlNode. So I think this approach is only a complement to the first one.
- Option d: Provide a utility class to parse a string of statements separated by semicolons into multiple statements.
  - pros:
    - easier to extend for sql-client
    - can handle corner cases in a unified place
  - cons:
    - many parsers: sql-parser, a utility parser

I think option d is better. Looking forward to more people's opinions. (We can open another FLIP to discuss this.)
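To make option d more concrete, here is a minimal sketch of what such a utility could look like. It is not an existing Flink or Calcite class: `StatementSplitterSketch` and `split` are hypothetical names, and the sketch only assumes that semicolons inside single-quoted literals, backtick-quoted identifiers and `--` line comments must not end a statement. Block comments and client commands such as `help` or `quit` are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical utility (not part of Flink) that splits a script on top-level semicolons. */
public class StatementSplitterSketch {

    public static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        char quote = 0;                 // active quote character (' or `), 0 when outside quotes
        boolean inLineComment = false;  // true between "--" and the end of the line

        for (int i = 0; i < script.length(); i++) {
            char c = script.charAt(i);
            if (inLineComment) {
                // Drop comment text; keep the newline so tokens stay separated.
                if (c == '\n') {
                    inLineComment = false;
                    current.append(c);
                }
                continue;
            }
            if (quote != 0) {
                // Inside a quoted region every character, including ';', is literal.
                current.append(c);
                if (c == quote) {
                    quote = 0;
                }
                continue;
            }
            if (c == '-' && i + 1 < script.length() && script.charAt(i + 1) == '-') {
                inLineComment = true;
                i++; // skip the second '-'
                continue;
            }
            if (c == '\'' || c == '`') {
                quote = c;
                current.append(c);
                continue;
            }
            if (c == ';') {
                flush(current, statements);
                continue;
            }
            current.append(c);
        }
        flush(current, statements); // last statement may lack a trailing ';'
        return statements;
    }

    /** Adds the trimmed buffer as a statement if it is not blank, then resets the buffer. */
    private static void flush(StringBuilder current, List<String> out) {
        String statement = current.toString().trim();
        if (!statement.isEmpty()) {
            out.add(statement);
        }
        current.setLength(0);
    }

    public static void main(String[] args) {
        String script = "CREATE TABLE t (a INT) WITH ('path' = '/tmp;1'); -- load; data\n"
                + "INSERT INTO t SELECT 1;";
        for (String statement : split(script)) {
            System.out.println(statement);
        }
    }
}
```

Each returned string could then be passed to `TableEnvironment#executeSql(String statement)` one by one, with the SQL CLI intercepting its own control commands before or after the split.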
- Other open question?
...