...
Let’s give an example to explain the problem with buffering SQLs/Tables until execution:
tEnv.sqlUpdate("CREATE TABLE test (...) with (path = '/tmp1')");
- Users are confused about which kinds of SQL statements are executed immediately and which are buffered until triggered by the execute method.
- Buffering SQLs/Tables causes undefined behavior. We may want to insert data into the `test` table with the `/tmp1` path but get the wrong result of `/tmp2`, because the table was dropped and re-created with a different path before the buffered statements were executed.
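The hazard can be simulated without Flink. In the sketch below (plain Java; the toy `BufferingEnv` class and its methods are illustrative stand-ins for TableEnvironment, not Flink code), the INSERT is buffered and only resolved at execute() time, so it writes to whatever path `test` maps to by then:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the deprecated buffering behavior: DDL takes effect
// immediately, while DML is only resolved when execute() runs.
class BufferingEnv {
    private final Map<String, String> catalog = new HashMap<>(); // table -> path
    private final List<String> bufferedInserts = new ArrayList<>();

    void createTable(String name, String path) { catalog.put(name, path); }
    void dropTable(String name) { catalog.remove(name); }
    void bufferInsert(String table) { bufferedInserts.add(table); }

    // Resolves each buffered INSERT against the *current* catalog state.
    List<String> execute() {
        List<String> writtenPaths = new ArrayList<>();
        for (String table : bufferedInserts) {
            writtenPaths.add(catalog.get(table));
        }
        bufferedInserts.clear();
        return writtenPaths;
    }
}

public class BufferingDemo {
    public static void main(String[] args) {
        BufferingEnv env = new BufferingEnv();
        env.createTable("test", "/tmp1");
        env.bufferInsert("test");          // user expects data under /tmp1
        env.dropTable("test");
        env.createTable("test", "/tmp2");  // table redefined before execute()
        System.out.println(env.execute()); // the INSERT resolves to /tmp2
    }
}
```

The deferred resolution is exactly what makes the behavior undefined from the user's point of view: the result depends on catalog state at execute() time, not at the time `sqlUpdate` was called.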
...
Currently, the `void sqlUpdate(String sql)` method executes DDLs immediately, while DMLs are buffered and only triggered by `TableEnvironment.execute()`. These two behaviors should be kept consistent. Because this method buffers `INSERT` statements, it causes the problem described above, so it will be deprecated. We propose a new blocking method that returns the execution result:
/**
 * Synchronously execute the given single statement and return the execution result.
 * If the statement is translated to a Flink job, the result will not be returned
 * until the job is finished.
 *
 * @return result for SHOW/DESCRIBE/EXPLAIN, the affected row count for `DML`
 *         (-1 means unknown), or a string message ("OK") for other statements.
 * @throws Exception which occurs during the execution.
 */
ResultTable executeStatement(String statement) throws Exception;
This method only supports executing a single statement, which can be a DDL, DML, SHOW, DESCRIBE, EXPLAIN, or USE statement. The method executes synchronously and returns a ResultTable, which is the representation of the execution result and contains the result data and the result schema. If an error occurs, this method throws an exception.
/**
 * A ResultTable is the representation of the statement execution result.
 */
interface ResultTable {

    /**
     * Get the schema of the ResultTable.
     */
    TableSchema getResultSchema();

    /**
     * Get the result contents as iterable rows.
     */
    Iterable<Row> getResultRows();
}
The following table describes the result for each kind of statement:
Statement | Result Schema | Result Value | Examples
DDL | field name: result; field type: VARCHAR(2) | "OK" (single row) | CREATE TABLE new_table (col1 BIGINT, ...)
DML (INSERT/UPDATE/DELETE) | field name: affected_rowcount; field type: BIGINT | the affected row count (-1 means unknown) | INSERT INTO sink_table SELECT …
SHOW xx | field name: result; field type: VARCHAR(n) (n is the max length of all values) | list all objects (multiple rows) | SHOW CATALOGS
DESCRIBE xx | field name: result; field type: VARCHAR(n) | describe the detail of an object (single row) | DESCRIBE CATALOG catalog_name
EXPLAIN xx | field name: result; field type: VARCHAR(n) | explain the plan of a query (single row) | EXPLAIN PLAN FOR SELECT …
USE xx | field name: result; field type: VARCHAR(2) | "OK" (single row) | USE CATALOG catalog_name
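To make the ResultTable contract tangible, here is a minimal in-memory stand-in (plain Java; the `ResultLike`/`SimpleResultTable` names and the List-based schema/row types are illustrative substitutes for Flink's TableSchema and Row, not proposed API) showing the shape of a DDL result: a single "result" column with one "OK" row.

```java
import java.util.List;

// Illustrative stand-in for the proposed ResultTable contract:
// a schema plus iterable result rows.
interface ResultLike {
    List<String> getResultSchema();        // column names (stand-in for TableSchema)
    Iterable<List<Object>> getResultRows(); // rows (stand-in for Iterable<Row>)
}

class SimpleResultTable implements ResultLike {
    private final List<String> schema;
    private final List<List<Object>> rows;

    SimpleResultTable(List<String> schema, List<List<Object>> rows) {
        this.schema = schema;
        this.rows = rows;
    }
    public List<String> getResultSchema() { return schema; }
    public Iterable<List<Object>> getResultRows() { return rows; }
}

public class ResultTableDemo {
    public static void main(String[] args) {
        // Shape of a DDL result: single "result" column, one "OK" row.
        ResultLike ddlResult = new SimpleResultTable(
            List.of("result"), List.of(List.of("OK")));
        System.out.println(ddlResult.getResultSchema());
        for (List<Object> row : ddlResult.getResultRows()) {
            System.out.println(row);
        }
    }
}
```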
`insertInto(String, Table)`
...
`DmlBatch` supports adding a list of SQL statements and Tables through the `addXX` methods, getting the plan of all statements through the `explain` method, and optimizing all the statements together and submitting the job through the `execute` method. The added statements and Tables will be cleared when calling the `execute` method.
interface TableEnvironment {

    /**
     * Create a DmlBatch instance which can add dml statements or Tables
     * to the batch; the planner can optimize all added statements and
     * Tables together for better performance.
     */
    DmlBatch createDmlBatch();
}
interface DmlBatch {

    /**
     * Add an insert statement to the batch.
     */
    void addInsert(String insert);

    /**
     * Add a Table with the given sink table name to the batch.
     */
    void addInsert(String targetPath, Table table);

    /**
     * Execute all statements and Tables as a batch.
     *
     * The added statements and Tables will be cleared when executing
     * this method.
     */
    ResultTable execute() throws Exception;

    /**
     * Returns the AST and the execution plan to compute the result of
     * all statements and Tables.
     *
     * @param extended if the plan should contain additional properties,
     *                 e.g. estimated cost, traits
     */
    String explain(boolean extended);
}
Each statement or Table has a return value, which is the affected row count of that statement or Table, so the ResultTable has multiple columns. All column types are BIGINT, and the column name is "affected_rowcount_" plus the index of the statement or Table, e.g.
DmlBatch batch = tEnv.createDmlBatch();
batch.addInsert("insert into xx ...");
batch.addInsert("yy", tEnv.sqlQuery("select ..."));
batch.execute();
The schema and data in ResultTable:
 | column1 (batch.addInsert("insert into xx ...")) | column2 (batch.addInsert("yy", tEnv.sqlQuery("select ...")))
Schema | name: affected_rowcount_0; type: BIGINT | name: affected_rowcount_1; type: BIGINT
Data (single row) | -1 | -1
`Table fromTableSource(TableSource<?> source)`
...
First, let’s discuss the buffer problem in depth. There are actually two levels of buffering: TableEnvironment buffers SQLs/Tables, and StreamExecutionEnvironment buffers transformations to generate the StreamGraph. Each TableEnvironment instance holds a StreamExecutionEnvironment instance. Currently, when translating a FlinkRelNode into a Flink operator, the generated transformations are added to StreamExecutionEnvironment’s buffer. The bug[2] is caused by this behavior. Let’s give another simple example to explain the problem of StreamExecutionEnvironment’s buffer.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// will add transformations to env when translating to execution plan
tEnv.sqlUpdate("INSERT INTO sink1 SELECT a, b FROM MyTable1");
Table table = tEnv.sqlQuery("SELECT c, d FROM MyTable2");
DataStream dataStream = tEnv.toAppendStream(table, Row.class);
dataStream…
env.execute("job name"); // or tEnv.execute("job name")
The job submitted by either execute method contains the topology of both queries, and users are confused by this behavior. As suggested in "Public Interfaces", `StreamExecutionEnvironment.execute` only triggers DataStream program execution, and `TableEnvironment.execute` only triggers table program execution. So the expected behavior for the above example is that `env.execute("job name")` submits the second query and `tEnv.execute("job name")` submits the first query.
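The proposed decoupling can be modeled in isolation (plain Java; the two-buffer model below is an illustration of the intended semantics only, not of Flink internals): each environment drains only its own buffer when its execute method is called.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the proposed semantics: the table environment and the
// stream environment keep separate buffers, and each execute() call
// submits (and clears) only its own buffer.
class TwoBufferModel {
    final List<String> tableBuffer = new ArrayList<>();   // buffered SQL statements
    final List<String> streamBuffer = new ArrayList<>();  // buffered transformations

    List<String> tableExecute() {  // models TableEnvironment.execute
        List<String> job = new ArrayList<>(tableBuffer);
        tableBuffer.clear();
        return job;
    }

    List<String> streamExecute() { // models StreamExecutionEnvironment.execute
        List<String> job = new ArrayList<>(streamBuffer);
        streamBuffer.clear();
        return job;
    }
}

public class DecoupledExecuteDemo {
    public static void main(String[] args) {
        TwoBufferModel m = new TwoBufferModel();
        m.tableBuffer.add("INSERT INTO sink1 SELECT a, b FROM MyTable1"); // first query
        m.streamBuffer.add("toAppendStream(SELECT c, d FROM MyTable2)"); // second query
        System.out.println(m.streamExecute()); // submits only the DataStream program
        System.out.println(m.tableExecute());  // submits only the table program
    }
}
```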
...
`sqlUpdate` vs `executeStatement`:
Current Interface | New Interface |
tEnv.sqlUpdate("CREATE TABLE test (...) with (path = '/tmp1')"); | ResultTable result = tEnv.executeStatement("CREATE TABLE test (...) with (path = '/tmp1')"); result... |
tEnv.sqlUpdate("INSERT INTO test SELECT ..."); tEnv.execute("test"); | ResultTable result = tEnv.executeStatement("INSERT INTO test SELECT ..."); result... |
`execute` & `explain` vs `createDmlBatch`:
Current Interface | New Interface |
tEnv.sqlUpdate("insert into xx ...") tEnv.sqlUpdate("insert into yy ...") tEnv.execute("test") // tEnv.explain(false) | DmlBatch batch = tEnv.createDmlBatch(); batch.addInsert("insert into xx ..."); batch.addInsert("insert into yy ..."); ResultTable result = batch.execute(); // batch.explain(false) |
Table table1 = tEnv.sqlQuery("select xx ...")... Table table2 = tEnv.sqlQuery("select yy ...")... tEnv.insertInto("sink1", table1) tEnv.insertInto("sink2", table2) tEnv.execute("test") // tEnv.explain(false) | Table table1 = tEnv.sqlQuery("select xx ...")... Table table2 = tEnv.sqlQuery("select yy ...")... DmlBatch batch = tEnv.createDmlBatch(); batch.addInsert("sink1", table1); batch.addInsert("sink2", table2); ResultTable result = batch.execute() // batch.explain(false) |
Deprecated methods and new methods work together
TableEnvironment tEnv = ...
DmlBatch batch = tEnv.createDmlBatch();
tEnv.sqlUpdate("insert into s1 ..."); // statement1
batch.addInsert("insert into s2 ..."); // statement2
tEnv.insertInto("sink1", tEnv.sqlQuery("select xx...")); // statement3
tEnv.executeStatement("insert into s3 ..."); // only submit the plan of this statement
tEnv.explain(false); // explain the plan of statement1 and statement3
tEnv.execute("test1"); // submit the plan of statement1 and statement3
batch.addInsert("sink2", tEnv.sqlQuery("select yy...")); // statement4
batch.explain(false); // explain the plan of statement2 and statement4
ResultTable result = batch.execute(); // submit the plan of statement2 and statement4
TableEnvironment’s methods and DataStream work together
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.sqlUpdate("insert into s1 ..."); // statement1
DmlBatch batch = tEnv.createDmlBatch();
batch.addInsert("sink1", tEnv.sqlQuery("select xx...")); // statement2
Table table = tEnv.sqlQuery("select yy...");
DataStream dataStream = tEnv.toAppendStream(table, Row.class); // statement3
dataStream…
tEnv.explain(false); // explain the plan of statement1
batch.explain(false); // explain the plan of statement2
env.execute("test1"); // submit the plan of statement3
tEnv.execute("test2"); // submit the plan of statement1
batch.execute(); // submit the plan of statement2
Summary:
Methods of TableEnvironment
Methods | Comments |
void execute(String jobName) | deprecated |
String explain(boolean extended) | deprecated |
void sqlUpdate(String sql) | deprecated |
void insertInto(String, Table) | deprecated |
Table fromTableSource(TableSource<?> tableSource) | deprecated |
ResultTable executeStatement(String statement) | added |
DmlBatch createDmlBatch() | added |
New methods for single statement & multiple statements
Statement | single statement | multiple statements
DDL | executeStatement() | Unsupported (may support multiple DDLs for easy testing in the future)
SHOW/DESCRIBE/USE | executeStatement() | Unsupported
DML | executeStatement() | createDmlBatch() -> DmlBatch -> execute()
EXPLAIN | explain(Table) (can’t explain an insert statement) | createDmlBatch() -> DmlBatch -> explain()
Compatibility, Deprecation, and Migration Plan
...
Similarly to the above, the suggested method:

/**
 * Asynchronously execute the given single statement, which can be a
 * DDL/DML/SHOW/DESCRIBE/EXPLAIN/USE statement.
 */
CompletableFuture<ResultTable> executeStatementAsync(String statement);
We should also support executing a batch of SQL statements asynchronously. Suggested method:
interface DmlBatch {

    /**
     * Asynchronously execute the dml statements as a batch.
     */
    CompletableFuture<ResultTable> executeAsync();
}
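Consumption of the async variant could look like the sketch below (plain Java; `FakeBatch` is an illustrative simulation of a DmlBatch whose executeAsync runs the job off the caller's thread and completes the future with the per-statement affected row counts; it is not Flink API):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Simulated DmlBatch#executeAsync: the "job" runs on a background thread
// and the future completes with the per-statement affected row counts.
class FakeBatch {
    private final List<Long> affectedRowCounts;

    FakeBatch(List<Long> affectedRowCounts) {
        this.affectedRowCounts = affectedRowCounts;
    }

    CompletableFuture<List<Long>> executeAsync() {
        return CompletableFuture.supplyAsync(() -> affectedRowCounts);
    }
}

public class AsyncBatchDemo {
    public static void main(String[] args) {
        FakeBatch batch = new FakeBatch(List.of(-1L, -1L)); // -1 means unknown
        // The caller can attach follow-up work instead of blocking...
        CompletableFuture<String> done =
            batch.executeAsync().thenApply(counts -> "affected=" + counts);
        // ...and only joins when the result is actually needed.
        System.out.println(done.join());
    }
}
```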
Add an async execute method to org.apache.flink.table.delegation.Executor
...
Since we already have FLIPs[7][8] to provide asynchronous management, it’s convenient and natural to provide such a method.
Add an async execute method to org.apache.flink.streaming.api.environment#StreamExecutionEnvironment
public abstract CompletableFuture<JobExecutionResult> asyncExecute(StreamGraph streamGraph) throws Exception;
SQL CLI integrates with new API
How does the SQL CLI leverage the DmlBatch class to obtain optimization?
We can reference other systems’ designs, such as Sqlline’s Batch Command[9], and introduce a similar command, but we should note that the SQL in a batch can only be `insert into`.
How does the SQL CLI parse and execute multiple statements?
We don’t want to support multiple-statement parsing in the TableEnvironment, but this feature is needed in the SQL CLI because it’s natural to execute an external script there. I have thought about providing a parse method like `List<String> parse(String stmt)`, but it’s not intuitive to understand and such a method shouldn’t belong to the TableEnvironment API. As discussed in the pull requests [5][6], Calcite provides the `SqlNodeList parseSqlStmtList()` method to parse a list of SQL statements separated by semicolons and construct a parse tree. The SQL CLI can use this method to parse multiple statements and execute each single statement one by one through `TableEnvironment#executeStatement(String statement)`. One thing we should take care of is that there are some special commands like `help/set/quit` in the SQL CLI to control the environment’s lifecycle and change the variables of the context. IMO, there are some ways to deal with these commands in the multiple statements:
- Support these special control commands in flink-sql-parser. The shortcoming is that TableEnvironment has to take care of those noisy commands, and flink-sql-parser loses its wider expansibility to other external systems. For example, the SQL CLI may need to support `source xx` to execute an external script; it’s not proper for the TableEnvironment parser to see such syntax.
  - pro’s:
    - unified parser
    - can handle corner cases, e.g. https://github.com/apache/flink/pull/8738
  - con’s:
    - many commands are only used for the sql-client, e.g. help, quit, source
    - hard to meet the requirements of non-builtin commands, e.g. commands from flink-sql-gateway
    - not easy to extend; it’s more difficult to implement a client-specific command in the sql-parser than in the specific client
- The SQL CLI parses those control commands on its own and pre-splits the multiple statements according to the control commands. Then the SQL CLI can pass the remaining statements to the SqlParser and obtain a SqlNodeList.
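The pre-split step of the CLI-side approach could be sketched as below (plain Java; the command set and the splitting rules are illustrative only — a real implementation would hand the SQL part to Calcite's parseSqlStmtList rather than splitting on semicolons, precisely because naive splitting breaks on semicolons inside string literals):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Illustrative pre-splitter: control commands (handled by the CLI itself)
// are separated from SQL statements (to be handed to the SQL parser).
public class ScriptSplitter {
    private static final Set<String> CONTROL = Set.of("help", "set", "quit", "source");

    static List<String> controlCommands = new ArrayList<>();
    static List<String> sqlStatements = new ArrayList<>();

    static void split(String script) {
        for (String part : script.split(";")) { // naive split; see caveat above
            String stmt = part.trim();
            if (stmt.isEmpty()) continue;
            String head = stmt.split("\\s+")[0].toLowerCase();
            if (CONTROL.contains(head)) {
                controlCommands.add(stmt);   // executed by the CLI directly
            } else {
                sqlStatements.add(stmt);     // forwarded to the SQL parser
            }
        }
    }

    public static void main(String[] args) {
        split("set execution.type=batch; insert into t select 1; quit");
        System.out.println(controlCommands);
        System.out.println(sqlStatements);
    }
}
```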
...
(we should open another FLIP to discuss this)
Other open question?