...

  1. We propose to deprecate the following methods:

    • TableEnvironment.sqlUpdate(String)

    • TableEnvironment.insertInto(String, Table)
    • TableEnvironment.execute(String)
    • TableEnvironment.explain(boolean)
    • TableEnvironment.fromTableSource(TableSource<?>)
    • Table.insertInto(String)

  2. Meanwhile, we propose to introduce the following new methods:

    Code Block
    languagejava
    titleNew methods in TableEnvironment
    interface TableEnvironment {
        // synchronously execute the given single statement immediately, 
        // and return the execution result.
        TableResult executeSql(String statement) throws Exception;
        
        // get the AST and the execution plan for the given single statement (DQL, DML)
        String explainSql(String statement, ExplainDetail... extraDetails);
    
        // create a StatementSet instance which can add DML statements or Tables
        // to the set and explain or execute them as a batch.
        StatementSet createStatementSet();
    }


    Code Block
    languagejava
    titleNew methods in Table
    interface Table {
        // synchronously write the Table to a TableSink that was registered
        // under the specified path.
        TableResult executeInsert(String tablePath);
        
        // synchronously write the Table to a TableSink that was registered
        // under the specified path, optionally overwriting the existing data.
        TableResult executeInsert(String tablePath, boolean overwrite);
    
        // return the AST and the execution plan to compute the result of
        // the current Table.
        String explain(ExplainDetail... extraDetails);
    
        // get the contents of the current table.
        TableResult execute() throws Exception;
    }


    Code Block
    languagejava
    titleTableResult
    interface TableResult {
        // return the schema of the result
        TableSchema getTableSchema();
        
        // return the ResultKind which can avoid custom parsing of
        // an "OK" row in programming
        ResultKind getResultKind();
    
        // get the row contents as iterable rows
        Iterable<Row> collect();
    
        // print the result contents
        void print();
    }


    Code Block
    languagejava
    titleResultKind
    public enum ResultKind {
        // for DDL, DCL and statements with a simple "OK" 
        SUCCESS,
    
        // rows with important content are available (DML, DQL) 
        SUCCESS_WITH_CONTENT
    }


    Code Block
    languagejava
    titleStatementSet
    interface StatementSet  {
        // add single INSERT statement into the set
        StatementSet addInsertSql(String statement);
    
        // add Table with insert operation into the given sink table name to the set
        StatementSet addInsert(String targetPath, Table table);
    
        // add Table with the given sink table name to the set
        StatementSet addInsert(String targetPath, Table table, boolean overwrite);
    
        // returns the AST and the execution plan to compute 
        // the result of all statements and Tables
        String explain(ExplainDetail... extraDetails);
    
        // execute all statements and Tables as a batch
        TableResult execute() throws Exception;
    }


    Code Block
    languagejava
    titleExplainDetail
    public enum ExplainDetail {
       STATE_SIZE_ESTIMATE,
       UID,
       HINTS,
       ...
    }


  3. To clean up the currently confusing trigger points of Flink table programs, we propose the following rule. For TableEnvironment and StreamTableEnvironment, you must use `TableEnvironment.execute()` to trigger table program execution. Once you convert the table program to a DataStream program (through the `toAppendStream` or `toRetractStream` method), you must use `StreamExecutionEnvironment.execute` to trigger the DataStream program.
    A similar rule applies to BatchTableEnvironment: you must use `TableEnvironment.execute()` to trigger batch table program execution. Once you convert the table program to a DataSet program (through the `toDataSet` method), you must use `ExecutionEnvironment.execute` to trigger the DataSet program.

...

| Statement | Result Schema | Result Value | Result Kind | Examples |
|---|---|---|---|---|
| DDL | field name: result; field type: VARCHAR(2) | "OK" (single row) | SUCCESS | `CREATE TABLE new_table (col1 BIGINT, ...)` |
| DML (INSERT/UPDATE/DELETE) | field name: affected_rowcount; field type: BIGINT | the affected row count (-1 means unknown) | SUCCESS_WITH_CONTENT | `INSERT INTO sink_table SELECT …` |
| SHOW xx | field name: result; field type: VARCHAR(n) (n is the max length of values) | list all objects (multiple rows) | SUCCESS_WITH_CONTENT | `SHOW CATALOGS` |
| DESCRIBE xx | field name: result; field type: VARCHAR(n) (n is the max length of values) | describe the detail of an object (single row) | SUCCESS_WITH_CONTENT | `DESCRIBE CATALOG catalog_name` |
| EXPLAIN xx | field name: result; field type: VARCHAR(n) (n is the max length of values) | explain the plan of a query (single row) | SUCCESS_WITH_CONTENT | `EXPLAIN PLAN FOR SELECT …` |
| USE xx | field name: result; field type: VARCHAR(2) | "OK" (single row) | SUCCESS | `USE CATALOG catalog_name` |
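
The point of `ResultKind` is that a caller can branch on the kind of result instead of parsing the literal "OK" row. The following is a minimal sketch of that idea. The `ResultKindSketch` class and its nested `SimpleResult` type are hypothetical stand-ins written for this illustration; they only mirror the proposed `ResultKind` values and are not the Flink classes.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins that mirror the proposed ResultKind and a small part
// of TableResult. They are illustrative only and are not the Flink classes.
final class ResultKindSketch {

    enum ResultKind { SUCCESS, SUCCESS_WITH_CONTENT }

    // Simplified result: one column name plus the values of that column.
    static final class SimpleResult {
        final ResultKind kind;
        final String fieldName;
        final List<Object> rows;

        SimpleResult(ResultKind kind, String fieldName, List<Object> rows) {
            this.kind = kind;
            this.fieldName = fieldName;
            this.rows = rows;
        }
    }

    // Summarize a result by branching on its kind, not on the literal "OK" row.
    static String summarize(SimpleResult result) {
        switch (result.kind) {
            case SUCCESS:
                return "statement succeeded";
            case SUCCESS_WITH_CONTENT:
                return result.fieldName + " = " + result.rows;
            default:
                throw new IllegalStateException("unexpected kind: " + result.kind);
        }
    }

    public static void main(String[] args) {
        SimpleResult ddl = new SimpleResult(
                ResultKind.SUCCESS, "result", Collections.<Object>singletonList("OK"));
        SimpleResult dml = new SimpleResult(
                ResultKind.SUCCESS_WITH_CONTENT, "affected_rowcount",
                Collections.<Object>singletonList(-1L));
        System.out.println(summarize(ddl));
        System.out.println(summarize(dml));
    }
}
```

With the real API, the same branching would presumably be done on `TableResult.getResultKind()`, with the rows read through `collect()`.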


`TableEnvironment.insertInto(String, Table)` & `Table.insertInto(String)`

Like the `sqlUpdate` method, `TableEnvironment.insertInto(String, Table)` and `Table.insertInto(String)` also buffer the Tables and therefore cause the same buffering problem. So these two methods will be deprecated.

...

Code Block
languagejava
titleNew methods in TableEnvironment
interface TableEnvironment {
   /** 
    * Create a StatementSet instance which can add DML statements or Tables 
    * to the set, the planner can optimize all added statements and Tables 
    * together for better performance.
    */
    StatementSet createStatementSet();
}


Code Block
languagejava
titleStatementSet
interface StatementSet  {
    /** 
     * add insert statement to the set.
     */
    StatementSet addInsertSql(String statement);

    /** 
     * add Table with the given sink table name to the set.
     */
    StatementSet addInsert(String targetPath, Table table);

    /** 
     * add Table with the given sink table name to the set.
     */
    StatementSet addInsert(String targetPath, Table table, boolean overwrite);

    /** 
     * returns the AST and the execution plan to compute the result of
     * all statements and Tables.
     * 
     * @param extraDetails the extra details which the plan should contain. 
     * e.g. estimated cost, uid
     */
    String explain(ExplainDetail... extraDetails);

    /** 
     * execute all statements and Tables as a batch.
     * 
     * The added statements and Tables will be cleared when executing
     * this method. 
     */
    TableResult execute() throws Exception;
}

...

Code Block
languagejava
titleExample
StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql("insert into xx ...");
stmtSet.addInsert("yy", tEnv.sqlQuery("select ..."));
stmtSet.execute();


The schema and data in TableResult:

| | column1 (`insert into xx ...`) | column2 (`stmtSet.addInsert("yy", tEnv.sqlQuery("select ..."))`) |
|---|---|---|
| Schema | name: affected_rowcount_0; type: BIGINT | name: affected_rowcount_1; type: BIGINT |
| Data (single row) | -1 | -1 |
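
To make the buffering semantics concrete, here is a toy in-memory model of the behavior described above: inserts are collected, `execute()` produces one `affected_rowcount_<i>` column per buffered insert, and the buffer is cleared afterwards. `StatementSetSketch` and its `pendingCount()` helper are hypothetical names used only for this sketch; the class runs no SQL and is not Flink's `StatementSet`.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the buffering semantics only. It is not Flink's StatementSet
// and it executes nothing.
final class StatementSetSketch {

    private final List<String> pendingInserts = new ArrayList<>();

    // Buffer one INSERT statement; returns this to allow chaining.
    StatementSetSketch addInsertSql(String statement) {
        pendingInserts.add(statement);
        return this;
    }

    // Return the result column names, one affected_rowcount_<i> per buffered
    // insert, then clear the buffer as described for execute().
    List<String> execute() {
        List<String> columns = new ArrayList<>();
        for (int i = 0; i < pendingInserts.size(); i++) {
            columns.add("affected_rowcount_" + i);
        }
        pendingInserts.clear();
        return columns;
    }

    // Hypothetical helper: number of inserts still waiting to be executed.
    int pendingCount() {
        return pendingInserts.size();
    }

    public static void main(String[] args) {
        StatementSetSketch stmtSet = new StatementSetSketch()
                .addInsertSql("insert into xx ...")
                .addInsertSql("insert into yy ...");
        System.out.println(stmtSet.execute());
        System.out.println(stmtSet.pendingCount());
    }
}
```

Because the buffer is emptied on `execute()`, a second call on the same instance would have nothing left to run, which is the property that distinguishes this design from the buffering in `sqlUpdate`.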

...

Code Block
languagejava
titleNew methods in Table
interface Table {
    /** 
     * Synchronously write the Table to a TableSink that was registered 
     * under the specified path.
     * 
     * @param tablePath The path of the registered TableSink to which 
     * the Table is written.
     */
    TableResult executeInsert(String tablePath);
    
    /** 
     * Synchronously write the Table to a TableSink that was registered 
     * under the specified path.
     * 
     * @param tablePath The path of the registered TableSink to which 
     * the Table is written.
     * @param overwrite Whether to overwrite the existing data.
     */
    TableResult executeInsert(String tablePath, boolean overwrite);

    /** 
     * Returns the AST and the execution plan to compute the result of 
     * the current Table.
     * 
     * @param extraDetails the extra details which the plan should contain. 
     * e.g. estimated cost, uid
     */
    String explain(ExplainDetail... extraDetails);

    /** 
     * Get the contents of the current table.
     */
    TableResult execute() throws Exception;
}

...

| Current Interface | New Interface |
|---|---|
| `tEnv.sqlUpdate("CREATE TABLE test (...) with (path = '/tmp1')");` | `TableResult result = tEnv.executeSql("CREATE TABLE test (...) with (path = '/tmp1')");` `result...` |
| `tEnv.sqlUpdate("INSERT INTO test SELECT ...");` `tEnv.execute("test");` | `TableResult result = tEnv.executeSql("INSERT INTO test SELECT ...");` `result...` |

...

Code Block
languagejava
titleExample
TableEnvironment tEnv = ...
tEnv.explainSql("insert into s1 ...");
tEnv.explainSql("select xx ...");

Table table1 = tEnv.sqlQuery("select xx ...")...
String explanation = table1.explain();
TableResult result1 = table1.executeInsert("sink1");

Table table2 = tEnv.sqlQuery("select yy ...")...
TableResult result2 = table2.execute();
result2.print();


Deprecated methods and new methods work together

...

Code Block
languagejava
titleNew methods in TableEnvironment
interface TableEnvironment {
   /** 
    * Asynchronously execute the given single statement. The statement can be DDL/DML/SHOW/DESCRIBE/EXPLAIN/USE.
    */
    CompletableFuture<TableResult> executeSqlAsync(String statement);
}

...

Code Block
languagejava
titleStatementSet
interface StatementSet {
   /** 
    * Asynchronously execute the DML statements as a batch.
    */
   CompletableFuture<TableResult> executeAsync();
}
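
As a rough illustration of how a caller might consume such a future, the sketch below chains a transformation onto the result and blocks only when the value is needed. The `executeSqlAsync` method here is a hypothetical stand-in that completes with an affected row count (-1 meaning unknown, as in the result table); it is not the proposed Flink method, which would return a `TableResult`.

```java
import java.util.concurrent.CompletableFuture;

// Sketch of consuming an asynchronous result. executeSqlAsync below is a
// stand-in for illustration and does not run any SQL.
final class AsyncExecuteSketch {

    // Hypothetical stand-in: completes with the affected row count,
    // where -1 means the count is unknown.
    static CompletableFuture<Long> executeSqlAsync(String statement) {
        return CompletableFuture.supplyAsync(() -> -1L);
    }

    // Turn an affected row count into a short human-readable message.
    static String describe(long affectedRows) {
        return affectedRows < 0 ? "row count unknown" : affectedRows + " rows affected";
    }

    public static void main(String[] args) {
        CompletableFuture<String> summary =
                executeSqlAsync("INSERT INTO test SELECT ...")
                        .thenApply(AsyncExecuteSketch::describe);
        // Other work could happen here; join() blocks only when the result is needed.
        System.out.println(summary.join());
    }
}
```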


SQL CLI integrates with new API

  1. How does the SQL CLI leverage the StatementSet class to obtain optimization?

    We can follow the design of other systems, such as the Sqlline Batch Command [9], and introduce a similar command. Note that the SQL statements in a batch can only be `insert into` statements.

  2. How does the SQL CLI parse and execute multiple statements?

    We don’t want to support multi-statement parsing in the TableEnvironment, but the SQL CLI needs this feature because executing an external script is a natural use case. I considered providing a parse method like `List<String> parse(String stmt)`, but it is not intuitive, and such a method shouldn’t belong to the TableEnvironment API. As discussed in the pull requests [5][6], Calcite provides the `SqlNodeList parseSqlStmtList()` method, which parses a list of SQL statements separated by semicolons and constructs a parse tree. I think the SQL CLI can use this method to parse multiple statements and execute them one by one through TableEnvironment#executeSql(String statement). One thing to take care of is that the SQL CLI has special commands such as `help/set/quit` that control the environment’s lifecycle and change context variables. IMO, there are several ways to deal with these commands among multiple statements:
    1. Support these special control commands in flink-sql-parser. The drawback is that TableEnvironment would have to handle those noisy commands, and flink-sql-parser would become harder to reuse in other external systems. For example, the SQL CLI may need to support `source xx` to execute an external script, and it is not appropriate for the TableEnvironment parser to see such syntax.
      1. Pros:
      2. Cons:
        • many commands are only used by sql-client, e.g. help, quit, source
        • it is unclear how to meet the requirements of non-builtin commands, e.g. commands from flink-sql-gateway
        • not easy to extend: it is more difficult to implement a client-specific command in sql-parser than in the specific client
    2. The SQL CLI parses those control commands on its own and pre-splits the multiple statements at the control commands. The SQL CLI can then pass each remaining part of the statements to SqlParser and obtain a SqlNodeList.
      1. Pros:
        • sql-parser is cleaner
        • easier to extend for sql-client
      2. Cons:
    3. Flink already has a `Parser` interface, which is exposed by `Planner`. We can add one more method to `Parser`, such as `List<String> splitStatement(String)`, and rely on Calcite to implement it. However, special client commands (e.g. help, quit, source) are not supported in sql-parser now: SqlParser#parseStmtList returns a SqlNodeList, not a list of strings, and those special commands are not defined as SqlNodes. So I think this approach only complements the first one.
    4. Provide a utility class that splits a semicolon-separated script into individual statements.
      1. Pros:
        • easier to extend for sql-client
        • can handle corner cases in one place
      2. Cons:
        • multiple parsers: sql-parser and a utility parser

      I think option 4 (the utility class) is better. Looking forward to more people's opinions.
       (We can open another FLIP to discuss this.)
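
A minimal sketch of what such a splitting utility could look like is shown below. It assumes that statements are separated by semicolons and that semicolons inside single-quoted literals, backtick-quoted identifiers, and `--` line comments must be ignored. The class name `StatementSplitterSketch` and its `split` method are hypothetical and not part of Flink or Calcite; block comments and client commands such as help or quit are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical utility, not part of Flink: splits a script on semicolons that
// appear outside single-quoted literals, backtick identifiers, and "--" comments.
final class StatementSplitterSketch {

    static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        char quote = 0;              // active quote character, or 0 if none
        boolean lineComment = false; // true while inside a "--" comment

        for (int i = 0; i < script.length(); i++) {
            char c = script.charAt(i);
            if (lineComment) {
                // Drop comment text; the newline ends the comment.
                if (c == '\n') {
                    lineComment = false;
                    current.append(c);
                }
            } else if (quote != 0) {
                // Inside a quoted region every character is kept verbatim.
                current.append(c);
                if (c == quote) {
                    quote = 0;
                }
            } else if (c == '\'' || c == '`') {
                quote = c;
                current.append(c);
            } else if (c == '-' && i + 1 < script.length() && script.charAt(i + 1) == '-') {
                lineComment = true;
                i++; // skip the second dash
            } else if (c == ';') {
                addIfNotBlank(statements, current);
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        addIfNotBlank(statements, current); // last statement may lack a semicolon
        return statements;
    }

    private static void addIfNotBlank(List<String> out, StringBuilder sb) {
        String statement = sb.toString().trim();
        if (!statement.isEmpty()) {
            out.add(statement);
        }
    }

    public static void main(String[] args) {
        String script = "USE CATALOG c1; -- switch; catalog\n"
                + "INSERT INTO t SELECT 'a;b' FROM s;";
        split(script).forEach(System.out::println);
    }
}
```

Each returned string could then be passed on its own to `TableEnvironment#executeSql(String statement)`, with client-specific commands recognized by the SQL CLI before or after the split.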

  3. Other open questions?

...