Status

Current state: Under Discussion

Discussion thread:

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-84-Improve-amp-Refactor-API-of-Table-Module-td34537.html
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-84-Feedback-Summary-td39261.html

...

  1. Inconsistent execution semantics for `sqlUpdate()`. Currently, a DDL statement passed to this method is executed immediately, while an `INSERT INTO` statement is only executed when the `execute()` method is called. This confuses users a lot.
  2. Don’t support obtaining the return value of SQL execution. FLIP-69 [1] introduces many common DDLs such as `SHOW TABLES`, which require that TableEnvironment have an interface to obtain the result of an executed DDL. SQL CLI also has a strong demand for this feature, so that we can easily unify the execution path of SQL CLI and TableEnvironment. Besides, the method name `sqlUpdate` does not fit statements like `SHOW TABLES`.
  3. Unclear and buggy support for buffered SQL/Table execution [2]. The Blink planner provides the ability to optimize multiple sinks together, but there is no clear mechanism in the TableEnvironment API to control the whole flow.
  4. Unclear Flink table program trigger point. Both `TableEnvironment.execute()` and `StreamExecutionEnvironment.execute()` can trigger the execution of a Flink table program. However, if you use TableEnvironment to build a Flink table program, you must use `TableEnvironment.execute()` to trigger execution, because you can’t get a StreamExecutionEnvironment instance. If you use StreamTableEnvironment to build a Flink table program, you can use both to trigger execution. If you convert a table program to a DataStream program (using `StreamTableEnvironment.toAppendStream`/`toRetractStream`), you can also use both to trigger execution. So it’s hard to explain which `execute` method should be used. BatchTableEnvironment has the same problem.
  5. Don’t support executing Flink table jobs in an asynchronous way. In streaming mode, one `INSERT INTO xxx` statement may never end, and in a batch environment one ETL task may take a long time to finish. So it’s natural and necessary to support executing Flink table jobs in an asynchronous way.
  6. Don’t support multiline statements. Currently, TableEnvironment only supports a single statement. Multiline statements are also an important feature for the SQL Client and third-party SQL-based platforms.

Let’s give an example to explain the buffered SQL/Table execution problem:

...

  1. Users are confused about which kinds of SQL are executed immediately and which are buffered until triggered by the `execute` method.
  2. Buffering SQLs/Tables leads to undefined behavior. We may want to insert data into the `test` table with the `/tmp1` path but end up writing the data to `/tmp2`, as sketched below.
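
The following condensed sketch, assuming a filesystem connector and illustrative DDL (the full example is elided above), shows how this happens with the current API:

Code Block
languagejava
titleSketch: undefined behavior with buffered INSERT (current API)
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

// DDL is executed immediately:
tEnv.sqlUpdate("CREATE TABLE test (c1 INT) WITH ('connector' = 'filesystem', 'path' = '/tmp1', 'format' = 'csv')");
// ...but this INSERT is only buffered, not executed:
tEnv.sqlUpdate("INSERT INTO test SELECT c1 FROM source_table");

// Re-pointing the table to another path also happens immediately:
tEnv.sqlUpdate("DROP TABLE test");
tEnv.sqlUpdate("CREATE TABLE test (c1 INT) WITH ('connector' = 'filesystem', 'path' = '/tmp2', 'format' = 'csv')");

// Only now is the buffered INSERT translated, against the current catalog
// state, so the data is written to /tmp2 rather than the intended /tmp1.
tEnv.execute("buffering-example");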

Public Interfaces

The goal of this FLIP is to address the shortcomings mentioned above and make the API of TableEnvironment & Table clearer and more stable. This FLIP won't support multiline statements, which need more discussion in a further FLIP. (There have been some conclusions; please see the appendix.)

  1. We propose to deprecate the following methods:

    • TableEnvironment.sqlUpdate(String)

    • TableEnvironment.insertInto(String, Table)
    • TableEnvironment.execute(String)
    • TableEnvironment.explain(boolean)
    • TableEnvironment.fromTableSource(TableSource<?>)
    • Table.insertInto(String)

  2. Meanwhile, we propose to introduce the following new methods:

    Code Block
    languagejava
    titleNew methods in TableEnvironment
    interface TableEnvironment {
        // execute the given single statement, and return the execution result.
        TableResult executeSql(String statement) throws Exception;
        
        // get the AST and the execution plan for the given single statement (DQL, DML)
        String explainSql(String statement, ExplainDetail... extraDetails);
    
        // create a StatementSet instance which can add DML statements or Tables
        // to the set and explain or execute them as a batch.
        StatementSet createStatementSet();
    }


    Code Block
    languagejava
    titleNew methods in Table
    interface Table {
        // write the Table to a TableSink that was registered
        // under the specified path.
        TableResult executeInsert(String tablePath);
        
        // write the Table to a TableSink that was registered
        // under the specified path.
        TableResult executeInsert(String tablePath, boolean overwrite);
    
        // returns the AST and the execution plan to compute
        // the result of the current Table.
        String explain(ExplainDetail... extraDetails);
    
        // get the contents of the current table.
        TableResult execute() throws Exception;
    }


    Code Block
    languagejava
    titleNew class: TableResult
    interface TableResult {
         // return JobClient if a Flink job is submitted
         // (e.g. for DML statement), else return empty (e.g. for DDL).
        Optional<JobClient> getJobClient();
    
        // return the schema of the result
        TableSchema getTableSchema();
        
        // return the ResultKind which can avoid custom parsing of
        // an "OK" row in programming
        ResultKind getResultKind();
    
        // get the result contents as an iterator of rows
        Iterator<Row> collect();
    
        // print the result contents
        void print();
    }


    Code Block
    languagejava
    titleNew class: ResultKind
    public enum ResultKind {
        // for DDL, DCL and statements with a simple "OK" 
        SUCCESS,
    
        // rows with important content are available (DML, DQL) 
        SUCCESS_WITH_CONTENT
    }


    Code Block
    languagejava
    titleNew class: StatementSet
    interface StatementSet  {
        // add single INSERT statement into the set
        StatementSet addInsertSql(String statement);
    
        // add Table with the given sink table name to the set
        StatementSet addInsert(String targetPath, Table table);
    
        // add Table with the given sink table name to the set,
        // with the overwrite flag controlling whether existing data is overwritten
        StatementSet addInsert(String targetPath, Table table, boolean overwrite);
    
        // returns the AST and the execution plan to compute 
        // the result of all statements and Tables
        String explain(ExplainDetail... extraDetails);
    
        // execute all statements and Tables as a batch
        TableResult execute() throws Exception;
    }


    Code Block
    languagejava
    titleNew class: ExplainDetail
    public enum ExplainDetail {
       STATE_SIZE_ESTIMATE,
       UID,
       HINTS,
       ...
    }


  3. For the current messy Flink table program trigger point, we propose: for TableEnvironment and StreamTableEnvironment, you must use `TableEnvironment.execute()` to trigger table program execution; once you convert the table program to a DataStream program (through the `toAppendStream` or `toRetractStream` method), you must use `StreamExecutionEnvironment.execute()` to trigger the DataStream program.
    A similar rule applies to BatchTableEnvironment: you must use `TableEnvironment.execute()` to trigger batch table program execution; once you convert the table program to a DataSet program (through the `toDataSet` method), you must use `ExecutionEnvironment.execute()` to trigger the DataSet program.
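
    A minimal sketch of this rule, using the soon-to-be-deprecated buffering methods for illustration (table and job names are illustrative):

    Code Block
    languagejava
    titleSketch: which execute() triggers which program
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // Case 1: a pure table program must be triggered by the Table API side.
    tEnv.sqlUpdate("INSERT INTO sink_table SELECT * FROM source_table");
    tEnv.execute("table-program"); // not env.execute()

    // Case 2: once converted to a DataStream program, it must be triggered
    // by the DataStream side.
    Table table = tEnv.sqlQuery("SELECT * FROM source_table");
    DataStream<Row> stream = tEnv.toAppendStream(table, Row.class);
    stream.print();
    env.execute("datastream-program"); // not tEnv.execute()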

...

Code Block
languagejava
titleNew method in TableEnvironment
interface TableEnvironment {
     /**
      * Execute the given single statement and 
      * the statement can be DDL/DML/SHOW/DESCRIBE/EXPLAIN/USE. 
      * 
      * If the statement is translated to a Flink job (e.g. DML),
      * the TableResult is returned once the job is submitted, and
      * contains a JobClient instance associated with the job.
      * Otherwise, the TableResult is returned once the statement
      * execution is finished, and does not contain a JobClient instance.
      * 
      * @return result for SHOW/DESCRIBE/EXPLAIN, the affected row count 
      * for `DML` (-1 means unknown), or a string message ("OK") for other  
      * statements.
      * @throws Exception which occurs during the execution.
      */
    TableResult executeSql(String statement) throws Exception;
}


This method only supports executing a single statement, which can be DDL, DML, SHOW, DESCRIBE, EXPLAIN or USE. For DML, this method returns a TableResult once the job is submitted. For other statements, a TableResult is returned once the execution is finished. TableResult is the representation of the execution result and contains the result data and the result schema. If the statement is DML, the TableResult contains a JobClient associated with the job. If an error occurs, this method throws an exception.
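
For example, a minimal sketch of the two return modes (statement text and connector are illustrative):

Code Block
languagejava
titleExample: executeSql for DDL and SHOW
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

// DDL: executed synchronously; no Flink job is submitted, so no JobClient.
TableResult ddlResult = tEnv.executeSql(
    "CREATE TABLE orders (id BIGINT) WITH ('connector' = 'datagen')");
// ddlResult.getResultKind() == ResultKind.SUCCESS
// ddlResult.getJobClient().isPresent() == false

// SHOW: returns result rows that can be printed directly.
tEnv.executeSql("SHOW TABLES").print();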

Code Block
languagejava
titleNew class: TableResult
/**
 * A TableResult is the representation of the statement execution result.
 */
interface TableResult {
    /**
     * return JobClient if a Flink job is submitted
     * (e.g. for DML statement), else return empty (e.g. for DDL).
     */
    Optional<JobClient> getJobClient();

    /**
     * Get the schema of the result.
     */
    TableSchema getTableSchema();

    /**
     * return the ResultKind which can avoid custom parsing of
     * an "OK" row in programming.
     */
    ResultKind getResultKind();

    /**
     * Get the result contents as an iterator of rows.
     */
    Iterator<Row> collect();

    /**
     * Print the result contents.
     */
    void print();
}
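
A sketch of consuming a TableResult for a DML statement (reusing the `tEnv` from the example above; table names are illustrative):

Code Block
languagejava
titleExample: consuming a TableResult
TableResult result = tEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table");

// A DML statement submits a Flink job, so a JobClient is present and can
// be used to monitor or cancel the job (see FLIP-74).
JobClient jobClient = result.getJobClient().get();

// The result rows (here: the affected row count) can be iterated or printed.
Iterator<Row> it = result.collect();
while (it.hasNext()) {
    System.out.println(it.next());
}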

...

Code Block
languagejava
titleNew methods in Table
interface Table {
    /** 
     * Write the Table to a TableSink that was registered 
     * under the specified path.
     * 
     * @param tablePath The path of the registered TableSink to which 
     * the Table is written.
     */
    TableResult executeInsert(String tablePath);
    
    /** 
     * Write the Table to a TableSink that was registered 
     * under the specified path.
     * 
     * @param tablePath The path of the registered TableSink to which 
     * the Table is written.
     * @param overwrite Whether to overwrite the existing data.
     */
    TableResult executeInsert(String tablePath, boolean overwrite);

    /** 
     * Returns the AST and the execution plan to compute the result of 
     * the current Table.
     * 
     * @param extraDetails the extra details which the plan should contain. 
     * e.g. estimated cost, uid
     */
    String explain(ExplainDetail... extraDetails);

    /** 
     * Get the contents of the current table.
     */
    TableResult execute() throws Exception;
}
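
A sketch of the new Table methods (again reusing `tEnv`; table names are illustrative):

Code Block
languagejava
titleExample: new Table methods
Table table = tEnv.sqlQuery("SELECT id, name FROM source_table");

// Retrieve and print the contents of the table.
table.execute().print();

// Write the table to a registered sink, overwriting existing data.
TableResult insertResult = table.executeInsert("sink_table", true);

// Show the AST and the execution plan.
System.out.println(table.explain());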


How to correct the execution behavior?

...


[7] flip-73 Introducing Executors for job submission

[8] flip-74 Flink JobClient API

[9] Sqlline deal with batch execute

[10] Feedback Summary

[11] Feedback Summary discussion thread

Appendix - Future Plan: not in the scope of this FLIP.

Multiline statements

"Multiline statements" is also an important feature for SQL client and third-part sql based platforms. In SQL client, the most typical scenario is execute a SQL script which contains multiple statements. The main point that we are talking about is what's the behavior of this method when executing each a single line statement for batch, streaming and mix scenario (the scenarios are listed in [10]).

There is a preliminary draft that the method is:

Code Block
languagejava
titleTableEnvironment
interface TableEnvironment {
   /**
    * Execute multiline statements separated by semicolons and return an
    * Iterator over the TableResults that correspond to each single statement.
    * The Iterator.next() method triggers the next statement execution.
    * This allows a caller to decide whether to execute the statements
    * synchronously or asynchronously.
    *
    * @param statements multiline statements separated by semicolons
    */
    Iterator<TableResult> executeMultilineSql(String statements);
}

We introduce the `executeMultilineSql` method, which returns an `Iterator<TableResult>`; each call to `next()` triggers the submission of the next statement. This allows a caller to decide synchronously when to submit statements asynchronously to the cluster. Thus, a service such as the SQL Client can handle the result of each statement individually and process the statements sequentially, one by one (see the sketch below).
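
A sketch of how a caller such as the SQL Client could drive this method (statements are illustrative):

Code Block
languagejava
titleExample: driving executeMultilineSql
Iterator<TableResult> results = tEnv.executeMultilineSql(
    "CREATE TABLE t1 (id INT) WITH ('connector' = 'datagen');"
    + "INSERT INTO sink_table SELECT id FROM t1;");

while (results.hasNext()) {
    // next() triggers the execution of the next single statement, so the
    // caller decides when (and whether) each statement is submitted.
    results.next().print();
}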

Please refer to the feedback summary document [10] and the discussion thread [11] for more details; we will keep discussing this in the future.

...


SQL CLI integrates with the new API

  1. How does the SQL CLI leverage the StatementSet class to obtain optimizations?

    We can reference other systems' designs, like the Sqlline batch command [9], and introduce a similar command, but we should note that the SQL statements in a batch can only be `INSERT INTO`.

  2. How does the SQL CLI parse and execute multiple statements?

    Currently, TableEnvironment does not support parsing multiple statements, but this feature is needed in the SQL CLI, because it’s natural there to execute an external script. We considered providing a parse method like `List<String> parse(String stmt)`, but it’s not intuitive to understand and such a method shouldn’t belong to the TableEnvironment API. As discussed in the pull requests [5][6], Calcite provides the `SqlNodeList parseStmtList()` method to parse a list of SQL statements separated by semicolons and construct a parse tree. The SQL CLI can use this method to parse multiple statements and execute each single statement one by one through `TableEnvironment#executeSql(String statement)`. One thing to take care of is that there are special commands like `help`/`set`/`quit` in the SQL CLI that control the environment’s lifecycle and change variables of the context. There are several ways to deal with these commands when handling multiple statements:
    1. Support these special control commands in flink-sql-parser. The shortcoming is that TableEnvironment would have to take care of these noisy commands, and flink-sql-parser would lose its wider applicability to other external systems. For example, the SQL CLI may need to support `source xx` to execute an external script; it’s not proper to make the TableEnvironment parser understand such syntax.
      1.  pro’s:
      2.  con’s:
        • many commands are only used by the SQL client, e.g. help, quit, source
        • hard to meet the requirements of non-builtin commands, e.g. commands from flink-sql-gateway
        • not easy to extend; it’s more difficult to implement a client-specific command in sql-parser than in the specific client
    2. The SQL CLI parses those control commands on its own and pre-splits the multiple statements according to the control commands. Then the SQL CLI can pass the remaining statements to the SqlParser and obtain a SqlNodeList.
      1. pro’s:
        • sql-parser stays cleaner
        • easier to extend for sql-client
      2.  con’s: 
    3. Flink already introduces a `Parser` interface, which is exposed by `Planner`. We can add one more method to `Parser`, like `List<String> splitStatement(String)`, and then borrow Calcite to achieve this functionality. Special client commands (e.g. help, quit, source) are not supported in sql-parser now; because `SqlParser#parseStmtList` returns a SqlNodeList, not a string list, those special commands are not defined as SqlNodes. So this approach is only a complement to the first one.
    4. Provide a utility class to parse statements separated by semicolons into multiple statements.
      1. pro’s:
        • easier to extend for sql-client
        • can handle corner cases in a unified place
      2. con’s:
        • many parsers: sql-parser plus a utility parser
      I think option 4 is better. Looking forward to more people's opinions. (We can use TableEnvironment#executeMultilineSql to support this; we will open another FLIP to discuss it.) A sketch of statement splitting with Calcite is shown below.

  3. Other open questions?
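
For reference on question 2 above, a minimal sketch of splitting a multi-statement string with Calcite's parser (the default parser config is an assumption; Flink's sql-parser would supply its own dialect and conformance):

Code Block
languagejava
titleSketch: splitting statements with Calcite's SqlParser
// Classes come from org.apache.calcite.sql and org.apache.calcite.sql.parser;
// parseStmtList() throws SqlParseException.
SqlParser parser = SqlParser.create(
    "INSERT INTO sink_table SELECT * FROM source_table; SELECT * FROM source_table",
    SqlParser.configBuilder().build());
SqlNodeList statements = parser.parseStmtList();
for (SqlNode statement : statements) {
    // Each SqlNode is one parsed statement; the SQL CLI could hand each one
    // to TableEnvironment#executeSql individually.
    System.out.println(statement.toString());
}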