...

Currently, the `void sqlUpdate(String sql)` method executes DDL statements immediately, while DML statements must be triggered by `TableEnvironment.execute()`. Both behaviors should be consistent. Because this method buffers `INSERT` statements, it causes the problem described above. So this method will be deprecated. We propose a new blocking method that returns an execution result:

/**
 * Synchronously execute the given single statement immediately and the
 * statement can be DDL/DML/SHOW/DESCRIBE/EXPLAIN/USE.
 *
 * If the statement is translated to a Flink job, the result will be
 * returned only after the job is finished.
 *
 * @return result for SHOW/DESCRIBE/EXPLAIN, the affected row count for
 * `DML` (-1 means unknown), or a string message ("OK") for other
 * statements.
 * @throws Exception which occurs during the execution.
 */
ResultTable executeStatement(String statement) throws Exception;

This method only supports executing a single statement, which can be a DDL, DML, SHOW, DESCRIBE, EXPLAIN or USE statement. It is executed synchronously and returns a ResultTable, the representation of the execution result, which contains the result data and the result schema. If an error occurs, this method throws an exception.


/**
 * A ResultTable is the representation of the statement execution result.
 */
public interface ResultTable {

  /**
   * Get the schema of ResultTable.
   */
  TableSchema getResultSchema();

  /**
   * Get the result contents as an iterable of rows.
   */
  Iterable<Row> getResultRows();
}
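
For example, a minimal usage sketch of the proposed API (a configured `tEnv` instance is assumed):

ResultTable result = tEnv.executeStatement("SHOW CATALOGS");
TableSchema schema = result.getResultSchema(); // a single VARCHAR(n) column named "result"
for (Row row : result.getResultRows()) {
    System.out.println(row); // one catalog name per row
}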


The following table describes the result for each kind of statement:

Statement | Result Schema | Result Value | Examples
DDL | field name: result; field type: VARCHAR(2) | "OK" (single row) | CREATE TABLE new_table (col1 BIGINT, ...)
DML (INSERT/UPDATE/DELETE) | field name: affected_rowcount; field type: BIGINT | the affected row count (-1 means unknown) | INSERT INTO sink_table SELECT …
SHOW xx | field name: result; field type: VARCHAR(n) (n is the max length of values) | list all objects (multiple rows) | SHOW CATALOGS
DESCRIBE xx | (same as SHOW) | describe the detail of an object (single row) | DESCRIBE CATALOG catalog_name
EXPLAIN xx | (same as SHOW) | explain the plan of a query (single row) | EXPLAIN PLAN FOR SELECT …
USE xx | field name: result; field type: VARCHAR(2) | "OK" (single row) | USE CATALOG catalog_name
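
For illustration, a hedged sketch of reading these results (the table names are hypothetical):

ResultTable ddlResult = tEnv.executeStatement("CREATE TABLE new_table (col1 BIGINT)");
// schema: result VARCHAR(2); a single row containing "OK"

ResultTable dmlResult = tEnv.executeStatement("INSERT INTO new_table SELECT col1 FROM src_table");
// schema: affected_rowcount BIGINT; a single row with the affected row count (-1 means unknown)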


`insertInto(String, Table)` 

Like the `INSERT` statement, Tables passed to this method are also buffered and cause the same buffering problem. So we advise deprecating this method.

`Table.insertInto` currently uses this deprecated method. Once the method is removed in the future, we will change the behavior of `Table.insertInto` from lazy execution (triggered by the `TableEnvironment.execute` method) to immediate execution (like the `executeStatement` method), as sketched below.
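
A minimal before/after sketch of that change (the sink and job names are illustrative):

// Current behavior: the insert is buffered and only submitted by execute().
table.insertInto("sink_table");
tEnv.execute("job name");

// Future behavior, once the deprecated method is removed: the insert is
// translated and submitted immediately, like executeStatement.
table.insertInto("sink_table");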

`execute(String jobName)` & `explain(boolean)`

...

`DmlBatch` supports adding a list of SQL statements and Tables through the `addXX` methods, getting the plan of all added statements through the `explain` method, and optimizing all statements together and submitting the job through the `execute` method. The added statements and Tables are cleared when the `execute` method is called.

interface TableEnvironment {

  /**
   * Create a DmlBatch instance which can add dml statements or Tables
   * to the batch, the planner can optimize all added statements and
   * Tables together for better performance.
   */
  DmlBatch createDmlBatch();
}


interface DmlBatch {

  /**
   * add insert statement to the batch.
   */
  void addInsert(String insert);

  /**
   * add Table with the given sink table name to the batch.
   */
  void addInsert(String targetPath, Table table);

  /**
   * execute all statements and Tables as a batch.
   *
   * The added statements and Tables will be cleared when executing
   * this method.
   */
  ResultTable execute() throws Exception;

  /**
   * returns the AST and the execution plan to compute the result of
   * all statements and Tables.
   *
   * @param extended if the plan should contain additional properties,
   * e.g. estimated cost, traits
   */
  String explain(boolean extended);
}


Each statement or Table has a return value, namely its affected row count, so the ResultTable has multiple columns. All column types are BIGINT, and each column name is "affected_rowcount_" plus the index of the statement or Table. e.g.

DmlBatch batch = tEnv.createDmlBatch();
batch.addInsert("insert into xx ...");              // index 0
batch.addInsert("yy", tEnv.sqlQuery("select ...")); // index 1
ResultTable result = batch.execute();


The schema and data in ResultTable: 

...

First, let’s discuss the buffering problem in depth. There are actually two levels of buffering: TableEnvironment buffers SQL statements/Tables, and StreamExecutionEnvironment buffers transformations to generate the StreamGraph. Each TableEnvironment instance holds a StreamExecutionEnvironment instance. Currently, when translating a FlinkRelNode into a Flink operator, the generated transformations are added to StreamExecutionEnvironment’s buffer. The bug [2] is caused by this behavior. Let’s give another simple example to explain the problem of StreamExecutionEnvironment’s buffer.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// will add transformations to env when translating to execution plan
tEnv.sqlUpdate("INSERT INTO sink1 SELECT a, b FROM MyTable1");

Table table = tEnv.sqlQuery("SELECT c, d FROM MyTable2");
DataStream dataStream = tEnv.toAppendStream(table, Row.class);
dataStream…

env.execute("job name");
// or tEnv.execute("job name")


The job submitted by either execute method contains the topology of both queries, which confuses users. As suggested in "Public Interfaces", `StreamExecutionEnvironment.execute` should only trigger DataStream program execution, and `TableEnvironment.execute` should only trigger table program execution. So the expected behavior for the above example is that `env.execute("job name")` submits the second query, and `tEnv.execute("job name")` submits the first query.

...

Deprecated methods and new methods work together

TableEnvironment tEnv = ...
DmlBatch batch = tEnv.createDmlBatch();

tEnv.sqlUpdate("insert into s1 ..."); // statement1
batch.addInsert("insert into s2 ..."); // statement2
tEnv.insertInto("sink1", tEnv.sqlQuery("select xx...")); // statement3

tEnv.executeStatement("insert into s3 ..."); // only submit the plan of this statement
tEnv.explain(false); // explain the plan of statement1 and statement3
tEnv.execute("test1"); // submit the plan of statement1 and statement3

batch.addInsert("sink2", tEnv.sqlQuery("select yy...")); // statement4
batch.explain(false); // explain the plan of statement2 and statement4
ResultTable result = batch.execute(); // submit the plan of statement2 and statement4


TableEnvironment’s methods and DataStream work together

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.sqlUpdate("insert into s1 ..."); // statement1

DmlBatch batch = tEnv.createDmlBatch();
batch.addInsert("sink1", tEnv.sqlQuery("select xx...")); // statement2

Table table = tEnv.sqlQuery("select yy...");
DataStream dataStream = tEnv.toAppendStream(table, Row.class); // statement3
dataStream…

tEnv.explain(false); // explain the plan of statement1
batch.explain(false); // explain the plan of statement2

env.execute("test1"); // submit the plan of statement3
tEnv.execute("test2"); // submit the plan of statement1
batch.execute(); // submit the plan of statement2


Summary:

Methods of TableEnvironment

...

 

Statement type | single statement | multiple statements
DDL | executeStatement() | Unsupported (supports multiple DDLs for easy testing in the future)
SHOW/DESCRIBE/USE | executeStatement() | Unsupported
DML | executeStatement() | createDmlBatch() -> DmlBatch -> execute()
EXPLAIN | explain(Table) (can’t explain an insert statement) | createDmlBatch() -> DmlBatch -> explain()

...

Notice: this depends on FLIP-73/FLIP-74 and is not in the scope of this FLIP.

To support time-consuming batch SQL or never-ending streaming SQL, an asynchronous execution mode needs to be provided.

Provide async execute method for executeStatement and DmlBatch.execute

Similar to the methods above, the suggested method is:

/**
 * Asynchronously execute the given single statement immediately and the
 * statement can be DDL/DML/SHOW/DESCRIBE/EXPLAIN/USE. Multiple statements
 * separated by a semicolon are not supported.
 */
CompletableFuture<ResultTable> executeStatementAsync(String statement);
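
A hedged usage sketch of the proposed method (a configured `tEnv` is assumed; table names are illustrative):

tEnv.executeStatementAsync("INSERT INTO sink_table SELECT a, b FROM src_table")
    .thenAccept(result -> {
        // For DML, the single row contains the affected row count (-1 means unknown).
        result.getResultRows().forEach(System.out::println);
    });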

...

We should also support executing batch SQL asynchronously. Suggested method:

interface DmlBatch {

  /**
   * Asynchronously execute the dml statements as a batch.
   */
  CompletableFuture<ResultTable> executeAsync();
}
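
A minimal usage sketch (the statement contents are illustrative):

DmlBatch batch = tEnv.createDmlBatch();
batch.addInsert("insert into s1 ...");
batch.addInsert("insert into s2 ...");
batch.executeAsync().whenComplete((result, error) -> {
    if (error != null) {
        error.printStackTrace();
    } else {
        // One affected_rowcount_<index> column per added statement.
        result.getResultRows().forEach(System.out::println);
    }
});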


Add an async execute method to org.apache.flink.table.delegation.Executor

...