
Status

Current state: "Under Discussion"

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

TableEnvironment provides two methods, `Table sqlQuery(String sql)` and `void sqlUpdate(String sql)`, to create a table (actually a view here) or describe an update action from a single SQL string. But as more use cases come up, some serious shortcomings in the current API design have become apparent.

  1. `Table sqlQuery(String sql)` actually returns a temporary view from one `SELECT` statement, and this view must be registered again before it can be used in a following SQL statement. According to FLIP-64[3], it is natural to deprecate `Table sqlQuery(String sql)` and provide a new method `void createTemporaryView(String path, String sql)` in TableEnvironment.
  2. Inconsistent execution semantics of `sqlUpdate()`. Currently, a DDL statement passed to this method is executed immediately, while an `INSERT INTO` statement is only executed when the `execute()` method is called, which confuses users a lot.
  3. No support for obtaining the return value of a SQL execution. FLIP-69[1] introduces many common DDLs such as `SHOW TABLES`, which require TableEnvironment to have an interface for obtaining the result of an executed statement. The SQL CLI also has a strong demand for this feature, so that we can easily unify the execution paths of the SQL CLI and TableEnvironment. Besides, the method name `sqlUpdate` does not fit statements like `SHOW TABLES`.
  4. Unclear and buggy support for buffered SQL execution[2]. The Blink planner provides the ability to optimize multiple SQL statements together, but there is no clear mechanism in the TableEnvironment API to control the whole flow.
  5. No ability to deal with multiple statements. It is a very common scenario for a SQL CLI user to execute an external SQL script file. If TableEnvironment does not want to support multiple statements, it (or the SQL parser) should expose the ability to parse multiple statements, so that the SQL CLI can leverage it.
  6. No support for executing SQL in an asynchronous way. In streaming mode, an `INSERT INTO xxx` statement may never end. It is also possible that an ETL task takes too much time to finish in a batch environment. So it is very natural and necessary to support executing SQL asynchronously.

Buffered SQL execution problem

  • TableEnvironment should not buffer SQL statements or execution plans. For example, TableEnvironment currently allows code like the following:

tEnv.sqlUpdate("CREATE TABLE test (...) with (path = '/tmp1')");
tEnv.sqlUpdate("INSERT INTO test SELECT ...");
tEnv.sqlUpdate("DROP TABLE test");
tEnv.sqlUpdate("CREATE TABLE test (...) with (path = '/tmp2')");
tEnv.execute()


    1. Users are confused about which kinds of statements are executed at once and which are buffered until triggered by the `execute()` method.
    2. Buffering statements leads to undefined behavior. In the example above, we may want to insert into the `test` table backed by path `/tmp1`, but the buffered `INSERT INTO` actually runs against the re-created table backed by `/tmp2`.
  • The catalog is currently the single main entry point for state management in TableEnvironment, and buffering statements and plans violates this design.

Public Interfaces

`execute(String jobName)` & `explain(boolean)`

As discussed in the buffered SQL execution problem above, we should not support buffering statements and plans. It is therefore meaningless to provide `execute(String jobName)` as the trigger entry point, and the `explain(boolean)` method should not be used anymore. We advise deprecating these two methods.

`insertInto(String, Table)` 

For the same reason as above, we advise that calling this method should trigger job execution at once.
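A minimal sketch of the advised behavior (the `src` and `sink` table names are hypothetical):

// Advised behavior: insertInto("sink", table) triggers job execution
// at once; no separate execute() call is needed.
Table source = tEnv.from("src");
tEnv.insertInto("sink", source);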

`Table sqlQuery(String sql)`

For now, it is meaningless to write code like `tEnv.sqlQuery("select * from src")` without handling the return value, and the returned table cannot be used directly in a following SQL statement unless it has been registered in some way. In fact, the Table returned by `sqlQuery(String sql)` is a temporary virtual view, and we can treat it the way FLIP-64 does.

Also, `sqlQuery` provides a bridge from a SQL query to a Table, but the name `sqlQuery` is confusing about the behavior: do I really read something from the SQL? To eliminate this ambiguity, we suggest providing a `fromQuery` method, similar to `Table from(String path);` but obtaining the table from a SQL query statement. Suggested method:

/**
 * Get a Table from one SQL query statement. The SQL must be a single
 * `SELECT` statement.
 */
Table fromQuery(String sql);
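A hedged usage sketch, assuming a table `src` is already registered:

// The returned Table can be used directly in the Table API without
// registering it again.
Table result = tEnv.fromQuery("SELECT id, name FROM src WHERE id > 10");
result.printSchema();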

`void sqlUpdate(String sql)`

Currently, `void sqlUpdate(String sql)` executes DDL statements immediately, while DML statements (such as `INSERT INTO xx`) must be triggered by `TableEnvironment.execute()`. Both behaviors should be consistent. We therefore propose a new blocking execute method with a return value to fetch the execution result, and `sqlUpdate` should be deprecated.

Suggested method:

/**
 * Synchronously execute the given SQL immediately. The SQL can be a
 * DDL, DML, or `SELECT` statement, but must be a single statement.
 *
 * @return null for DDL and `INSERT INTO` statements,
 *         a ResultTable for `SHOW xx`, `DESCRIBE xx`, and `SELECT xx`
 */
ResultTable executeSql(String sql) throws Exception;


Suggested ResultTable Interface:

/**
 * A ResultTable is the representation of the result of executing a
 * single SQL statement.
 */
public interface ResultTable {

  /**
   * Get the schema of this ResultTable.
   */
  TableSchema getSchema();

  /**
   * Get the contents of this ResultTable as an iterable of rows.
   */
  Iterable<Row> getContents();

  /**
   * Prints the schema of this table to the console in a tree format.
   */
  void printSchema();

  /**
   * Prints the content of this table to the console.
   */
  void show();
}
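A usage sketch of the proposed `executeSql` and `ResultTable` (the table definition is only illustrative):

// Each statement is executed immediately and synchronously.
tEnv.executeSql("CREATE TABLE test (...) WITH (path = '/tmp1')"); // returns null
ResultTable tables = tEnv.executeSql("SHOW TABLES");
tables.show(); // prints the table names to the console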


`Table fromTableSource(TableSource<?> source)`

Since FLIP-64 has provided `ConnectTableDescriptor#createTemporaryTable` to register a TableSource in TableEnvironment, this method should be deprecated too; it may be an omission in that FLIP.

Support batch SQL execution

Since the Blink planner supports optimizing multiple SQL statements together (such as multi-sink optimization), we need to introduce a new method called `executeBatch`, which takes a list of SQL statements as input, optimizes them all together, and submits the job for execution.

Note that the input statements cannot be DDL or `SELECT` statements, similar to the JDBC batch statement[4]. We impose this restriction because a DDL passed into the batch would cause unexpected behavior. Suggested method:

/**
 * Execute SQL statements in a batch so that the planner can optimize
 * the whole batch to obtain better performance.
 * Notice: the statements are limited to `INSERT INTO`.
 */
ResultTable executeBatch(String... sql) throws Exception;
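A hedged sketch of a batch call (the sink and source names are hypothetical); both statements read `src`, so the planner can, for example, share the source scan:

// Both INSERT INTO statements are optimized together as one job.
tEnv.executeBatch(
    "INSERT INTO sink1 SELECT id, name FROM src WHERE id < 100",
    "INSERT INTO sink2 SELECT id, COUNT(*) FROM src GROUP BY id");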


Discuss a parse method for executing multiple statements in the SQL CLI

We don't want to support executing multiple statements in TableEnvironment, but this feature is needed in the SQL CLI, where it is natural to execute an external script. I considered providing a parse method like `SqlNodeList parse(String stmt)`, but it is not intuitive to understand, and such a method should not belong to the TableEnvironment API. As discussed in the pull requests [5][6], Calcite provides the `SqlNodeList parseSqlStmtList()` method, which parses a list of SQL statements separated by semicolons and constructs a parse tree. The SQL CLI can use this method to parse multiple statements and execute each single statement one by one through `TableEnvironment#executeSql(String sql)`. One thing we should take care of is that the SQL CLI has some special commands like `help`/`set`/`quit` that control the environment's lifecycle and change variables of the context. IMO, there are several ways to deal with these commands when handling multiple statements:

  1. Support these special control commands in flink-sql-parser. The shortcoming is that TableEnvironment would have to take care of these noisy commands, and flink-sql-parser would lose its wider applicability to other external systems.
  2. Let the SQL CLI parse these control commands on its own and pre-split the multiple statements around them. The SQL CLI can then pass each run of plain SQL statements to the SqlParser and obtain a SqlNodeList (see the sketch after this list). Personally, I would advise adopting this approach.
  3. Flink already introduces a `Parser` interface, which is exposed by `Planner`. We could add one more method to `Parser`, such as `List<String> splitSql(String)`, and borrow Calcite to achieve this functionality.
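A naive sketch of option 2, assuming a hypothetical CLI helper (`handleControlCommand`) and ignoring semicolons inside string literals:

// Naive pre-splitting sketch for option 2; a real implementation must
// respect quoted ';' and comments. All helper names are hypothetical.
for (String stmt : script.split(";")) {
  String trimmed = stmt.trim();
  if (trimmed.isEmpty()) {
    continue;
  }
  String lower = trimmed.toLowerCase();
  if (lower.equals("help") || lower.equals("quit") || lower.startsWith("set")) {
    handleControlCommand(trimmed); // CLI-local command, never reaches the parser
  } else {
    tEnv.executeSql(trimmed);      // plain SQL goes through the proposed API
  }
}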

Summary:

Methods of TableEnvironment

Current Call                              | Replacement            | Comment
------------------------------------------|------------------------|--------
execute(String jobName)                   | deprecated             |
explain(boolean extended)                 | deprecated             |
sqlQuery(String sql)                      | fromQuery(String sql)  |
sqlUpdate(String sql)                     | executeSql(String sql) |
fromTableSource(TableSource tableSource)  | deprecated             |

New suggested TableEnvironment methods:

Added Method Call                        | Comment
-----------------------------------------|-------------------------------------------
Table fromQuery(String sql)              | Obtain a table from a SQL query statement
ResultTable executeSql(String sql)       | Synchronously execute a single statement
ResultTable executeBatch(String... sql)  | Statements limited to `INSERT INTO`


Future Plan:

Notice: this depends on FLIP-73/FLIP-74 and is not in the scope of this FLIP.

To support executing time-consuming batch SQL or never-ending streaming SQL, an asynchronous execution mode is needed.

Provide async execute methods for executeSql and executeBatch

Similar to the above, suggested method:

/**
 * Asynchronously execute the given SQL immediately. The SQL can be a
 * DDL, DML, or `SELECT` statement, but must be a single statement.
 */
CompletableFuture<ResultTable> asyncExecuteSql(String sql);


We should also support executing batch SQL asynchronously. Suggested method:

CompletableFuture<JobClient> asyncExecBatchSql(String... sql);
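A sketch of consuming the asynchronous result (method names as proposed above; the statement and error handling are only illustrative):

// The future may only complete when the job terminates, which for a
// streaming INSERT INTO may be never.
CompletableFuture<ResultTable> future =
    tEnv.asyncExecuteSql("INSERT INTO sink SELECT * FROM src");
future.whenComplete((resultTable, error) -> {
    if (error != null) {
        error.printStackTrace(); // the statement failed
    }
});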


Add an async execute method to org.apache.flink.table.delegation.Executor

/**
 * Executes all the previously applied transformations via
 * {@link #apply(List)} in an asynchronous way.
 */
CompletableFuture<JobExecutionResult> asyncExecute(String jobName) throws Exception;


Since we already have FLIPs[7][8] that provide asynchronous job management, it is convenient and natural to provide such a method.

Add an async execute method to org.apache.flink.streaming.api.environment#StreamExecutionEnvironment


public abstract CompletableFuture<JobExecutionResult> asyncExecute(StreamGraph streamGraph) throws Exception;


SQL CLI integration with the new API

  1. How does the SQL CLI leverage the executeBatch method to obtain the optimization?

We can reference other systems' designs, such as the sqlline Batch Command[9], and introduce a similar command, but we should note that the statements in a batch can only be `INSERT INTO`.

  2. How does the SQL CLI parse and execute multiple statements?

See the section "Discuss a parse method for executing multiple statements in the SQL CLI" above.

  3. Other open questions?


Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

  • Methods of TableEnvironment to be deprecated:
    • Table sqlQuery(String sql)
    • void sqlUpdate(String sql)
    • JobExecutionResult execute(String jobName)
    • String explain(boolean extended)
    • Table fromTableSource(TableSource tableSource)

Test Plan

Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
