...
Page properties
...
Discussion thread:
- http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-84-Improve-amp-Refactor-API-of-Table-Module-td34537.html
- http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-84-Feedback-Summary-td39261.html
...
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
The goal of this FLIP is to address the shortcomings mentioned above and make the APIs in TableEnvironment & Table clearer and more stable. This FLIP does not support multiline statements (several statements in one string), which need more discussion in a future FLIP. (Some conclusions have already been reached; please see the appendix.)
Public Interfaces
We propose to deprecate the following methods:
- TableEnvironment.sqlUpdate(String)
- TableEnvironment.insertInto(String, Table)
- TableEnvironment.execute(String)
- TableEnvironment.explain(boolean)
- TableEnvironment.fromTableSource(TableSource<?>)
- Table.insertInto(String)
Meanwhile, we propose to introduce the following new methods:
New methods in TableEnvironment:

```java
interface TableEnvironment {
    // execute the given single statement, and return the execution result.
    TableResult executeSql(String statement);

    // get the AST and the execution plan for the given single statement (DQL, DML)
    String explainSql(String statement, ExplainDetail... extraDetails);

    // create a StatementSet instance which can add DML statements or Tables
    // to the set and explain or execute them as a batch.
    StatementSet createStatementSet();
}
```
New methods in Table:

```java
interface Table {
    // write the Table to a TableSink that was registered
    // under the specified path.
    TableResult executeInsert(String tablePath);

    // write the Table to a TableSink that was registered
    // under the specified path; if overwrite is true, existing data is overwritten.
    TableResult executeInsert(String tablePath, boolean overwrite);

    // returns the AST and the execution plan of the current Table,
    // including the requested extra details.
    String explain(ExplainDetail... extraDetails);

    // get the contents of the current table.
    TableResult execute();
}
```
New class: TableResult

```java
interface TableResult {
    // return JobClient if a Flink job is submitted
    // (e.g. for DML/DQL statement), else return empty (e.g. for DDL).
    Optional<JobClient> getJobClient();

    // return the schema of the result
    TableSchema getTableSchema();

    // return the ResultKind which can avoid custom parsing of
    // an "OK" row in programming
    ResultKind getResultKind();

    // get the row contents as an iterator of rows
    Iterator<Row> collect();

    // print the result contents
    void print();
}
```
New class: ResultKind

```java
public enum ResultKind {
    // for DDL, DCL and statements with a simple "OK"
    SUCCESS,
    // rows with important content are available (DML, DQL)
    SUCCESS_WITH_CONTENT
}
```
New class: StatementSet

```java
interface StatementSet {
    // add single INSERT statement into the set
    StatementSet addInsertSql(String statement);

    // add Table with the given sink table name to the set
    StatementSet addInsert(String targetPath, Table table);

    // add Table with the given sink table name to the set
    StatementSet addInsert(String targetPath, Table table, boolean overwrite);

    // returns the AST and the execution plan to compute
    // the result of all statements and Tables
    String explain(ExplainDetail... extraDetails);

    // execute all statements and Tables as a batch
    TableResult execute() throws Exception;
}
```
New class: ExplainDetail

```java
public enum ExplainDetail {
    STATE_SIZE_ESTIMATE,
    UID,
    HINTS,
    ...
}
```
- To clean up the currently confusing trigger points of Flink table programs, we propose the following: for TableEnvironment and StreamTableEnvironment, you must use `TableEnvironment.execute()` to trigger table program execution; once you convert the table program to a DataStream program (through the `toAppendStream` or `toRetractStream` method), you must use `StreamExecutionEnvironment.execute()` to trigger the DataStream program.
The same rule applies to BatchTableEnvironment: you must use `TableEnvironment.execute()` to trigger batch table program execution; once you convert the table program to a DataSet program (through the `toDataSet` method), you must use `ExecutionEnvironment.execute()` to trigger the DataSet program.
...
```java
interface TableEnvironment {
    /**
     * Execute the given single statement; the statement can be
     * DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE.
     *
     * If the statement is translated to a Flink job (e.g. DML/DQL),
     * the TableResult is returned once the job is submitted, and it
     * contains a JobClient instance associated with the job.
     * Otherwise, the TableResult is returned once the statement
     * execution has finished, and it does not contain a JobClient instance.
     *
     * @return result for DQL/SHOW/DESCRIBE/EXPLAIN, the affected row count
     *     for `DML` (-1 means unknown), or a string message ("OK") for other
     *     statements.
     * @throws Exception which occurs during the execution of the statement.
     */
    TableResult executeSql(String statement);
}
```
...
This method only supports executing a single statement, which can be DDL, DML, DQL, SHOW, DESCRIBE, EXPLAIN or USE. For DML and DQL, this method returns a TableResult once the job has been submitted. For DDL and DCL statements, the TableResult is returned once the operation has finished. TableResult is the representation of the execution result and contains the result data and the result schema. If the statement is translated to a job (DML, DQL), TableResult also contains a JobClient associated with that job. If an error occurs, this method throws an exception.
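To make the two return points concrete, here is a toy model in plain Java; it is not Flink code and uses no Flink classes. It assumes the caller already knows whether a statement becomes a job, and it uses a `CompletableFuture` as a stand-in for the `JobClient`. `ExecuteSqlTiming`, `ToyResult` and the `executeSql(boolean, Runnable)` signature are hypothetical.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Toy model (not Flink code) of when executeSql returns.
final class ExecuteSqlTiming {

    // Hypothetical stand-in for TableResult.
    static final class ToyResult {
        // Stands in for Optional<JobClient>: a handle to a possibly still running job.
        final Optional<CompletableFuture<Void>> job;

        ToyResult(Optional<CompletableFuture<Void>> job) {
            this.job = job;
        }
    }

    // submitsJob: whether the statement is DML/DQL (decided by the caller in this toy).
    // work: the actual execution of the statement.
    static ToyResult executeSql(boolean submitsJob, Runnable work) {
        if (submitsJob) {
            // DML/DQL: "submit" the work asynchronously and return right away,
            // while the job may still be running.
            return new ToyResult(Optional.of(CompletableFuture.runAsync(work)));
        }
        // DDL, SHOW, USE, ...: run to completion before returning; no job handle.
        work.run();
        return new ToyResult(Optional.<CompletableFuture<Void>>empty());
    }
}
```

In the model, a DDL-like call has already done its work when it returns, whereas a DML-like call returns a handle whose work may still be in progress, so a caller that needs the outcome has to wait on that handle explicitly.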
```java
/**
 * A TableResult is the representation of the statement execution result.
 */
interface TableResult {
    /**
     * Return the JobClient if a Flink job is submitted
     * (e.g. for DML/DQL statements), else return empty (e.g. for DDL).
     */
    Optional<JobClient> getJobClient();

    /**
     * Get the schema of the result.
     */
    TableSchema getTableSchema();

    /**
     * Return the ResultKind, which avoids custom parsing of
     * an "OK" row in programs.
     */
    ResultKind getResultKind();

    /**
     * Get the result contents as an iterator of rows.
     */
    Iterator<Row> collect();

    /**
     * Print the result contents.
     */
    void print();
}
```
...
| Statement | Result Schema | Result Value | Result Kind | Examples |
|---|---|---|---|---|
| DDL | field name: result, field type: VARCHAR(2) | "OK" (single row) | SUCCESS | CREATE TABLE new_table (col1 BIGINT, ...) |
| DML (INSERT/UPDATE/DELETE) | field name: affected_rowcount, field type: BIGINT | the affected row count (-1 means unknown) | SUCCESS_WITH_CONTENT | INSERT INTO sink_table SELECT … |
| SHOW xx | field name: result, field type: VARCHAR(n) (n is the max length of values) | list of all objects (multiple rows) | SUCCESS_WITH_CONTENT | SHOW CATALOGS |
| DESCRIBE xx | field name: result, field type: VARCHAR(n) (n is the max length of values) | the details of an object (single row) | SUCCESS_WITH_CONTENT | DESCRIBE CATALOG catalog_name |
| EXPLAIN xx | field name: result, field type: VARCHAR(n) (n is the max length of values) | the plan of a query (single row) | SUCCESS_WITH_CONTENT | EXPLAIN PLAN FOR SELECT … |
| USE xx | field name: result, field type: VARCHAR(2) | "OK" (single row) | SUCCESS | USE CATALOG catalog_name |
| SELECT xx | (select schema) | (select value) | SUCCESS_WITH_CONTENT | SELECT * FROM ... |
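The result table can also be read as a lookup from statement type to result kind and result field. The toy sketch below (plain Java, no Flink dependency) encodes it with a naive first-keyword check and treats DESCRIBE and EXPLAIN like SHOW; `ResultShapes`, `firstKeyword`, `kindOf` and `resultFieldName` are hypothetical helpers, and only the `ResultKind` constants come from the proposal.

```java
import java.util.Locale;

// Toy encoding (not Flink code) of the statement/result table.
final class ResultShapes {

    // Mirrors the proposed ResultKind enum.
    enum ResultKind { SUCCESS, SUCCESS_WITH_CONTENT }

    // First keyword, upper-cased. Naive: ignores comments and WITH ... SELECT.
    static String firstKeyword(String statement) {
        return statement.trim().split("\\s+", 2)[0].toUpperCase(Locale.ROOT);
    }

    // DDL and USE return a bare "OK"; everything else carries content.
    static ResultKind kindOf(String statement) {
        switch (firstKeyword(statement)) {
            case "CREATE":
            case "DROP":
            case "ALTER":
            case "USE":
                return ResultKind.SUCCESS;
            default: // INSERT/UPDATE/DELETE, SHOW, DESCRIBE, EXPLAIN, SELECT
                return ResultKind.SUCCESS_WITH_CONTENT;
        }
    }

    // Result field name for non-query statements; a SELECT keeps the
    // query's own schema, which this toy signals by returning null.
    static String resultFieldName(String statement) {
        switch (firstKeyword(statement)) {
            case "INSERT":
            case "UPDATE":
            case "DELETE":
                return "affected_rowcount";
            case "SELECT":
                return null;
            default:
                return "result";
        }
    }
}
```

Checking `getResultKind()` in this way is what lets a program tell a bare "OK" apart from real content without parsing the returned row.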
`TableEnvironment.insertInto(String, Table)` & `Table.insertInto(String)`
...
```java
interface StatementSet {
    /**
     * Add an INSERT statement to the set.
     */
    StatementSet addInsertSql(String statement);

    /**
     * Add a Table with the given sink table name to the set.
     */
    StatementSet addInsert(String targetPath, Table table);

    /**
     * Add a Table with the given sink table name to the set.
     */
    StatementSet addInsert(String targetPath, Table table, boolean overwrite);

    /**
     * Returns the AST and the execution plan to compute the result of
     * all statements and Tables.
     *
     * @param extraDetails the extra details which the plan should contain,
     *     e.g. estimated cost, uid
     */
    String explain(ExplainDetail... extraDetails);

    /**
     * Execute all statements and Tables as a batch.
     *
     * The added statements and Tables will be cleared when executing
     * this method.
     */
    TableResult execute() throws Exception;
}
```
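The behavioral contract of StatementSet (chainable add methods, one batch per `execute()`, and clearing the set afterwards) can be sketched with a toy in-memory class. It is plain Java, not the Flink implementation: a Table is represented only by its name, and `execute()` returns the batched statements instead of a TableResult. `ToyStatementSet` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy, in-memory stand-in for the proposed StatementSet (no Flink involved).
final class ToyStatementSet {
    private final List<String> pending = new ArrayList<>();

    // Mirrors addInsertSql: records the statement and returns this for chaining.
    ToyStatementSet addInsertSql(String statement) {
        pending.add(statement);
        return this;
    }

    // Mirrors addInsert(targetPath, table); the table is represented by its name.
    ToyStatementSet addInsert(String targetPath, String tableName) {
        return addInsert(targetPath, tableName, false);
    }

    // Mirrors addInsert(targetPath, table, overwrite).
    ToyStatementSet addInsert(String targetPath, String tableName, boolean overwrite) {
        String verb = overwrite ? "INSERT OVERWRITE " : "INSERT INTO ";
        pending.add(verb + targetPath + " SELECT * FROM " + tableName);
        return this;
    }

    // Mirrors execute(): everything added so far forms one batch,
    // and the set is cleared afterwards, as the Javadoc specifies.
    List<String> execute() {
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        return batch;
    }

    int size() {
        return pending.size();
    }
}
```

Because the set is cleared on `execute()`, reusing the same instance afterwards starts a fresh batch rather than resubmitting the earlier statements.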
```java
/**
 * ExplainDetail defines the types of details for explain result.
 */
public enum ExplainDetail {
    STATE_SIZE_ESTIMATE,
    UID,
    HINTS,
    ...
}
```
...
| Statement type | Single statement | Multiple statements |
|---|---|---|
| DDL | executeSql(String) | Unsupported (multiple DDLs may be supported in the future for easier testing) |
| SHOW/DESCRIBE/USE | executeSql(String) | Unsupported |
| DQL | executeSql(String) | Unsupported |
| DML | executeSql(String) | createStatementSet() -> StatementSet -> execute() |
| EXPLAIN | explain(Table), explainSql(String) | createStatementSet() -> StatementSet -> explain() |
...