Status

Current state: "Under Discussion"

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation


TableEnvironment has provided the `Table sqlQuery(String sql)` and `void sqlUpdate(String sql)` interfaces to create a table (actually a view here) or describe an update action from one sql string. In Flink 1.9, TableEnvironment additionally introduces a `void execute(String jobName)` interface to trigger Flink table program execution, and extends `void sqlUpdate(String sql)` to evaluate not only INSERT statements but also DDL and USE statements. But with more use cases coming up, there are some fatal shortcomings in the current API design.

  1. Inconsistent execution semantics for `sqlUpdate()`. For now, one DDL statement passed to this method will be executed immediately while one `INSERT INTO` statement actually gets executed when we call the `execute()` method, which confuses users a lot.
  2. Don't support obtaining the returned value from sql execution. FLIP-69 [1] introduces a lot of common DDLs such as `SHOW TABLES`, which require that TableEnvironment has an interface to obtain the execution result of one DDL. SQL CLI also has a strong demand for this feature so that we can easily unify the execution behavior of SQL CLI and TableEnvironment. Besides, the method name `sqlUpdate` is not consistent with doing things like `SHOW TABLES`.
  3. Unclear and buggy support for buffering SQLs/Tables execution [2]. The Blink planner has provided the ability to optimize multiple sinks, but we don't have a clear mechanism through the TableEnvironment API to control the whole flow.
  4. Unclear Flink table program trigger point. Both `TableEnvironment.execute()` and `StreamExecutionEnvironment.execute()` can trigger a Flink table program execution. However, if you use TableEnvironment to build a Flink table program, you must use `TableEnvironment.execute()` to trigger execution, because you can't get the StreamExecutionEnvironment instance. If you use StreamTableEnvironment to build a Flink table program, you can use both to trigger execution. If you convert a table program to a DataStream program (using `StreamTableEnvironment.toAppendStream`), you must use `StreamExecutionEnvironment.execute()`. So it's hard to explain which `execute` method should be used.
  5. Don't support executing Flink table jobs in an asynchronous way. In streaming mode, one `INSERT INTO xxx` statement may never end. It's also possible that one ETL task takes too much time to finish in a batch environment. So it's very natural and necessary to support executing Flink table jobs asynchronously.
  6. `Table sqlQuery(String sql)` actually returns a temporary view from one SELECT statement, and that view must be registered again before it can be used in a following sql statement. According to FLIP-64 [3], it's natural to deprecate `Table sqlQuery(String sql)` and provide a new method `void createTemporalView(String path, String sql)` in TableEnvironment.
  7. Don't have the ability to deal with multiple statements. It's a very common scenario that a SQL CLI user executes an external sql script file. If TableEnvironment doesn't want to support multiple statements, it (or the sql parser) should expose the ability to parse multiple statements, so that SQL CLI can leverage it.

...

Let’s give an example to explain the buffering SQLs/Tables execution problem

...

  • TableEnvironment shouldn't buffer SQLs/execution plans. For example, TableEnvironment currently supports writing code like the following:

tEnv.sqlUpdate("CREATE TABLE test (...) with (path = '/tmp1')");
tEnv.sqlUpdate("INSERT INTO test SELECT ...");
tEnv.sqlUpdate("DROP TABLE test");
tEnv.sqlUpdate("CREATE TABLE test (...) with (path = '/tmp2')");
tEnv.execute();

  1. Users are confused by what kinds of sql are executed at once and what kinds of sql are buffered until triggered by the execute method.
  2. Buffering SQLs/Tables will cause undefined behavior. We may want to insert data into the `test` table with the `/tmp1` path but get the wrong result of `/tmp2`.

Public Interfaces

`execute(String jobName)` & `explain(boolean)`

...

  1. We propose to deprecate the following methods in TableEnvironment:

    • void sqlUpdate(String sql)

    • void insertInto(String targetPath, Table table)

    • void execute(String jobName)

...

    • String explain(boolean extended)

    • Table fromTableSource(TableSource<?> source)

  2. Meanwhile, we propose to introduce the following new methods in TableEnvironment:
    • ResultTable executeStatement(String statement)

      synchronously execute the given single statement immediately, and return the execution result.

      public interface ResultTable {
        TableSchema getResultSchema();
        Iterable<Row> getResultRows();
      }

 

    • DmlBatch createDmlBatch()

      create a DmlBatch instance which can add dml statements or Tables to the batch and explain or execute them as a batch.

      interface DmlBatch {
        void addInsert(String insert);
        void addInsert(String targetPath, Table table);
        ResultTable execute() throws Exception;
        String explain(boolean extended);
      }

  3. For the current messy Flink table program trigger point, we propose that for TableEnvironment and StreamTableEnvironment, you must use `TableEnvironment.execute()` to trigger table program execution; once you convert the table program to a DataStream program (through the `toAppendStream` or `toRetractStream` method), you must use `StreamExecutionEnvironment.execute()` to trigger execution of the DataStream program, as sketched below.
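A minimal sketch of the proposed trigger points (table, sink and job names are hypothetical; the deprecated `sqlUpdate` call is only used here to stand for "some table program"):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// table program part: triggered only by TableEnvironment.execute()
tEnv.sqlUpdate("INSERT INTO sink1 SELECT a, b FROM MyTable1");
tEnv.execute("table job");

// once converted to a DataStream program, the DataStream part is
// triggered only by StreamExecutionEnvironment.execute()
Table table = tEnv.sqlQuery("SELECT c, d FROM MyTable2");
DataStream<Row> dataStream = tEnv.toAppendStream(table, Row.class);
dataStream.print();
env.execute("datastream job");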

Proposed Changes


`insertInto(String, Table)` 

For the same reason as the `sqlUpdate` problem described below (buffered execution is confusing), we advise that calling this method should trigger a job execution at once.

`Table sqlQuery(String sql)`

For now, it's meaningless to write code like `tEnv.sqlQuery("select * from src")` without dealing with the return value, and we can't directly use the returned table in a following sql unless it has been registered in some way. In fact, the returned Table of `sqlQuery(String sql)` is a temporary virtual view and we can treat it as FLIP-64 does.

Also, `sqlQuery` provides a bridge from a sql query to a Table, but the name `sqlQuery` is confusing about the behavior: do I really read something from the sql? To eliminate the ambiguity we suggest providing a `fromQuery` method, similar to `Table from(String path)`, which obtains a Table from one sql query statement. Suggested method:

...

/**
 * Get a Table from one sql query stmt, the sql must be a `select` stmt
 * and one single statement.
 */
Table fromQuery(String sql)
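For illustration, a hedged sketch of how the suggested `fromQuery` could be used (the method does not exist yet; table and field names are hypothetical):

TableEnvironment tEnv = ...; // created elsewhere

// obtain a Table directly from a SELECT statement without registering it first
Table orders = tEnv.fromQuery("SELECT user_id, amount FROM Orders WHERE amount > 10");

// the returned Table can be used like any other Table, e.g. in further Table API calls
Table stats = orders.groupBy("user_id").select("user_id, amount.sum as total");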

`void sqlUpdate(String sql)`

Now `void sqlUpdate(String sql)` will execute DDLs right away, while DMLs (such as `INSERT INTO xx`) must be triggered by `TableEnvironment.execute()`. Both behaviors should be kept consistent. This method buffers the `INSERT` statement, which causes the problem described above, so this method will be deprecated. We propose a new blocking method with an execution result:

/**
 * Synchronously execute the given single statement immediately, and the
 * statement can be DDL/DML/SHOW/DESCRIBE/EXPLAIN/USE.
 * If the statement is translated to a Flink job, the result will be
 * returned after the job is finished.
 *
 * @return result for SHOW/DESCRIBE/EXPLAIN, the affected row count for
 * `DML` (-1 means unknown), or a string message ("OK") for other
 * statements.
 * @throws Exception which occurs during the execution.
 */
ResultTable executeStatement(String statement) throws Exception;

This method only supports executing a single statement which can be DDL, DML, SHOW, DESCRIBE, EXPLAIN and USE statement. This method will be executed synchronously and return a ResultTable which is the representation of the execution result, and contains the result data and the result schema. If an error occurs, this method will throw an exception.
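For example, a short sketch of how the proposed method could be used (`executeStatement` and `ResultTable` are the new APIs proposed above, not existing ones; the table names are hypothetical):

TableEnvironment tEnv = ...;

// DDL: the result is a single row containing "OK"
tEnv.executeStatement("CREATE TABLE test (...) with (path = '/tmp1')");

// SHOW TABLES: the result contains one row per table
ResultTable tables = tEnv.executeStatement("SHOW TABLES");
for (Row row : tables.getResultRows()) {
    System.out.println(row);
}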


Suggested `ResultTable` interface:

/**
 * A ResultTable is the representation of the statement execution result.
 */
public interface ResultTable {

  /**
   * Get the schema of the ResultTable.
   */
  TableSchema getResultSchema();

  /**
   * Get the result contents as an iterable of rows.
   */
  Iterable<Row> getResultRows();
}


Support batch sql execution

Since the Blink planner supports optimizing multiple SQLs together (such as multi-sink optimization), we need to introduce a new method called `executeBatch` which takes a list of SQLs as input arguments, optimizes them all together and submits the job for execution.

Notice that the input SQLs can not be DDL or SELECT statements, similar to the JDBC batch statement [4]; passing DDLs would cause unexpected behavior. Suggested method:

/**
 * Execute the given sqls in a batch, and the planner can optimize the whole
 * batch to obtain better performance.
 * Notice: the sql statements are limited to `insert into`.
 */
ResultTable executeBatch(String... sql) throws Exception
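A short usage sketch, assuming `executeBatch` is added as proposed (source and sink names are hypothetical); the planner can then optimize both INSERT statements into a single multi-sink job:

TableEnvironment tEnv = ...;

ResultTable result = tEnv.executeBatch(
    "INSERT INTO sink1 SELECT a, b FROM src WHERE a > 0",
    "INSERT INTO sink2 SELECT a, c FROM src WHERE a <= 0");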

Discuss a parse method for multiple statement execution in SQL CLI

We don't want to support executing multiple statements in TableEnvironment, but this feature is needed in the SQL CLI because it's natural to execute an external sql script there. I have thought about providing a parse method like `SqlNodeList parse(String stmt)`, but it's not intuitive to understand and such a method shouldn't belong to the TableEnvironment API. As discussed in the pull requests [5][6], Calcite has provided the `SqlNodeList parseSqlStmtList()` method to parse a list of SQL statements separated by semicolons and construct a parse tree. The SQL CLI can use this method to parse multiple statements and execute every single statement one by one through `TableEnvironment#executeSql(String sql)` (a parsing sketch is given after the list below). One thing we should take care of is that there are some special commands like `help`/`set`/`quit` in SQL CLI which control the environment's lifecycle and change the variables of the context. IMO, there are several ways to deal with these commands when handling multiple statements:

  1. Support these special control commands in flink-sql-parser. The shortcoming is that TableEnvironment would have to take care of those noisy commands, and flink-sql-parser would lose its wider applicability to other external systems.
  2. SQL CLI parses those control commands on its own and pre-splits the multiple statements according to the control commands. Then SQL CLI can pass the remaining statements to the SqlParser and obtain a SqlNodeList. Personally, I would advise adopting this way.
  3. Flink already introduces a `Parser` interface which is exposed by `Planner`. We can add one more method to `Parser`, like `List<String> splitSql(String)`, and then we can borrow from Calcite to achieve this functionality.
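As a minimal sketch of the Calcite-based splitting mentioned above (assuming Calcite 1.18+, where `SqlParser#parseStmtList` is available; in practice the SQL CLI would use the parser configuration of the Flink SQL dialect, and the special control commands would need to be handled before this step):

import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;

public class ScriptSplitter {
    public static void main(String[] args) throws SqlParseException {
        // a script containing two statements separated by a semicolon
        String script =
            "INSERT INTO sink1 SELECT a FROM src; INSERT INTO sink2 SELECT b FROM src";

        SqlParser parser = SqlParser.create(script, SqlParser.configBuilder().build());
        SqlNodeList statements = parser.parseStmtList();

        for (SqlNode statement : statements) {
            // each single statement can then be handed to the TableEnvironment,
            // e.g. via the single-statement execute method proposed in this FLIP
            System.out.println(statement);
        }
    }
}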

Summary:

Methods of TableEnvironment:

  • execute(String jobName): deprecated
  • explain(boolean extended): deprecated
  • sqlQuery(String sql): replaced by fromQuery(String sql)
  • sqlUpdate(String sql): replaced by executeSql(String sql)
  • fromTableSource(TableSource tableSource): deprecated

New suggested TableEnvironment methods:

  • Table fromQuery(String sql): obtain a Table from a sql query stmt
  • ResultTable executeSql(String sql)
  • void executeBatch(String... sql): the sql statements are limited to `insert into`




The following table describes the result of the proposed `executeStatement(String statement)` method for each kind of statement:

  • DDL
    Result schema: field name: result, field type: VARCHAR(2)
    Result value: "OK" (single row)
    Example: CREATE TABLE new_table (col1 BIGINT, ...)

  • DML (INSERT/UPDATE/DELETE)
    Result schema: field name: affected_rowcount, field type: BIGINT
    Result value: the affected row count (-1 means unknown)
    Example: INSERT INTO sink_table SELECT ...

  • SHOW xx
    Result schema: field name: result, field type: VARCHAR(n) (n is the max length of the values)
    Result value: list all objects (multiple rows)
    Example: SHOW CATALOGS

  • DESCRIBE xx
    Result schema: field name: result, field type: VARCHAR(n)
    Result value: describe the detail of an object (single row)
    Example: DESCRIBE CATALOG catalog_name

  • EXPLAIN xx
    Result schema: field name: result, field type: VARCHAR(n)
    Result value: explain the plan of a query (single row)
    Example: EXPLAIN PLAN FOR SELECT ...

  • USE xx
    Result schema: field name: result, field type: VARCHAR(2)
    Result value: "OK" (single row)
    Example: USE CATALOG catalog_name


`insertInto(String, Table)` 

Like the `INSERT` statement, the Tables passed to this method will also be buffered and will cause the buffer problem. So we advise deprecating this method.

 `Table.insertInto` will use this deprecated method now. Once this method is removed in the future, we will change the behavior of `Table.insertInto` method from lazy execution (triggered by `TableEnvironment.execute` method) to immediate execution (like `executeStatement` method). 


`execute(String jobName)` & `explain(boolean)`

Since we will disable buffering SQLs/Tables and plans, it's meaningless to provide `execute(String jobName)` as the trigger entry point, and the `explain(boolean)` method should also not be used anymore. So we advise deprecating those two methods. Instead, we introduce a new method named `createDmlBatch` and a new class named `DmlBatch` to support multiple SQLs/Tables optimization.

From the class name of  `DmlBatch`, we know that only DML statements can be added to `DmlBatch`. In `DmlBatch`, only INSERT is supported now, DELETE and UPDATE can also be supported in the future.

`DmlBatch` supports adding a list of SQLs and Tables through the `addXX` methods, getting the plan of all added statements through the `explain` method, and optimizing all the statements together and submitting the job through the `execute` method. The added statements and Tables will be cleared when calling the `execute` method.

interface TableEnvironment {
  /**
   * Create a DmlBatch instance which can add dml statements or Tables to
   * the batch, the planner can optimize all added statements and Tables
   * together for better performance.
   */
  DmlBatch createDmlBatch();
}

interface DmlBatch {
  /**
   * Add an insert statement to the batch.
   */
  void addInsert(String insert);

  /**
   * Add a Table with the given sink table name to the batch.
   */
  void addInsert(String targetPath, Table table);

  /**
   * Execute all added statements and Tables as a batch.
   *
   * The added statements and Tables will be cleared when executing
   * this method.
   */
  ResultTable execute() throws Exception;

  /**
   * Returns the AST and the execution plan to compute the result of
   * all added statements and Tables.
   *
   * @param extended if the plan should contain additional properties,
   * e.g. estimated cost, traits
   */
  String explain(boolean extended);
}

Each statement or Table has a return value which is the affected row count of a statement or a Table. So the ResultTable has multiple columns. All column types are BIGINT, and the column name is "affected_rowcount_" plus the index of the statement or Table. e.g. 

DmlBatch batch = tEnv.createDmlBatch();
batch.addInsert("insert into xx ...");
batch.addInsert("yy", tEnv.sqlQuery("select ..."));
batch.execute();

The schema and data in the ResultTable:

  • column1 (for `batch.addInsert("insert into xx ...")`): name: affected_rowcount_0, type: BIGINT
  • column2 (for `batch.addInsert("yy", tEnv.sqlQuery("select ..."))`): name: affected_rowcount_1, type: BIGINT
  • Data (single row): -1, -1


`Table fromTableSource(TableSource<?> source)`

Since FLIP-64 has provided `ConnectTableDescriptor#createTemporaryTable` to register a TableSource in TableEnvironment, this method should be deprecated too; it's an omission in that FLIP.
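For reference, a hedged sketch of the FLIP-64 style replacement (descriptor classes as available in Flink 1.9/1.10 under org.apache.flink.table.descriptors; the file-system connector, CSV format, field names and path are only an illustrative example):

tEnv.connect(new FileSystem().path("/path/to/data"))           // connector descriptor
    .withFormat(new OldCsv().field("user_id", Types.STRING))   // format descriptor
    .withSchema(new Schema().field("user_id", Types.STRING))   // table schema
    .createTemporaryTable("MyTable"); // replaces registering a TableSource via fromTableSource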

How to correct the execution behavior?

First, let's discuss the buffer problem in depth. Actually there are two levels of buffering: TableEnvironment buffers SQLs/Tables, and StreamExecutionEnvironment buffers transformations to generate the StreamGraph. Each TableEnvironment instance holds a StreamExecutionEnvironment instance. Currently, when translating a FlinkRelNode into a Flink operator, the generated transformations are added to StreamExecutionEnvironment's buffer. The bug [2] is caused by this behavior. Let's give another simple example to explain the problem of StreamExecutionEnvironment's buffer.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// will add transformations to env when translating to execution plan
tEnv.sqlUpdate("INSERT INTO sink1 SELECT a, b FROM MyTable1");

Table table = tEnv.sqlQuery("SELECT c, d FROM MyTable2");
DataStream dataStream = tEnv.toAppendStream(table, Row.class);
dataStream...

env.execute("job name");
// or tEnv.execute("job name")
The job submitted by each execute method contains the topology of both queries. Users are confused about the behavior. As suggested in "Public Interfaces", `StreamExecutionEnvironment.execute` only triggers DataStream program execution, and `TableEnvironment.execute` only triggers table program execution. So the expected behavior for the above example is that `env.execute("job name")` submits the second query, and `tEnv.execute("job name")` submits the first query.

To meet the requirement, we will change the current behavior of TableEnvironment: the TableEnvironment instance buffers the SQLs/Tables and does not add the generated transformations to the StreamExecutionEnvironment instance when translating to the execution plan. (The solution is similar to DummyStreamExecutionEnvironment: we can use StreamGraphGenerator to generate a StreamGraph based on the buffered transformations. This requires that StreamTableSink always returns a DataStream, and the StreamTableSink#emitDataStream method should be removed since it's deprecated in Flink 1.9.) The StreamExecutionEnvironment instance only buffers the transformations translated from DataStream.

Now, we introduce `DmlBatch` to require users to explicitly buffer SQLs/Tables to support multiple sinks optimization. Although the `insertInto`, `sqlUpdate` and `execute` methods are deprecated, they will not be deleted immediately, so the deprecated methods and the new methods must work together in one or more versions. The TableEnvironment's buffer will be removed once the deprecated methods are deleted.

After we correct the behavior of the `execute` method, users can easily and correctly write table programs even if the deprecated methods, the new methods and the `to DataStream` methods are used together.

Examples:

In this section we list some examples using the old API and the proposed API to give a straightforward comparison.

`sqlUpdate` vs `executeStatement`:

Current Interface:

tEnv.sqlUpdate("CREATE TABLE test (...) with (path = '/tmp1')");

New Interface:

ResultTable result = tEnv.executeStatement("CREATE TABLE test (...) with (path = '/tmp1')");
result...

Current Interface:

tEnv.sqlUpdate("INSERT INTO test SELECT ...");
tEnv.execute("test");

New Interface:

ResultTable result = tEnv.executeStatement("INSERT INTO test SELECT ...");
result...


`execute` & `explain` vs `createDmlBatch`:

Current Interface:

tEnv.sqlUpdate("insert into xx ...")
tEnv.sqlUpdate("insert into yy ...")
tEnv.execute("test")
// tEnv.explain(false)

New Interface:

DmlBatch batch = tEnv.createDmlBatch();
batch.addInsert("insert into xx ...");
batch.addInsert("insert into yy ...");
ResultTable result = batch.execute();
// batch.explain(false)

Current Interface:

Table table1 = tEnv.sqlQuery("select xx ...")...
Table table2 = tEnv.sqlQuery("select yy ...")...
tEnv.insertInto("sink1", table1)
tEnv.insertInto("sink2", table2)
tEnv.execute("test")
// tEnv.explain(false)

New Interface:

Table table1 = tEnv.sqlQuery("select xx ...")...
Table table2 = tEnv.sqlQuery("select yy ...")...
DmlBatch batch = tEnv.createDmlBatch();
batch.addInsert("sink1", table1);
batch.addInsert("sink2", table2);
ResultTable result = batch.execute();
// batch.explain(false)


Deprecated methods and new methods work together

TableEnvironment tEnv = ...
DmlBatch batch = tEnv.createDmlBatch();

tEnv.sqlUpdate("insert into s1 ..."); // statement1
batch.addInsert("insert into s2 ..."); // statement2
tEnv.insertInto("sink1", tEnv.sqlQuery("select xx...")); // statement3

tEnv.executeStatement("insert into s3 ..."); // only submit the plan of this statement

tEnv.explain(false); // explain the plan of statement1 and statement3
tEnv.execute("test1"); // submit the plan of statement1 and statement3

batch.addInsert("sink2", tEnv.sqlQuery("select yy...")); // statement4
batch.explain(false); // explain the plan of statement2 and statement4
ResultTable result = batch.execute(); // submit the plan of statement2 and statement4

TableEnvironment’s methods and DataStream work together

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.sqlUpdate("insert into s1 ..."); // statement1

DmlBatch batch = tEnv.createDmlBatch();
batch.addInsert("sink1", tEnv.sqlQuery("select xx...")); // statement2

Table table = tEnv.sqlQuery("select yy...");
DataStream dataStream = tEnv.toAppendStream(table, Row.class); // statement3
dataStream...

tEnv.explain(false); // explain the plan of statement1
batch.explain(false); // explain the plan of statement2

env.execute("test1"); // submit the plan of statement3
tEnv.execute("test2"); // submit the plan of statement1
batch.execute(); // submit the plan of statement2


Summary:

Methods of TableEnvironment

  • void execute(String jobName): deprecated
  • String explain(boolean extended): deprecated
  • void sqlUpdate(String sql): deprecated
  • void insertInto(String, Table): deprecated
  • Table fromTableSource(TableSource tableSource): deprecated
  • ResultTable executeStatement(String statement): added
  • DmlBatch createDmlBatch(): added


New methods for single statement & multiple statements


  • DDL: single statement: executeStatement(); multiple statements: unsupported (multiple DDLs may be supported for easy testing in the future)
  • SHOW / DESCRIBE / USE: single statement: executeStatement(); multiple statements: unsupported
  • DML: single statement: executeStatement(); multiple statements: createDmlBatch() -> DmlBatch -> execute()
  • EXPLAIN: single statement: explain(Table) (can't explain an insert statement); multiple statements: createDmlBatch() -> DmlBatch -> explain()


Compatibility, Deprecation, and Migration Plan

  • Methods of TableEnvironment to be deprecated:
    • void sqlUpdate(String sql)
    • void insertInto(String targetPath, Table table)
    • JobExecutionResult execute(String jobName)
    • String explain(boolean extended)
    • Table fromTableSource(TableSource tableSource)


Test Plan

The `DmlBatch#explain` method can be tested with unit tests, and other new methods can be tested with integration tests. We will also add some integration tests to verify the new methods can work with the deprecated methods correctly.

Rejected Alternatives

TableEnvironment#executeBatch(String... statement)

This method is consistent with the style of other methods in TableEnvironment; however, it does not support the Table API and cannot explain the plan.

References

[1] FLIP-69 Flink SQL DDL Enhancement

[2] discuss planner buffer execute 

[3] FLIP-64: Support for Temporary Objects in Table module

[4] JDBC statement addBatch interface

[5] multiple statements in SQL CLI

[6] multiple statements in TableEnvironment

[7] FLIP-73 Introducing Executors for job submission

[8] FLIP-74 Flink JobClient API

[9] Sqlline deal with batch execute

Appendix - Future Plan:

Notice: depends on FLIP-73/FLIP-74, not the scope of this flip.

To support executing time-consuming batch sql or never-ending streaming sql, we need to provide an asynchronous execution mode.

Provide async execute methods for executeStatement and DmlBatch#execute

Similar to the synchronous methods above, suggested method:

/**
 * Asynchronously execute the given statement immediately. The statement
 * can be DDL/DML/`SELECT`. A statement separated by semicolons is not supported.
 */
CompletableFuture<ResultTable> executeStatementAsync(String statement);
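A small usage sketch, assuming the asynchronous method is added as proposed (source and sink names are hypothetical):

CompletableFuture<ResultTable> future =
    tEnv.executeStatementAsync("INSERT INTO sink_table SELECT * FROM source_table");

// do other work while the job is running, then react to its completion
future.thenAccept(resultTable ->
    System.out.println("insert finished, result schema: " + resultTable.getResultSchema()));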


We should also support executing the batched dml statements asynchronously. Suggested method:

interface DmlBatch {
  /**
   * Asynchronously execute the added dml statements as a batch.
   */
  CompletableFuture<ResultTable> executeAsync();
}

Add an async execute method to org.apache.flink.table.delegation.Executor

/**
 * Executes all the previously applied transformations via {@link #apply(List)} in an async way.
 */
CompletableFuture<JobExecutionResult> executeAsync(String jobName) throws Exception;


Since we already have flips[7][8] to provide asynchronous management, it’s convenient and natural to provide such a method.

Add an async execute method to org.apache.flink.streaming.api.environment#StreamExecutionEnvironment


public abstract CompletableFuture<JobExecutionResult> asyncExecute(StreamGraph streamGraph) throws Exception;


SQL CLI integrates with new API

  1. How does SQL CLI leverage the DmlBatch class to obtain optimizations?

We can reference other systems' designs, like the Sqlline batch command [9], and introduce a similar command, but we should notice that the sql statements in a batch can only be `INSERT INTO`.

  2. How does SQL CLI parse and execute multiple statements?

We don't want to support parsing multiple statements in TableEnvironment, but this feature is needed in the SQL CLI because it's natural to execute an external sql script there. I have thought about providing a parse method like `List<String> parse(String stmt)`, but it's not intuitive to understand and such a method shouldn't belong to the TableEnvironment API. As discussed in the pull requests [5][6], Calcite has provided the `SqlNodeList parseSqlStmtList()` method to parse a list of SQL statements separated by semicolons and construct a parse tree. The SQL CLI can use this method to parse multiple statements and execute every single statement one by one through `TableEnvironment#executeStatement(String statement)`. One thing we should take care of is that there are some special commands like `help`/`set`/`quit` in SQL CLI which control the environment's lifecycle and change the variables of the context. IMO, there are several ways to deal with these commands when handling multiple statements:

  1. Support these special control commands in flink-sql-parser. The shortcoming is that TableEnvironment would have to take care of those noisy commands, and flink-sql-parser would lose its wider applicability to other external systems. For example, SQL CLI may need to support `source xx` to execute an external script; it's not proper to make the TableEnvironment parser aware of such syntax.

pro's:

  1. unified parser
  2. can handle corner cases, e.g. https://github.com/apache/flink/pull/8738

con's:

  1. many commands are only used for sql-client, e.g. help, quit, source
  2. how to meet the requirements of non-builtin commands, e.g. commands from flink-sql-gateway
  3. not easy to extend; it's more difficult to implement a client-specific command in sql-parser than in the specific client


  2. SQL CLI parses those control commands on its own and pre-splits the multiple statements according to the control commands. Then SQL CLI can pass the remaining statements to SqlParser and obtain a SqlNodeList.

pro's:

  1. sql-parser stays cleaner
  2. easier to extend for sql-client

con's:

  1. many parsers: SqlCommandParser (in sql client), sql-parser
  2. may hit corner cases, e.g. https://github.com/apache/flink/pull/8738


  3. Flink already introduces a `Parser` interface which is exposed by `Planner`. We can add one more method to `Parser`, like `List<String> splitStatement(String)`, and then we can borrow from Calcite to achieve this functionality.

Special client commands (e.g. help, quit, source) are not supported in sql-parser now. Because `SqlParser#parseStmtList` returns a SqlNodeList, not a string list, and those special commands are not defined as SqlNodes, I think this approach is only a complement to the first one.

  4. Support a utility class to parse a statement separated by semicolons into multiple statements.

pro's:

  1. easier to extend for sql-client
  2. can handle corner cases in a unified place

con's:

  1. many parsers: sql-parser, a utility parser


I think option 4 is better. Looking forward to more people's opinions.

(We should open another FLIP to discuss this.)

  3. Other open questions?


...


Compatibility, Deprecation, and Migration Plan

  • Methods of TableEnvironment to be deprecated:
    • Table sqlQuery(String sql)
    • void sqlUpdate(String sql)
    • JobExecutionResult execute(String jobName)
    • String explain(boolean extended)
    • Table fromTableSource(TableSource tableSource)

