Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. Users are confused by what kinds of sql are executed at once and what are buffered and what kinds of sql are buffered until triggered by the execute method.
  2. Buffering SQLs/Tables will cause behavior undefined. We may want to insert data into the `test` table with the `/tmp1` path but get the wrong result of `/tmp2`.

Public Interfaces

  1. We executeSqlWe propose to deprecate the following methods:

    • TableEnvironment.sqlUpdate(String)

    • TableEnvironment.insertInto(String, Table)
    • TableEnvironment.execute(String)
    • TableEnvironment.explain(boolean)
    • TableEnvironment.fromTableSource(TableSource<?>)
    • Table.insertInto(String)

  2. meanwhile, we propose to introduce the following new methods:

    Code Block
    languagejava
    titleNew methods in TableEnvironment
    interface TableEnvironment {
        // synchronouslyasynchronously execute the given single statement immediately, 
        // and return the execution result.
    	TableResult executeSql(String statement) throw Exception;
        
        // get the AST and the execution plan for the given single statement (DQL, DML)
        String explainSql(String statement, ExplainDetail... extraDetails);
    
        // create a StatementSet instance which can add DML statements or Tables
        // to the set and explain or execute them as a batch.
        StatementSet createStatementSet();
    }


    Code Block
    languagejava
    titleNew methods in Table
    interface Table {
        // synchronously write the Table to a TableSink that was registered
        // under the specified path.
    	TableResult executeInsert(String tablePath);
        
        // synchronously write the Table to a TableSink that was registered
        // under the specified path.
        TableResult executeInsert(String tablePath, boolean overwrite);
    
        // create a StatementSet instance which can add DML statements or Tables
        // to the set and explain or execute them as a batch.
        String explain(ExplainDetail... extraDetails);
    
        // get the contents of the current table.
        TableResult execute() throw Exception;
    }


    Code Block
    languagejava
    titleNew class: TableResult
    interface TableResult {
         // return JobClient if a Flink job is submitted
         // (e.g. for DML statement), else return empty (e.g. DDL).
        Optional<JobClient> getJobClient();
    
        // return the schema of the result
    	TableSchema getTableSchema();
        
        // return the ResultKind which can avoid custom parsing of
        // an "OK" row in programming
        ResultKind getResultKind();
    
        // get the row contents as an iterable rows
        Iterable<Row> collect();
    
        // print the result contents
        void print();
    }


    Code Block
    languagejava
    titleNew class: ResultKind
    public enum ResultKind {
        // for DDL, DCL and statements with a simple "OK" 
    	SUCCESS,
    
        // rows with important content are available (DML, DQL) 
        SUCCESS_WITH_CONTENT
    }


    Code Block
    languagejava
    titleNew class: StatementSet
    interface StatementSet  {
        // add single INSERT statement into the set
        StatementSet addInsertSql(String statement);
    
        // add Table with the given sink table name to the set
        StatementSet addInsert(String targetPath, Table table);
    
        // add Table with the given sink table name to the set
    	StatementSet addInsert(String targetPath, Table table, boolean overwrite);
    
        // returns the AST and the execution plan to compute 
        // the result of all statements and Tables
        String explain(ExplainDetail... extraDetails);
    
        // execute all statements and Tables as a batch
        TableResult execute() throws Exception;
    }


    Code Block
    languagejava
    titleNew class: ExplainDetail
    public enum ExplainDetail {
       STATE_SIZE_ESTIMATE,
       UID,
       HINTS,
       ...
    }


  3. For current messy Flink table program trigger point, we propose that: for TableEnvironment and StreamTableEnvironment, you must use `TableEnvironment.execute()` to trigger table program execution, once you convert the table program to a DataStream program (through `toAppendStream` or `toRetractStream` method), you must use `StreamExecutionEnvironment.execute` to trigger the DataStream program.
    Similar rule for BatchTableEnvironment, you must use `TableEnvironment.execute()` to trigger batch table program execution, once you convert the table program (through `toDataSet` method) to a DataSet program, you must use `ExecutionEnvironment.execute` to trigger the DataSet program.

...

Code Block
languagejava
titleNew method in TableEnvironment
interface TableEnvironment {
     /**
      * SynchronouslyAsynchronously execute the given single statement immediately and 
      * the statement can be DDL/DML/SHOW/DESCRIBE/EXPLAIN/USE. 
      * If 
      * If the statement is translated to a Flink job (e.g. DML), the
 result will be 
  * the TableResult will *be returned until the job is submitted, and finished.
      * contains a JobClient instance to associate the job.
      * Else, @returnthe resultTableResult for SHOW/DESCRIBE/EXPLAIN, the affected row countwill be returned until the statement 
      * forexecution `DML` (-1 means unknown), is finished, does not contain a JobClient instance.
      * 
      * @return result for SHOW/DESCRIBE/EXPLAIN, the affected row count 
      * for `DML` (-1 means unknown), or a string message ("OK") for other  
      * statements.
      * @throws Exception which occurs during the execution.
      */
	TableResult executeSql(String statement) throw Exception;
}

...

This method only supports executing a single statement which can be DDL, DML, SHOW, DESCRIBE, EXPLAIN and USE statement. This method will be executed synchronously and return a TableResult which is the For DML, this method returns TableResult until the job is submitted. For other statements, TableResult is returned until the execution is finished. TableResult is the representation of the execution result, and contains the result data and the result schema. TableResult contains a JobClient which associates the job if the statement is DML. If an error occurs, this method will throw an exception.

