...

  1. We propose to deprecate the following methods in TableEnvironment:

    • void sqlUpdate(String sql)

    • void insertInto(String targetPath, Table table)

    • void execute(String jobName)

    • String explain(boolean extended)

    • Table fromTableSource(TableSource<?> source)

  2. Meanwhile, we propose to introduce the following new methods in TableEnvironment:
    • ResultTable executeStatement(String statement)
      Synchronously executes the given single statement immediately, and returns the execution result.

      public interface ResultTable {
          TableSchema getResultSchema();
          Iterable<Row> getResultRows();
      }

    • DmlBatch createDmlBatch()
      Creates a DmlBatch instance which can add DML statements or Tables to the batch, and explain or execute them as a batch.

      interface DmlBatch {
          void addInsert(String insert);
          void addInsert(String targetPath, Table table);
          ResultTable execute() throws Exception;
          String explain(boolean extended);
      }

  3. For the currently messy trigger points of a Flink table program, we propose the following: for TableEnvironment and StreamTableEnvironment, you must use `TableEnvironment.execute()` to trigger execution of a table program; once you convert the table program to a DataStream program (through the `toAppendStream` or `toRetractStream` method), you must use `StreamExecutionEnvironment.execute()` to trigger execution of the DataStream program. A sketch of both trigger points follows.
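A minimal sketch of the two trigger points, using the current (pre-FLIP) API; the table names `src` and `snk` are illustrative only:

// Case 1: a pure table program is triggered by TableEnvironment.execute().
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.sqlQuery("SELECT a, b FROM src").insertInto("snk");
tEnv.execute("table-job");

// Case 2: once the program is converted to a DataStream (via toAppendStream
// or toRetractStream), it must be triggered by StreamExecutionEnvironment.execute().
DataStream<Row> stream = tEnv.toAppendStream(tEnv.sqlQuery("SELECT a, b FROM src"), Row.class);
stream.print();
env.execute("datastream-job");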

...

/**
 * Synchronously execute the given single statement immediately. The statement can be DDL/DML/SHOW/DESCRIBE/EXPLAIN/USE.
 * If the statement is translated to a Flink job, the result will not be returned until the job is finished.
 *
 * @return the result for SHOW/DESCRIBE/EXPLAIN, the affected row count for DML (-1 means unknown), or a string message ("OK") for other statements.
 * @throws Exception which occurs during the execution.
 */
ResultTable executeStatement(String statement) throws Exception;

This method only supports executing a single statement, which can be a DDL, DML, SHOW, DESCRIBE, EXPLAIN or USE statement. The method executes synchronously and returns a ResultTable, which is the representation of the execution result and contains both the result data and the result schema. If an error occurs, this method throws an exception.


/**
 * A ResultTable is the representation of the statement execution result.
 */
public interface ResultTable {

    /**
     * Get the schema of the ResultTable.
     */
    TableSchema getResultSchema();

    /**
     * Get the result contents as iterable rows.
     */
    Iterable<Row> getResultRows();
}
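A minimal usage sketch of the proposed method; the `orders` table name is illustrative only:

// Execute a statement synchronously and inspect the result.
ResultTable result = tEnv.executeStatement("DESCRIBE orders");
System.out.println(result.getResultSchema());
for (Row row : result.getResultRows()) {
    System.out.println(row);
}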

...

interface TableEnvironment {

    /**
     * Create a DmlBatch instance which can add DML statements or Tables to the batch; the planner
     * can optimize all added statements and Tables together for better performance.
     */
    DmlBatch createDmlBatch();
}


interface DmlBatch {

    /**
     * Add an insert statement to the batch.
     */
    void addInsert(String insert);

    /**
     * Add a Table with the given sink table name to the batch.
     */
    void addInsert(String targetPath, Table table);

    /**
     * Execute all statements and Tables as a batch.
     *
     * <p>The added statements and Tables will be cleared when this method executes.
     */
    ResultTable execute() throws Exception;

    /**
     * Returns the AST and the execution plan to compute the result of all statements and Tables.
     *
     * @param extended if the plan should contain additional properties, e.g. estimated cost, traits
     */
    String explain(boolean extended);
}
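A minimal usage sketch of the proposed interface; the table names are illustrative only:

// Collect several DML statements and Tables, then optimize and run them as one job.
DmlBatch batch = tEnv.createDmlBatch();
batch.addInsert("INSERT INTO sink1 SELECT a, b FROM src");
batch.addInsert("sink2", tEnv.sqlQuery("SELECT c, d FROM src"));
System.out.println(batch.explain(false)); // inspect the combined plan first
ResultTable result = batch.execute();     // the batch is cleared after execution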

...

Similar to the above, the suggested methods are:

/**
 * Asynchronously execute the given single statement. The statement can be DDL/DML/SHOW/DESCRIBE/EXPLAIN/USE.
 */
CompletableFuture<ResultTable> executeStatementAsync(String statement);


We should also support executing batch SQL asynchronously. Suggested method:

interface DmlBatch {

    /**
     * Asynchronously execute the DML statements as a batch.
     */
    CompletableFuture<ResultTable> executeAsync();
}
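A sketch of how a client might use the asynchronous variants; statement text and table names are illustrative only:

// Submit a statement without blocking, and attach a callback for the result.
CompletableFuture<ResultTable> future =
    tEnv.executeStatementAsync("INSERT INTO snk SELECT a, b FROM src");
future.thenAccept(result -> result.getResultRows().forEach(System.out::println));

// The same pattern applies to a batch.
DmlBatch batch = tEnv.createDmlBatch();
batch.addInsert("INSERT INTO snk SELECT a, b FROM src");
batch.executeAsync().thenAccept(result -> System.out.println("batch finished"));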


Add an async execute method to org.apache.flink.table.delegation.Executor

/**
 * Executes all the previously applied transformations via {@link #apply(List)} in an async way.
 *
 * @return a future which is completed with the result of the job execution
 * @throws Exception which occurs during the job submission
 */
CompletableFuture<JobExecutionResult> executeAsync(String jobName) throws Exception;
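A sketch of how a caller, e.g. the TableEnvironment, might use this method; the job name is illustrative only:

// Submit the previously applied transformations and react when the job finishes.
CompletableFuture<JobExecutionResult> future = executor.executeAsync("my-job");
future.thenAccept(result ->
    System.out.println("job finished in " + result.getNetRuntime() + " ms"));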

...

  1. How does the SQL CLI leverage the DmlBatch class to obtain the optimization?

    We can reference other systems’ designs, such as the Sqlline Batch Command [9], and introduce a similar command; but note that the SQL statements in a batch can only be `insert into`.

  2. How does the SQL CLI parse and execute multiple statements?

    We don’t want to support multiple-statement parsing in the TableEnvironment, but this feature is needed in the SQL CLI because it’s natural there to execute an external script. I considered providing a parse method like `List<String> parse(String stmt)`, but it’s not intuitive to understand and such a method shouldn’t belong to the TableEnvironment API. As discussed in the pull requests [5][6], Calcite provides the `SqlNodeList parseSqlStmtList()` method to parse a list of SQL statements separated by semicolons and construct a parse tree. I think the SQL CLI can use this method to parse multiple statements and execute each statement one by one through `TableEnvironment#executeStatement(String statement)` (see the sketch after this list). One thing we should take care of is that there are some special commands like `help/set/quit` in the SQL CLI that control the environment’s lifecycle and change the variables of the context. IMO, there are several ways to deal with these commands when parsing multiple statements:
    1. Support these special control commands in flink-sql-parser. The shortcoming is that TableEnvironment has to take care of those noisy commands, and flink-sql-parser loses its wider extensibility to other external systems. For example, the SQL CLI may need to support `source xx` to execute an external script, and it’s not appropriate for the TableEnvironment parser to see such syntax.
      1. pros:
      2. cons:
        • many commands are only used for the sql-client, e.g. help, quit, source
        • hard to meet the requirements of non-builtin commands, e.g. commands from flink-sql-gateway
        • not easy to extend: it’s more difficult to implement a client-specific command in sql-parser than in the specific client
    2. The SQL CLI parses those control commands on its own and pre-splits the multiple statements according to the control commands. The SQL CLI can then pass the remaining statements to the SqlParser and obtain a SqlNodeList.
      1. pros:
        • the sql-parser stays cleaner
        • easier to extend for the sql-client
      2. cons:
    3. Flink already has a `Parser` interface which is exposed by `Planner`. We can add one more method to `Parser`, like `List<String> splitStatement(String)`, and borrow from Calcite to achieve this functionality. Special client commands (e.g. help, quit, source) are not supported in sql-parser now; because `SqlParser#parseStmtList` returns a SqlNodeList rather than a string list, those special commands cannot be represented as SqlNodes. So I think this approach is only a complement to the first one.
    4. Provide a utility class that splits a script of statements separated by semicolons into multiple statements.
      1. pros:
        • easier to extend for the sql-client
        • corner cases can be handled in a unified place
      2. cons:
        • multiple parsers: sql-parser plus a utility parser
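
    For the Calcite-based parsing mentioned above, here is a minimal sketch using org.apache.calcite.sql.parser.SqlParser, assuming Calcite’s default parser configuration (a real implementation would plug in Flink’s parser configuration and dialect; statement text is illustrative only):

    // Parse a semicolon-separated script into individual statements ...
    String script = "INSERT INTO sink1 SELECT a, b FROM src;\n"
            + "INSERT INTO sink2 SELECT c, d FROM src";
    SqlNodeList statements = SqlParser.create(script).parseStmtList();

    // ... then hand each statement back to the TableEnvironment one by one.
    for (SqlNode statement : statements) {
        tEnv.executeStatement(statement.toString());
    }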

  3. Other open questions?