...

  • TableEnvironment shouldn't buffer SQL statements or execution plans. For example, TableEnvironment currently supports writing code like the following:

tEnv.sqlUpdate("CREATE TABLE test (...) with (path = '/tmp1')");
tEnv.sqlUpdate("INSERT INTO test SELECT ...");
tEnv.sqlUpdate("DROP TABLE test");
tEnv.sqlUpdate("CREATE TABLE test (...) with (path = '/tmp2')");
tEnv.execute()


    1. Users are confused about which kinds of SQL statements are executed at once and which are buffered until triggered by the execute method.
    2. Buffering SQL statements leads to undefined behavior: we may intend to insert into the test table at the `/tmp1` path but wrongly end up writing to `/tmp2`.
  • The catalog is now the single main state management entry point of TableEnvironment; buffering SQL statements and plans violates this design.

Public Interfaces


`execute(String jobName)` & `explain(boolean)`

Given the SQL-buffering problem discussed above, we should not support buffering SQL statements and plans. It is therefore meaningless to provide `execute(String jobName)` as the trigger entry point, and the `explain(boolean)` method should no longer be used either. We advise deprecating both methods.

`insertInto(String, Table)` 

For the same reason as above. We advise that calling this method should trigger job execution immediately.

`Table sqlQuery(String sql)`

For now, it's meaningless to write code like `tEnv.sqlQuery("select * from src")` without using the return value, and the returned table cannot be used directly in subsequent SQL unless it has been registered in some way. In fact, the Table returned by `sqlQuery(String sql)` is a temporary virtual view, and we can treat it the way FLIP-64 does.

Also, `sqlQuery` provides a bridge from a SQL query to a Table, but the name `sqlQuery` is confusing about the behavior: does it actually read something from the SQL? To eliminate the ambiguity, we suggest providing a `fromQuery` method, similar to `Table from(String path);`, that obtains a Table from a single SQL query statement. Suggested method:

/**
 * Gets a Table from one SQL query statement. The SQL must be a single
 * `SELECT` statement.
 */
Table fromQuery(String sql);
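
For illustration, a minimal usage sketch of the proposed method (the table name `src` and the chained Table API call are assumptions for the example, not part of the proposal):

// Obtain a Table directly from a single SELECT statement.
Table result = tEnv.fromQuery("SELECT id, name FROM src WHERE id > 10");

// The returned Table acts as a temporary virtual view (cf. FLIP-64)
// and can be further transformed with the Table API.
Table names = result.select("name");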

`void sqlUpdate(String sql)`

Currently, `void sqlUpdate(String sql)` executes DDL statements immediately, while DML statements (such as `insert into xx`) must be triggered by `TableEnvironment.execute()`. Both behaviors should be made consistent. So we propose a new blocking execute method with a return value that fetches the execution result, and `sqlUpdate` should be deprecated.

Suggested method:

/**
 * Synchronously executes the given SQL immediately. The SQL can be
 * DDL/DML/`SELECT`, but the argument must be a single statement.
 *
 * @return null for DDL and `INSERT INTO` statements;
 *         a ResultTable for `SHOW xx`, `DESCRIBE xx`, and `SELECT xx`
 */
ResultTable executeSql(String sql) throws Exception;


Suggested ResultTable interface:

/**
 * A ResultTable is the representation of the execution result of one SQL
 * statement.
 */
public interface ResultTable {

  /**
   * Gets the schema of this ResultTable.
   */
  TableSchema getSchema();

  /**
   * Gets the contents of this ResultTable as iterable rows.
   */
  Iterable<Row> getContents();

  /**
   * Prints the schema of this table to the console in a tree format.
   */
  void printSchema();

  /**
   * Prints the content of this table to the console.
   */
  void show();
}
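
For illustration, a hedged sketch of how `executeSql` and ResultTable might be used together (the table name, schema, and connector path are made up for the example; exception handling is omitted):

// DDL executes immediately; per the proposal the return value is null.
tEnv.executeSql("CREATE TABLE test (id INT, name STRING) WITH (path = '/tmp1')");

// A SELECT returns a ResultTable whose schema and contents can be inspected.
ResultTable result = tEnv.executeSql("SELECT id, name FROM test");
result.printSchema();
for (Row row : result.getContents()) {
    System.out.println(row);
}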


`Table fromTableSource(TableSource<?> source)`

Since FLIP-64 already provides `ConnectTableDescriptor#createTemporaryTable` to register a TableSource in TableEnvironment, this method should be deprecated too; it was perhaps an omission in that FLIP.

Support batch SQL execution

Since the Blink planner supports optimizing multiple SQL statements together (such as multi-sink optimization), we need to introduce a new method called `executeBatch` that takes a list of SQL statements as input, optimizes them all together, and submits them as one job for execution.

Notice that, similar to the JDBC Statement [4], the input SQL statements cannot be DDL or SELECT statements. We impose this requirement because passing DDL would cause unexpected behavior. Suggested method:

/**
 * Executes the given SQL statements in a batch; the planner can optimize
 * the whole batch to obtain better performance.
 * Notice: the statements are limited to `INSERT INTO`.
 */
ResultTable executeBatch(String... sql) throws Exception;
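
As an illustration, a possible invocation (the sink and source table names are hypothetical):

// Both INSERT INTO statements are optimized together, so e.g. a shared
// scan of `src` can feed both sinks, and they are submitted as one job.
tEnv.executeBatch(
    "INSERT INTO sink_a SELECT id, name FROM src",
    "INSERT INTO sink_b SELECT id, COUNT(*) FROM src GROUP BY id");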


Discuss a parse method for executing multiple statements in the SQL CLI

We don't want to support executing multiple statements in the TableEnvironment, but this feature is needed in the SQL CLI, where it's natural to execute an external script. I considered providing a parse method like `SqlNodeList parse(String stmt)`, but it's not intuitive to understand, and such a method shouldn't belong to the TableEnvironment API. As discussed in the pull requests [5][6], Calcite provides the `SqlNodeList parseSqlStmtList()` method to parse a list of SQL statements separated by semicolons and construct a parse tree. The SQL CLI can use this method to parse multiple statements and execute each single statement one by one through `TableEnvironment#executeSql(String sql)`. One thing we should take care of is that the SQL CLI has special commands like `help/set/quit` that control the environment's lifecycle and change context variables. IMO, there are several ways to deal with these commands when executing multiple statements:

  1. Support these special control commands in flink-sql-parser. The shortcoming is that TableEnvironment would have to take care of those noisy commands, and flink-sql-parser would lose its wider applicability to other external systems.
  2. The SQL CLI parses those control commands on its own and pre-splits the multiple statements according to the control commands. Then the SQL CLI can pass the remaining statements to the SqlParser and obtain a SqlNodeList (see the sketch after this list). Personally, I would advise adopting this approach.
  3. Flink already introduces a `Parser` interface, which is exposed by `Planner`. We can add one more method to `Parser`, like `List<String> splitSql(String)`, and then borrow Calcite to achieve this functionality.
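
To make option 2 concrete, a rough sketch of the CLI-side loop (the helper names `splitStatements` and `handleSet` are hypothetical, not part of any proposed API):

// The CLI recognizes its own control commands and delegates real SQL
// to the TableEnvironment, one statement at a time.
for (String stmt : splitStatements(script)) {  // hypothetical pre-splitting helper
    String trimmed = stmt.trim();
    if (trimmed.equalsIgnoreCase("quit")) {
        break;                           // CLI lifecycle command, never reaches the parser
    } else if (trimmed.toLowerCase().startsWith("set")) {
        handleSet(trimmed);              // hypothetical: updates CLI session variables
    } else {
        tEnv.executeSql(trimmed);        // plain SQL is executed immediately
    }
}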

Summary:

Methods of TableEnvironment


Current Call                             | Replacement            | Comment
execute(String jobName)                  | deprecated             |
explain(boolean extended)                | deprecated             |
sqlQuery(String sql)                     | fromQuery(String sql)  |
sqlUpdate(String sql)                    | executeSql(String sql) |
fromTableSource(TableSource tableSource) | deprecated             | use ConnectTableDescriptor#createTemporaryTable (FLIP-64)

New suggested TableEnvironment methods:


Added Method                            | Comment
Table fromQuery(String sql)             | obtain a Table from a single SQL query statement
ResultTable executeSql(String sql)      | synchronously execute a single SQL statement
ResultTable executeBatch(String... sql) | statements limited to `INSERT INTO`


Future Plan:

Notice: this depends on FLIP-73/FLIP-74 and is not in the scope of this FLIP.

To support executing time-consuming batch SQL or never-ending streaming SQL, we need to provide an asynchronous execution mode.

Provide async execute methods for executeSql and executeBatch

Similar to the above, suggested methods:

/**
 * Asynchronously executes the given SQL immediately. The SQL can be
 * DDL/DML/`SELECT`, but the argument must be a single statement.
 */
CompletableFuture<ResultTable> asyncExecuteSql(String sql);


We should also support executing batch SQL asynchronously. Suggested method:

CompletableFuture<JobClient> asyncExecBatchSql(String... sql);
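
For illustration, a sketch of how the asynchronous variants might be consumed (the statements themselves are placeholders):

// Fire off a long-running query and react when the result is ready.
tEnv.asyncExecuteSql("SELECT ...").thenAccept(ResultTable::show);

// Submit a batch of inserts and use the JobClient once the job is accepted.
tEnv.asyncExecBatchSql("INSERT INTO sink_a SELECT ...", "INSERT INTO sink_b SELECT ...")
    .thenAccept(jobClient -> System.out.println("batch submitted"));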


Add an async execute method to org.apache.flink.table.delegation.Executor

/**
 * Executes all the transformations previously applied via {@link #apply(List)}
 * in an asynchronous way.
 */
CompletableFuture<JobExecutionResult> asyncExecute(String jobName) throws Exception;


Since we already have FLIPs [7][8] that provide asynchronous job management, it's convenient and natural to provide such a method.

Add an async execute method to org.apache.flink.streaming.api.environment#StreamExecutionEnvironment


public abstract CompletableFuture<JobExecutionResult> asyncExecute(StreamGraph streamGraph) throws Exception;
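
A hedged usage sketch of this method (`getJobID` and `getNetRuntime` exist on JobExecutionResult today; the surrounding code is illustrative):

// Submit the stream graph without blocking the caller.
CompletableFuture<JobExecutionResult> future = env.asyncExecute(streamGraph);

// React once the job finishes instead of blocking on execute().
future.thenAccept(result ->
    System.out.println("Job " + result.getJobID() + " finished in "
        + result.getNetRuntime() + " ms"));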


SQL CLI integration with the new API

  1. How does the SQL CLI leverage the executeBatch method to obtain optimization?

We can reference other systems' designs, such as the Sqlline batch command [9], and introduce a similar command, but note that the SQL statements in a batch can only be `INSERT INTO`.

  2. How does the SQL CLI parse and execute multiple statements?

See the section "Discuss a parse method for executing multiple statements in the SQL CLI" above.

  3. Other open questions?

...


Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

  • Methods of TableEnvironment to be deprecated:
    • Table sqlQuery(String sql)
    • void sqlUpdate(String sql)
    • JobExecutionResult execute(String jobName)
    • String explain(boolean extended)
    • Table fromTableSource(TableSource tableSource)
  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in a few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

...