...
Currently, the `void sqlUpdate(String sql)` method executes DDL statements immediately, while DML statements are buffered and only triggered by `TableEnvironment.execute()`. Both behaviors should be consistent. Because this method buffers `INSERT` statements, it causes the problem described above, so it will be deprecated. We propose a new blocking method that returns the execution result:
/**
 * If the statement is translated to a Flink job, the result will not be
 * returned until the job is finished.
 *
 * @return result for SHOW/DESCRIBE/EXPLAIN, the affected row count for
 * DML (-1 means unknown), or a string message ("OK") for other
 * statements.
 * @throws Exception which occurs during the execution.
 */
ResultTable executeStatement(String statement) throws Exception;
This method only supports executing a single statement, which can be a DDL, DML, SHOW, DESCRIBE, EXPLAIN or USE statement. It executes synchronously and returns a ResultTable, which is the representation of the execution result and contains both the result data and the result schema. If an error occurs, this method throws an exception.
/**
 * A ResultTable is the representation of the statement execution result.
 */
interface ResultTable {
    /**
     * Get the schema of the ResultTable.
     */
    TableSchema getResultSchema();

    /**
     * Get the result contents as iterable rows.
     */
    Iterable<Row> getResultRows();
}
The following table describes the result for each kind of statement:
| Statement | Result Schema | Result Value | Examples |
| --- | --- | --- | --- |
| DDL | field name: result; field type: VARCHAR(2) | "OK" (single row) | CREATE TABLE new_table (col1 BIGINT, ...) |
| DML (INSERT/UPDATE/DELETE) | field name: affected_rowcount; field type: BIGINT | the affected row count (-1 means unknown) | INSERT INTO sink_table SELECT … |
| SHOW xx | field name: result; field type: VARCHAR(n) (n is the max length of values) | list all objects (multiple rows) | SHOW CATALOGS |
| DESCRIBE xx | field name: result; field type: VARCHAR(n) | describe the detail of an object (single row) | DESCRIBE CATALOG catalog_name |
| EXPLAIN xx | field name: result; field type: VARCHAR(n) | explain the plan of a query (single row) | EXPLAIN PLAN FOR SELECT … |
| USE xx | field name: result; field type: VARCHAR(2) | "OK" (single row) | USE CATALOG catalog_name |
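To make the DDL and DML rows of the table above concrete, here is a minimal, self-contained sketch in plain Java (no Flink dependency). The `SimpleResultTable` record and the two helper methods are illustrative assumptions, simplified stand-ins for the proposed ResultTable, which exposes a TableSchema and `Iterable<Row>` instead:

```java
import java.util.List;

public class ResultTableSketch {
    // Hypothetical, simplified stand-in for the proposed ResultTable:
    // column names plus row data.
    record SimpleResultTable(List<String> columnNames, List<List<Object>> rows) {}

    // A DDL result: a single VARCHAR "result" column with one row "OK".
    static SimpleResultTable ddlResult() {
        return new SimpleResultTable(List.of("result"), List.of(List.of((Object) "OK")));
    }

    // A DML result: a single BIGINT "affected_rowcount" column;
    // -1 means the affected row count is unknown.
    static SimpleResultTable dmlResult(long affectedRows) {
        return new SimpleResultTable(
                List.of("affected_rowcount"), List.of(List.of((Object) affectedRows)));
    }

    public static void main(String[] args) {
        System.out.println(ddlResult());
        System.out.println(dmlResult(-1L));
    }
}
```

The point of the sketch is only the shape of the result: one column and one row for a DDL, one BIGINT column for a DML.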
`insertInto(String, Table)`
Like the `INSERT` statement, Tables passed to this method are also buffered and cause the same buffer problem. So we advise deprecating this method.
`Table.insertInto` currently delegates to this deprecated method. Once this method is removed in the future, we will change the behavior of the `Table.insertInto` method from lazy execution (triggered by the `TableEnvironment.execute` method) to immediate execution (like the `executeStatement` method).
`execute(String jobName)` & `explain(boolean)`
...
`DmlBatch` supports adding a list of SQL statements and Tables through the `addXX` methods, getting the plan of all added statements through the `explain` method, and optimizing all added statements as a whole and submitting the job through the `execute` method. The added statements and Tables are cleared when the `execute` method is called.
interface TableEnvironment {
    /**
     * Create a DmlBatch instance which can add dml statements or Tables
     * to the batch; the planner can optimize all added statements and
     * Tables together for better performance.
     */
    DmlBatch createDmlBatch();
}
interface DmlBatch {
    /**
     * Add an insert statement to the batch.
     */
    void addInsert(String insert);

    /**
     * Add a Table with the given sink table name to the batch.
     */
    void addInsert(String targetPath, Table table);

    /**
     * Execute all statements and Tables as a batch.
     *
     * The added statements and Tables will be cleared when executing
     * this method.
     */
    ResultTable execute() throws Exception;

    /**
     * Returns the AST and the execution plan to compute the result of
     * all statements and Tables.
     *
     * @param extended if the plan should contain additional properties,
     * e.g. estimated cost, traits
     */
    String explain(boolean extended);
}
Each statement or Table has a return value, namely its affected row count, so the ResultTable has multiple columns. All column types are BIGINT, and each column name is "affected_rowcount_" followed by the index of the corresponding statement or Table. e.g.
DmlBatch batch = tEnv.createDmlBatch();
batch.addInsert("insert into xx ...");
batch.addInsert("yy", tEnv.sqlQuery("select ..."));
batch.execute();
The schema and data in ResultTable:
...
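For illustration, the column-naming rule above can be sketched in plain Java. The `columnNames` helper is hypothetical (not part of the proposal), and zero-based indexing is assumed here since the text does not state the starting index:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchResultColumns {
    // Hypothetical helper: derive the ResultTable column names for a
    // batch with n added statements/Tables. All columns are BIGINT
    // affected-row counts.
    static List<String> columnNames(int n) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            names.add("affected_rowcount_" + i);
        }
        return names;
    }

    public static void main(String[] args) {
        // A batch with two inserts, like the example above, yields two columns.
        System.out.println(columnNames(2));
    }
}
```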
First, let’s discuss the buffer problem in depth. There are actually two levels of buffering: TableEnvironment buffers SQL statements/Tables, and StreamExecutionEnvironment buffers the transformations used to generate the StreamGraph. Each TableEnvironment instance holds a StreamExecutionEnvironment instance. Currently, when a FlinkRelNode is translated into a Flink operator, the generated transformations are added to StreamExecutionEnvironment’s buffer. The bug[2] is caused by this behavior. Let’s use another simple example to explain the problem with StreamExecutionEnvironment’s buffer.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// will add transformations to env when translating to execution plan
tEnv.sqlUpdate("INSERT INTO sink1 SELECT a, b FROM MyTable1");

Table table = tEnv.sqlQuery("SELECT c, d from MyTable2");
DataStream dataStream = tEnv.toAppendStream(table, Row.class);
dataStream…

env.execute("job name");
// or tEnv.execute("job name")
Currently, the job submitted by either execute method contains the topology of both queries, and users are confused by this behavior. As suggested in "Public Interfaces", `StreamExecutionEnvironment.execute` should only trigger DataStream program execution, and `TableEnvironment.execute` should only trigger table program execution. So the expected behavior for the above example is that `env.execute("job name")` submits the second query, and `tEnv.execute("job name")` submits the first query.
...
Deprecated methods and new methods work together
TableEnvironment tEnv = ...
DmlBatch batch = tEnv.createDmlBatch();
tEnv.sqlUpdate("insert into s1 ..."); // statement1
batch.addInsert("insert into s2 ..."); // statement2
tEnv.insertInto("sink1", tEnv.sqlQuery("select xx...")); // statement3
tEnv.executeStatement("insert into s3 ..."); // only submit the plan of this statement
tEnv.explain(false); // explain the plan of statement1 and statement3
tEnv.execute("test1"); // submit the plan of statement1 and statement3
batch.addInsert("sink2", tEnv.sqlQuery("select yy...")); // statement4
batch.explain(false); // explain the plan of statement2 and statement4
ResultTable result = batch.execute(); // submit the plan of statement2 and statement4
TableEnvironment’s methods and DataStream work together
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.sqlUpdate("insert into s1 ..."); // statement1
DmlBatch batch = tEnv.createDmlBatch();
batch.addInsert("sink1", tEnv.sqlQuery("select xx...")); // statement2
Table table = tEnv.sqlQuery("select yy...");
DataStream dataStream = tEnv.toAppendStream(table, Row.class); // statement3
dataStream…
tEnv.explain(false); // explain the plan of statement1
batch.explain(false); // explain the plan of statement2
env.execute("test1"); // submit the plan of statement3
tEnv.execute("test2"); // submit the plan of statement1
batch.execute(); // submit the plan of statement2
Summary:
| Methods of TableEnvironment | single statement | multiple statements |
| --- | --- | --- |
| DDL | executeStatement() | Unsupported (may support multiple DDLs for easy testing in the future) |
| SHOW/DESCRIBE/USE | executeStatement() | Unsupported |
| DML | executeStatement() | createDmlBatch() -> DmlBatch -> execute() |
| EXPLAIN | explain(Table) (can’t explain insert statement) | createDmlBatch() -> DmlBatch -> explain() |
...
Notice: this depends on FLIP-73/FLIP-74 and is not in the scope of this FLIP.
To support time-consuming batch SQL or never-ending streaming SQL, we need to provide an asynchronous execution mode.
Provide async execute methods for executeStatement and DmlBatch.execute
Similarly to the above, the suggested method is:
/**
 * Asynchronously execute the given single statement immediately, and the
 * statement can be DDL/DML/SELECT. Multiple statements separated by
 * semicolons are not supported.
 */
...
We should also support executing a batch asynchronously. Suggested method:
interface DmlBatch {
    /**
     * Asynchronously execute the dml statements as a batch.
     */
    CompletableFuture<ResultTable> executeAsync();
}
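The asynchronous variants return a CompletableFuture, so the caller can keep working and pick up the result later. The following self-contained sketch (plain Java, no Flink dependency) shows the intended usage pattern; `executeAsync` here is a stand-in that simulates a submitted job, and a plain Long (a pretend affected-row count) replaces ResultTable:

```java
import java.util.concurrent.CompletableFuture;

public class AsyncExecuteSketch {
    // Stand-in for the proposed DmlBatch.executeAsync(): runs the "job"
    // on another thread and completes the future with the result.
    static CompletableFuture<Long> executeAsync() {
        return CompletableFuture.supplyAsync(() -> {
            // ... submit the job and wait for it to finish ...
            return 42L;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Long> future = executeAsync();
        // The caller can do other work here and only block (or attach
        // callbacks via thenAccept/whenComplete) when the result is needed.
        long affected = future.join();
        System.out.println("affected rows: " + affected);
    }
}
```

This is exactly why the async mode matters for never-ending streaming jobs: the blocking `execute()` would never return, while a future lets the client regain control immediately after submission.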
Add an async execute method to org.apache.flink.table.delegation.Executor
...