
  1. We propose to deprecate the following methods:

    • TableEnvironment.sqlUpdate(String)
    • TableEnvironment.insertInto(String, Table)
    • TableEnvironment.execute(String)
    • TableEnvironment.explain(boolean)
    • TableEnvironment.fromTableSource(TableSource<?>)
    • Table.insertInto(String)
  2. Meanwhile, we propose to introduce the following new methods:

    Code Block
    languagejava
    titleNew methods in TableEnvironment
    interface TableEnvironment {
        // synchronously execute the given single statement immediately, 
        // and return the execution result.
    	TableResult executeSql(String statement) throws Exception;
        
        // get the AST and the execution plan for the given single statement (DQL, DML)
        String explainSql(String statement, ExplainDetail... extraDetails);
    
        // create a StatementSet instance which can add DML statements or Tables
        // to the set and explain or execute them as a batch.
        StatementSet createStatementSet();
    }


    Code Block
    languagejava
    titleNew methods in Table
    interface Table {
        // synchronously write the Table to a TableSink that was registered
        // under the specified path.
    	TableResult executeInsert(String tablePath);
        
        // synchronously write the Table to a TableSink that was registered
        // under the specified path.
        TableResult executeInsert(String tablePath, boolean overwrite);
    
        // returns the AST and the execution plan to compute
        // the result of the current Table.
        String explain(ExplainDetail... details);
    
        // get the contents of the current table.
        TableResult execute() throws Exception;
    }


    Code Block
    languagejava
    titleTableResult
    collapsetrue
    interface TableResult {
        // return the schema of the result
    	TableSchema getTableSchema();
        
        // return the ResultKind which can avoid custom parsing of
        // an "OK" row in programming
        ResultKind getResultKind();
    
        // get the result contents as an iterable of rows
        Iterable<Row> collect();
    
        // print the result contents
        void print();
    }


    Code Block
    languagejava
    titleResultKind
    public enum ResultKind {
        // for DDL, DCL and statements with a simple "OK" 
    	SUCCESS,
    
        // rows with important content are available (DML, DQL) 
        SUCCESS_WITH_CONTENT
    }


    Code Block
    languagejava
    titleStatementSet
    interface StatementSet  {
        // add single INSERT statement into the set
        StatementSet addInsertSql(String statement);
    
        // add Table with insert operation into the set
        StatementSet addInsert(String targetPath, Table table);
    
        // add Table with insert operation and overwrite option into set
    	StatementSet addInsert(String targetPath, Table table, boolean overwrite);
    
        // returns the AST and the execution plan to compute 
        // the result of all statements and Tables
        String explain(ExplainDetail... extraDetails);
    
        // execute all statements and Tables as a batch
        TableResult execute() throws Exception;
    }


    Code Block
    languagejava
    titleExplainDetail
    public enum ExplainDetail {
       STATE_SIZE_ESTIMATE,
       UID,
       HINTS,
       ...
    }


  3. To fix the currently messy trigger point of a Flink table program, we propose the following rules: for TableEnvironment and StreamTableEnvironment, you must use `TableEnvironment.execute()` to trigger table program execution; once you convert the table program to a DataStream program (through the `toAppendStream` or `toRetractStream` method), you must use `StreamExecutionEnvironment.execute()` to trigger the DataStream program.
    A similar rule applies to BatchTableEnvironment: you must use `TableEnvironment.execute()` to trigger batch table program execution; once you convert the table program to a DataSet program (through the `toDataSet` method), you must use `ExecutionEnvironment.execute()` to trigger the DataSet program.

Proposed Changes

`TableEnvironment.sqlUpdate(String)`

Currently, the `void sqlUpdate(String sql)` method executes DDLs immediately, while DMLs are buffered and only triggered by `TableEnvironment.execute()`. Both behaviors should be consistent. So this method will be deprecated. We propose a new blocking method that returns the execution result:

Code Block
languagejava
titleNew methods in TableEnvironment
interface TableEnvironment {
     /**
      * Synchronously execute the given single statement immediately and 
      * the statement can be DDL/DML/SHOW/DESCRIBE/EXPLAIN/USE. 
      * If the statement is translated to a Flink job, the result will be 
      * returned until the job is finished.
      * 
      * @return result for SHOW/DESCRIBE/EXPLAIN, the affected row count 
      * for `DML` (-1 means unknown), or a string message ("OK") for other  
      * statements.
      * @throws Exception which occurs during the execution.
      */
	TableResult executeSql(String statement) throws Exception;
}


This method only supports executing a single statement, which can be a DDL, DML, SHOW, DESCRIBE, EXPLAIN or USE statement. The method is executed synchronously and returns a TableResult, which is the representation of the execution result and contains the result data and the result schema. If an error occurs, this method throws an exception.

Code Block
languagejava
titleTableResult
/**
 * A TableResult is the representation of the statement execution result.
 */
interface TableResult {
   /** 
    * Get the schema of result. 
    */
	TableSchema getTableSchema();
    
    /**
     * return the ResultKind which can avoid custom parsing of
     * an "OK" row in programming
     */
    ResultKind getResultKind();

     /**
      * Get the result contents as an iterable rows.
      */
    Iterable<Row> collect();

    /**
     * Print the result contents.
     */
    void print();
}


Code Block
languagejava
titleResultKind
/**
 * ResultKind defines the types of the result.
 */
public enum ResultKind {
    // for DDL, DCL and statements with a simple "OK" 
	SUCCESS,

    // rows with important content are available (DML, DQL) 
    SUCCESS_WITH_CONTENT
}
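To show why `ResultKind` helps callers avoid string-matching an "OK" row, here is a minimal, self-contained sketch. The class and the `hasContentRows` helper are illustrative stand-ins, not part of the proposed API:

```java
public class ResultKindSketch {
    // Mirrors the proposed enum: SUCCESS for DDL/DCL statements that only
    // report "OK", SUCCESS_WITH_CONTENT for DML/DQL results with real rows.
    public enum ResultKind { SUCCESS, SUCCESS_WITH_CONTENT }

    // Decide whether result rows need to be read, by checking the kind
    // instead of string-comparing an "OK" marker row.
    public static boolean hasContentRows(ResultKind kind) {
        return kind == ResultKind.SUCCESS_WITH_CONTENT;
    }
}
```

A caller would branch on `getResultKind()` before iterating `collect()`, rather than inspecting the first row.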



The following table describes the result for each kind of statement:

| Statement | Result Schema | Result Value | Result Kind | Examples |
| --- | --- | --- | --- | --- |
| DDL | field name: result, field type: VARCHAR(2) | "OK" (single row) | SUCCESS | CREATE TABLE new_table (col1 BIGINT, ...) |
| DML (INSERT/UPDATE/DELETE) | field name: affected_rowcount, field type: BIGINT | the affected row count (-1 means unknown) | SUCCESS_WITH_CONTENT | INSERT INTO sink_table SELECT … |
| SHOW xx | field name: result, field type: VARCHAR(n) (n is the max length of values) | list all objects (multiple rows) | SUCCESS_WITH_CONTENT | SHOW CATALOGS |
| DESCRIBE xx | field name: result, field type: VARCHAR(n) | describe the detail of an object (single row) | SUCCESS_WITH_CONTENT | DESCRIBE CATALOG catalog_name |
| EXPLAIN xx | field name: result, field type: VARCHAR(n) | explain the plan of a query (single row) | SUCCESS_WITH_CONTENT | EXPLAIN PLAN FOR SELECT … |
| USE xx | field name: result, field type: VARCHAR(2) | "OK" (single row) | SUCCESS | USE CATALOG catalog_name |
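The schema rules in the table above can be sketched as a small helper. This is a simplified illustration keyed on the statement's first keyword; the real implementation would classify the parsed operation, not the raw string:

```java
public class ResultSchemaSketch {
    /**
     * Return the single result column name for a statement, following the
     * table above: DML reports "affected_rowcount", all other supported
     * statements report "result".
     */
    public static String resultColumnName(String statement) {
        String keyword = statement.trim().split("\\s+")[0].toUpperCase();
        switch (keyword) {
            case "INSERT":
            case "UPDATE":
            case "DELETE":
                return "affected_rowcount";
            default:
                return "result";
        }
    }
}
```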


`TableEnvironment.insertInto(String, Table)` & `Table.insertInto(String)`

Like the `sqlUpdate` method, `TableEnvironment.insertInto(String, Table)` and `Table.insertInto(String)` also buffer the Tables, which causes the same buffer problem. So these two methods will be deprecated.

`TableEnvironment.execute(String)` & `TableEnvironment.explain(boolean)`

Since we will no longer buffer SQLs/Tables and plans, it’s meaningless to keep `execute(String)` as the trigger entry point, and the `explain(boolean)` method should not be used anymore either. So we advise deprecating those two methods. Instead, we introduce a new method named `createStatementSet` and a new class named `StatementSet` to support optimizing multiple SQLs/Tables together. Only DML statements and Tables can be added to a StatementSet. For DML, only `INSERT` is supported now; DELETE and UPDATE can be supported in the future.

StatementSet supports adding a list of DMLs and Tables through the `addXX` methods, getting the plan of all added statements and Tables through the `explain` method, and optimizing them as a whole and submitting the job through the `execute` method. The added statements and Tables are cleared when the `execute` method is called.

Code Block
languagejava
titleNew methods in TableEnvironment
interface TableEnvironment {
   /** 
    * Create a StatementSet instance to which DML statements or Tables 
    * can be added; the planner can optimize all added statements and 
    * Tables together for better performance.
    */
	StatementSet createStatementSet();
}


Code Block
languagejava
titleStatementSet
interface StatementSet  {
    /** 
     * add insert statement to the set.
     */
    StatementSet addInsertSql(String statement);

    /** 
     * add Table with the given sink table name to the set.
     */
    StatementSet addInsert(String targetPath, Table table);

    /** 
     * add Table with the given sink table name and overwrite option 
     * to the set.
     */
	StatementSet addInsert(String targetPath, Table table, boolean overwrite);

    /** 
     * returns the AST and the execution plan to compute the result of 
     * all added statements and Tables.
     * 
     * @param extraDetails the extra details which the plan should contain. 
     * e.g. estimated cost, uid
     */
    String explain(ExplainDetail... extraDetails);

    /** 
     * execute all statements and Tables as a batch.
     * 
     * The added statements and Tables will be cleared when executing
     * this method. 
     */
    TableResult execute() throws Exception;
}
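The buffer-and-clear contract of `execute()` can be made concrete with a minimal stand-in (plain Java for illustration only; this is not the real Flink class and it ignores planning entirely):

```java
import java.util.ArrayList;
import java.util.List;

public class StatementSetSketch {
    private final List<String> buffered = new ArrayList<>();

    // add a single INSERT statement to the set (fluent, like the proposed API)
    public StatementSetSketch addInsertSql(String statement) {
        buffered.add(statement);
        return this;
    }

    public int size() {
        return buffered.size();
    }

    // "execute" all buffered statements as one batch, then clear the buffer,
    // mirroring the proposed StatementSet.execute() contract
    public int execute() {
        int executed = buffered.size();
        buffered.clear();
        return executed;
    }
}
```

After `execute()` returns, the set is empty and can be reused for the next batch.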


Code Block
languagejava
titleExplainDetail
/**
 * ExplainDetail defines the types of details for explain result
 */
public enum ExplainDetail {
   STATE_SIZE_ESTIMATE,
   UID,
   HINTS,
   ...
}


Each added statement or Table has a return value, which is its affected row count. So the TableResult of a StatementSet execution has multiple columns. All column types are BIGINT, and the column name is "affected_rowcount_" plus the index of the statement or Table, e.g.


Code Block
languagejava
titleExample
StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql("insert into xx ...");
stmtSet.addInsert("yy", tEnv.sqlQuery("select ..."));
TableResult result = stmtSet.execute();


The schema and data in the TableResult:

|  | column1 (stmtSet.addInsertSql("insert into xx ...")) | column2 (stmtSet.addInsert("yy", tEnv.sqlQuery("select ..."))) |
| --- | --- | --- |
| Schema | name: affected_rowcount_0, type: BIGINT | name: affected_rowcount_1, type: BIGINT |
| Data (single row) | -1 | -1 |
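The column-naming rule can be sketched as a trivial helper (illustrative only; the real schema is assembled by the planner):

```java
public class AffectedRowcountSketch {
    // Build the result column name for the i-th added statement or Table:
    // "affected_rowcount_" plus the zero-based index.
    public static String columnName(int index) {
        return "affected_rowcount_" + index;
    }
}
```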


`TableEnvironment.fromTableSource(TableSource<?>)`

Since FLIP-64 already provides `ConnectTableDescriptor#createTemporaryTable` to register a TableSource in TableEnvironment, this method should be deprecated too; it was an omission in that FLIP.

Other new proposed methods

Currently, we can’t explain a statement directly in TableEnvironment; we must first convert the statement to a Table through the `TableEnvironment.sqlQuery` method. Meanwhile, we can’t explain an INSERT statement, because an INSERT statement can’t be converted to a Table. We introduce the `TableEnvironment.explainSql()` method to support explaining DQL and DML statements directly. The `explainSql` method only accepts a single statement.

Code Block
languagejava
titleNew methods in TableEnvironment
interface TableEnvironment {
   /** 
    * returns the AST and the execution plan to compute the result of 
    * the given statement. 
    * The statement must be DQL or DML, and only single statement is 
    * supported.
    * 
    * @param extraDetails the extra details which the plan should contain. 
    * e.g. estimated cost, uid
    */
    String explainSql(String statement, ExplainDetail... extraDetails);
}

We also introduce the following methods on Table to make programming more fluent.

Code Block
languagejava
titleNew methods in Table
interface Table {
    /** 
     * Synchronously write the Table to a TableSink that was registered 
     * under the specified path.
     * 
     * @param tablePath The path of the registered TableSink to which 
     * the Table is written.
     */
	TableResult executeInsert(String tablePath);
    
    /** 
     * Synchronously write the Table to a TableSink that was registered 
     * under the specified path.
     * 
     * @param tablePath The path of the registered TableSink to which 
     * the Table is written.
     * @param overwrite Whether overwrite the existing data
     */
    TableResult executeInsert(String tablePath, boolean overwrite);

    /** 
     * Returns the AST and the execution plan to compute the result of 
     * the current Table.
     * 
     * @param extraDetails the extra details which the plan should contain. 
     * e.g. estimated cost, uid
     */
    String explain(ExplainDetail... details);

    /** 
     * Get the contents of the current table.
     */
    TableResult execute() throws Exception;
}


How to correct the execution behavior?

First, let’s discuss the buffer problem in depth. There are actually two levels of buffering: TableEnvironment buffers SQLs/Tables, and StreamExecutionEnvironment buffers transformations to generate the StreamGraph. Each TableEnvironment instance holds a StreamExecutionEnvironment instance. Currently, when a FlinkRelNode is translated into a Flink operator, the generated transformations are added to StreamExecutionEnvironment’s buffer. The bug [2] is caused by this behavior. Let’s give another simple example to explain the problem of StreamExecutionEnvironment’s buffer.

Code Block
languagejava
titleExample
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// will add transformations to env when translating to execution plan
tEnv.sqlUpdate("INSERT INTO sink1 SELECT a, b FROM MyTable1")

Table table = tEnv.sqlQuery("SELECT c, d from MyTable2")
DataStream dataStream = tEnv.toAppendStream(table, Row.class)
dataStream…

env.execute("job name") ;
// or tEnv.execute("job name") 


The job submitted by either execute method contains the topology of both queries, which confuses users. As suggested in "Public Interfaces", `StreamExecutionEnvironment.execute` should only trigger DataStream program execution, and `TableEnvironment.execute` should only trigger table program execution. So the expected behavior for the above example is that `env.execute("job name")` submits the second query, and `tEnv.execute("job name")` submits the first query.

To meet this requirement, we will change the current behavior of TableEnvironment: the TableEnvironment instance buffers the SQLs/Tables and does not add the generated transformations to the StreamExecutionEnvironment instance when translating to an execution plan. The solution is similar to DummyStreamExecutionEnvironment: we can use StreamGraphGenerator to generate the StreamGraph based on the transformations. This requires that StreamTableSink always return a DataStream, and the StreamTableSink.emitDataStream method should be removed since it has been deprecated since Flink 1.9. The StreamExecutionEnvironment instance then only buffers the transformations translated from DataStream programs.
The solution for BatchTableEnvironment is similar: the `BatchTableSink.emitDataSet` method should return a DataSink, and the DataSet plan can be created through a plan generator based on the DataSinks. The ExecutionEnvironment instance then only buffers the DataSinks translated from DataSet programs.
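The intended separation of the two buffers can be sketched with two plain stand-in classes (`StreamEnvSketch` and `TableEnvSketch` are illustrative, not Flink classes): translating the buffered table program no longer pushes transformations into the stream environment’s buffer.

```java
import java.util.ArrayList;
import java.util.List;

public class BufferSeparationSketch {
    // Stand-in for StreamExecutionEnvironment: buffers only the
    // transformations of DataStream programs.
    public static class StreamEnvSketch {
        public final List<String> transformations = new ArrayList<>();
        public void addTransformation(String t) { transformations.add(t); }
    }

    // Stand-in for TableEnvironment: buffers SQLs/Tables in its own list
    // and builds its plan locally, without touching streamEnv's buffer.
    public static class TableEnvSketch {
        public final List<String> bufferedSql = new ArrayList<>();
        private final StreamEnvSketch streamEnv;
        public TableEnvSketch(StreamEnvSketch env) { this.streamEnv = env; }
        public void sqlUpdate(String sql) { bufferedSql.add(sql); }
        // translate the buffered SQLs into a plan; note that nothing is
        // added to streamEnv.transformations here
        public int translate() { return bufferedSql.size(); }
    }
}
```

Each environment can then submit its own job from its own buffer, which is exactly the rule proposed in "Public Interfaces".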

Now, we introduce `StatementSet` to require users to explicitly buffer SQLs/Tables to support multiple sinks optimization. Although the `insertInto`, `sqlUpdate` and `execute` methods are deprecated, they will not be immediately deleted, so the deprecated methods and new methods must work together in one or more versions. The TableEnvironment’s buffer will be removed once the deprecated methods are deleted. 

After we correct the behavior of the `execute` method, users can easily and correctly write table programs even when the deprecated methods, the new methods and the to-DataStream conversion methods are used together.

Examples:

In this section we list some examples using the old API and the proposed API for a straightforward comparison.

`sqlUpdate` vs `executeSql`:

Current Interface:

tEnv.sqlUpdate("CREATE TABLE test (...) with (path = '/tmp1')");

New Interface:

TableResult result = tEnv.executeSql("CREATE TABLE test (...) with (path = '/tmp1')");
result...

Current Interface:

tEnv.sqlUpdate("INSERT INTO test SELECT ...");
tEnv.execute("test");

New Interface:

TableResult result = tEnv.executeSql("INSERT INTO test SELECT ...");
result...


`execute` & `explain` vs `createStatementSet`:

Current Interface:

tEnv.sqlUpdate("insert into xx ...")
tEnv.sqlUpdate("insert into yy ...")
tEnv.execute("test")
// tEnv.explain(false)

New Interface:

StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql("insert into xx ...");
stmtSet.addInsertSql("insert into yy ...");
TableResult result = stmtSet.execute();
// stmtSet.explain()

Current Interface:

Table table1 = tEnv.sqlQuery("select xx ...")...
Table table2 = tEnv.sqlQuery("select yy ...")...
tEnv.insertInto("sink1", table1)
tEnv.insertInto("sink2", table2)
tEnv.execute("test")
// tEnv.explain(false)

New Interface:

Table table1 = tEnv.sqlQuery("select xx ...")...
Table table2 = tEnv.sqlQuery("select yy ...")...
StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsert("sink1", table1);
stmtSet.addInsert("sink2", table2);
TableResult result = stmtSet.execute();
// stmtSet.explain()


Other new proposed methods

Code Block
languagejava
titleExample
TableEnvironment tEnv = ...
tEnv.explainSql("insert into s1 ...")
tEnv.explainSql("select xx ...")

Table table1 = tEnv.sqlQuery("select xx ...")...
String explanation = table1.explain();
TableResult result = table1.executeInsert("sink1");

Table table2 = tEnv.sqlQuery("select yy ...")...
TableResult result2 = table2.execute();
result2.print();


Deprecated methods and new methods work together

Code Block
languagejava
titleExample
TableEnvironment tEnv = ...
StatementSet stmtSet = tEnv.createStatementSet();

tEnv.sqlUpdate("insert into s1 ..."); // statement1

stmtSet.addInsertSql("insert into s2 ..."); // statement2

tEnv.insertInto("sink1",  tEnv.sqlQuery("select xx...")); // statement3

tEnv.executeSql("insert into s3 ..."); // only submit the plan of this statement 

tEnv.explain(false); // explain the plan of statement1 and statement3
tEnv.execute( "test1"); // submit the plan of statement1 and statement3

stmtSet.addInsert("sink2", tEnv.sqlQuery("select yy...")); // statement4

stmtSet.explain(); // explain the plan of statement2 and statement4
TableResult result = stmtSet.execute(); // submit the plan of statement2 and statement4


TableEnvironment’s methods and DataStream work together

Code Block
languagejava
titleExample
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.sqlUpdate("insert into s1 ..."); // statement1

StatementSet stmtSet = tEnv.createStatementSet();

stmtSet.addInsert("sink1", tEnv.sqlQuery("select xx...")); // statement2

Table table =  tEnv.sqlQuery("select yy..."); 
DataStream dataStream = tEnv.toAppendStream(table, Row.class); // statement3
dataStream…

tEnv.explain(false); // explain the plan of statement1

stmtSet.explain(); // explain the plan of statement2

env.execute("test1") ;  // submit the plan of statement3

tEnv.execute("test2") ;  // submit the plan of statement1

stmtSet.execute(); // submit the plan of statement2 


Summary:

Methods in TableEnvironment & Table



Methods of TableEnvironment:

| Methods | Comments |
| --- | --- |
| void sqlUpdate(String sql) | deprecated |
| void insertInto(String targetPath, Table table) | deprecated |
| JobExecutionResult execute(String jobName) | deprecated |
| String explain(boolean extended) | deprecated |
| Table fromTableSource(TableSource tableSource) | deprecated |
| TableResult executeSql(String statement) | added |
| String explainSql(String statement, ExplainDetail... extraDetails) | added |
| StatementSet createStatementSet() | added |

Methods of Table:

| Methods | Comments |
| --- | --- |
| void insertInto(String tablePath) | deprecated |
| TableResult executeInsert(String tablePath) | added |
| TableResult executeInsert(String tablePath, boolean overwrite) | added |
| String explain(ExplainDetail... extraDetails) | added |
| TableResult execute() | added |


New methods for single statement & multiple statements

|  | single statement | multiple statements |
| --- | --- | --- |
| DDL | executeSql(String) | Unsupported (supports multiple DDLs for easy testing in the future) |
| SHOW/DESCRIBE/USE | executeSql(String) | Unsupported |
| DML | executeSql(String) | createStatementSet() -> StatementSet -> execute() |
| EXPLAIN | explainSql(String) | createStatementSet() -> StatementSet -> explain() |


Compatibility, Deprecation, and Migration Plan

  1. Methods of TableEnvironment to be deprecated:
    • void sqlUpdate(String sql)
    • void insertInto(String targetPath, Table table)
    • JobExecutionResult execute(String jobName)
    • String explain(boolean extended)
    • Table fromTableSource(TableSource tableSource)
  2. Methods in Table to be deprecated:
    • void insertInto(String targetPath)
  3. You need to change your program a little if you use `StreamExecutionEnvironment/ExecutionEnvironment.execute` to trigger a table program execution, use `StreamTableEnvironment.execute()` to trigger a DataStream program execution, or use `BatchTableEnvironment.execute()` to trigger a DataSet program execution.

Test Plan

The `StatementSet#explain` method can be tested with unit tests, and the other new methods can be tested with integration tests. We will also add integration tests to verify that the new methods work correctly together with the deprecated methods.

...

To support executing time-consuming batch SQL statements or never-ending streaming SQL statements, we need to provide an asynchronous execution mode.

Provide async execute method for executeSql and StatementSet.execute

Similarly as above, suggested methods:

Code Block
languagejava
titleNew methods in TableEnvironment
interface TableEnvironment {
    /**
     * Asynchronously execute the given single statement, and the
     * statement can be DDL/DML/SHOW/DESCRIBE/EXPLAIN/USE.
     */
    CompletableFuture<TableResult> executeSqlAsync(String statement);
}
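To illustrate how a caller could consume such a future, here is a minimal plain-JDK sketch. `fakeExecuteSqlAsync` and its string result are hypothetical stand-ins for the proposed `executeSqlAsync` and `TableResult`, not real Flink API:

```java
import java.util.concurrent.CompletableFuture;

public class AsyncSqlSketch {
    // Hypothetical stand-in for executeSqlAsync: returns a string
    // result instead of a real TableResult so the sketch is self-contained.
    static CompletableFuture<String> fakeExecuteSqlAsync(String statement) {
        return CompletableFuture.supplyAsync(() -> "executed: " + statement);
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = fakeExecuteSqlAsync("insert into s1 ...");
        // Non-blocking: attach a callback that runs once the job finishes.
        future.thenAccept(System.out::println);
        // Blocking: join() only when the caller actually needs the result.
        String result = future.join();
        assert result.equals("executed: insert into s1 ...");
    }
}
```

The submitting thread returns immediately, which is exactly what long-running batch jobs need.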


We should also support executing a batch of SQL statements asynchronously. Suggested method:

Code Block
languagejava
titleStatementSet
interface StatementSet {
    /**
     * Asynchronously execute the DML statements as a batch.
     */
    CompletableFuture<TableResult> executeAsync();
}
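One reason the returned future matters for streaming is cancellation: a never-ending streaming job only finishes when it is stopped. The following plain-JDK sketch shows the caller-side pattern; `fakeExecuteAsync` is a hypothetical stand-in whose future never completes on its own, mimicking a streaming job:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StreamingHandleSketch {
    // Stand-in for executeAsync on a never-ending streaming job:
    // the returned future never completes on its own.
    static CompletableFuture<String> fakeExecuteAsync() {
        return new CompletableFuture<>();
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<String> handle = fakeExecuteAsync();
        try {
            // Time-boxed wait: a streaming job normally keeps running.
            handle.get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException stillRunning) {
            // The job has not finished; use the handle to stop it.
            handle.cancel(true);
        }
        assert handle.isCancelled();
    }
}
```

Here the future serves as a job handle for monitoring and cancellation rather than as a way to obtain a result.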


Add an async execute method to org.apache.flink.table.delegation.Executor

Code Block
languagejava
titleExecutor
/**
 * Executes all the previously applied transformations via {@link #apply(List)} in an async way.
 *
 * @return a {@link CompletableFuture} holding the {@link JobClient} of the submitted job
 * @throws Exception which occurs during the job submission
 */
CompletableFuture<JobClient> executeAsync(String jobName) throws Exception;

...

Since we already have FLIPs [7][8] that provide asynchronous job management, it is convenient and natural to provide such a method.

Add an async execute method to org.apache.flink.streaming.api.environment#StreamExecutionEnvironment

...

 throws Exception;


SQL CLI integrates with new API

...