Code Block
languagejava
titleNew class: TableResult
/**
 * A TableResult is the representation of the statement execution result.
 */
interface TableResult {
     /** 
    * Get the* schemareturn of result. 
    */
	TableSchema getTableSchema();
   JobClient if a Flink job is submitted 
    /**
 (e.g. for DML statement), *else return the ResultKind which can avoid custom parsing ofempty (e.g. DDL).
     * an "OK" row in programming
     */
   Optional<JobClient> ResultKind getResultKindgetJobClient();

       /** 
        * Get the resultschema contents as an iterable rows.
      of result. 
    */
	TableSchema getTableSchema();
   Iterable<Row> collect();

    /**
     * Printreturn the result contents.ResultKind which can avoid custom parsing of
     * an "OK" row in programming
     */
    voidResultKind printgetResultKind();
}
Code Block
languagejava
titleNew class: ResultKind

     /**
  * ResultKind defines the types* ofGet the result contents as an iterable rows.
      */
public  enum ResultKind {
    // for DDL, DCL and statements with a simple "OK" 
	SUCCESS,

    // rows with important content are available (DML, DQL) 
    SUCCESS_WITH_CONTENT
}

...

 Iterable<Row> collect();

    /**
     * Print the result contents.
     */
    void print();
}


Code Block
languagejava
titleNew class: ResultKind
/**
 * ResultKind defines the types of the result.
 */
public enum ResultKind {
    // for DDL, DCL and statements with a simple "OK" 
	SUCCESS,

    // rows with important content are available (DML, DQL) 
    SUCCESS_WITH_CONTENT
}


The following table describes the result for each kind of statement:

Statement

Result Schema

Result Value

Result Kind

Examples

DDL

field name: result

field type: VARCHAR(2)

"OK"

(single row)

SUCCESS

CREATE TABLE new_table (col1 BIGINT, ...)

DML

(INSERT/UPDATE/DELETE)

field name: affected_rowcount

field type: BIGINT

the

Statement

Result Schema

Result Value

Result Kind

Examples

DDL

field name: result

field type: VARCHAR(2)

"OK"

(single row)

SUCCESS

CREATE TABLE new_table (col1 BIGINT, ...)

DML

(INSERT/UPDATE/DELETE)

field name: affected_rowcount

field type: BIGINT

the affected row count

(-1 means unknown)

SUCCESS_WITH_CONTENT

INSERT INTO sink_table SELECT …  

SHOW xx

field name: result

field type: VARCHAR(n)

(n is the max length of values)

list all objects

(multiple rows)

SUCCESS_WITH_CONTENT

SHOW CATALOGS

DESCRIBE xx

describe the detail of an object 

(single row)

DESCRIBE CATALOG catalog_name

EXPLAIN xx

explain the plan of a query

(single row)

EXPLAIN PLAN FOR SELECT …

USE xx

field name: result

field type: VARCHAR(2)

"OK"

(single row)

SUCCESS

USE CATALOG catalog_name

...

Code Block
languagejava
titleNew methods in Table
interface Table {
    /** 
     * Synchronously writeWrite the Table to a TableSink that was registered 
     * under the specified path.
     * 
     * @param tablePath The path of the registered TableSink to which 
     * the Table is written.
     */
	TableResult executeInsert(String tablePath);
    
    /** 
     * Synchronously writeWrite the Table to a TableSink that was registered 
     * under the specified path.
     * 
     * @param tablePath The path of the registered TableSink to which 
     * the Table is written.
     * @param overwrite Whether overwrite the existing data
     */
    TableResult executeInsert(String tablePath, boolean overwrite);

    /** 
     * Returns the AST and the execution plan to compute the result of 
     * the current Table.
     * 
     * @param extraDetails the extra details which the plan should contain. 
     * e.g. estimated cost, uid
     */
    String explain(ExplainDetail... extraDetails);

    /** 
     * Get the contents of the current table.
     */
    TableResult execute() throw Exception;
}

...

Code Block
languagejava
titleExample
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// will add transformations to env when translating to execution plan
tEnv.sqlUpdate("INSERT INTO sink1 SELECT a, b FROM MyTable1")

Table table = tEnv.sqlQuery("SELECT c, d from MyTable2")
DataStream dataStream = tEnv.toAppendStream(table, Row.class)
dataStream…

env.execute("job name") ;
// or tEnv.execute("job name") 

...

[9] Sqline deal with batch execute

Appendix - Future Plan:

...

 not the scope of this flip.

...

To support execute time-cost batch sql or no-end streaming sql, it’s needed to provide an asynchronous execution way. 

Provide async execute method for executeSql and StatementSet.execute

Similarly as above, suggested methods:

Code Block
languagejava
titleTableEnvironment
interface TableEnvironment {
   /** 
    * Asynchronously execute the given single statement and the statement can be DDL/DML/SHOW/DESCRIBE/EXPLAIN/USE. 
    */
    CompletableFuture<TableResult> executeSqlAsync(String statement);
}

We also should support executing batch sql asynchronously. Suggested method:

...

languagejava
titleStatementSet

...

SQL CLI integrates with new API

...