...

@PublicEvolving
public interface TableEnvironment {
    /**
     * Registers the given {@link Table}'s result as a catalog table with the given options.
     *
     * <p>CTAS for Table API.
     *
     * <p>Examples:
     *
     * <pre>{@code
     * Map<String, String> options = new HashMap<>();
     * options.put("connector.type", "hudi");
     * tEnv.createTable("MyTable", options, tEnv.sqlQuery("select id, name from user_table"));
     * }</pre>
     *
     * @param path The path under which the table will be registered. See also the {@link
     *     TableEnvironment} class description for the format of the path.
     * @param options Table options.
     * @param query The {@link Table} object describing the pipeline for further transformations.
     */
    void createTable(String path, Map<String, String> options, Table query);
}

Program research

I investigated how other big data engines, such as Hive and Spark, implement CTAS:

Hive (MR): atomic

Hive MR runs in client mode: the client is responsible for parsing, compiling, optimizing, and executing the query, and finally for cleaning up.

...

  1. Execute the query first and write the query result to a temporary directory.
  2. If all MR tasks succeed, create the table and load the data into it.
  3. If the execution fails, the table is not created.
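
The steps above can be sketched as follows. This is a minimal illustration, not Hive code: the class HiveStyleCtasSketch, the in-memory catalog map, and the Supplier that stands in for the MR query are all hypothetical names introduced for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

class HiveStyleCtasSketch {
    // Hypothetical in-memory catalog: table name -> rows.
    static final Map<String, List<String>> catalog = new HashMap<>();

    // Returns true if the table was created, false if the query failed.
    static boolean createTableAsSelect(String tableName, Supplier<List<String>> query) {
        List<String> tempDir;
        try {
            // Step 1: run the query and keep the result in a temporary location.
            tempDir = new ArrayList<>(query.get());
        } catch (RuntimeException e) {
            // Step 3: the query failed, so the table is never created.
            return false;
        }
        // Step 2: only after success, create the table and load the data.
        catalog.put(tableName, tempDir);
        return true;
    }

    public static void main(String[] args) {
        boolean created = createTableAsSelect("MyTable", () -> Arrays.asList("1,alice", "2,bob"));
        System.out.println("created=" + created + ", catalog=" + catalog);
    }
}
```

Because the table is registered only after the result is complete, no other reader can observe a partially written table, which is what makes this flow atomic.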

Spark (DataSource v1): non-atomic

Spark has a role called the driver, which is responsible for compiling tasks, requesting resources, scheduling task execution, tracking task progress, and so on.

...

  1. Create a sink table based on the schema of the query result.
  2. Execute the Spark task and write the result to a temporary directory.
  3. If all Spark tasks succeed, use the Hive API to load the data into the sink table created in the first step.
  4. If the execution fails, the driver drops the sink table created in the first step.
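
A minimal sketch of this flow, under the same kind of assumptions: NonAtomicCtasSketch, its in-memory catalog, and the Supplier standing in for the Spark job are hypothetical and are not Spark APIs. The point it illustrates is that the sink table exists in the catalog before the job finishes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

class NonAtomicCtasSketch {
    // Hypothetical in-memory catalog: table name -> rows.
    static final Map<String, List<String>> catalog = new HashMap<>();

    // Returns true if the job succeeded, false if the sink table was dropped.
    static boolean createTableAsSelect(String tableName, Supplier<List<String>> job) {
        // Step 1: create the (empty) sink table; it is visible immediately.
        catalog.put(tableName, new ArrayList<>());
        try {
            // Step 2: run the job and keep the result in a temporary location.
            List<String> tempDir = new ArrayList<>(job.get());
            // Step 3: load the result into the sink table created in step 1.
            catalog.get(tableName).addAll(tempDir);
            return true;
        } catch (RuntimeException e) {
            // Step 4: on failure, drop the sink table created in step 1.
            catalog.remove(tableName);
            return false;
        }
    }

    public static void main(String[] args) {
        boolean ok = createTableAsSelect("MyTable", () -> Arrays.asList("1,alice", "2,bob"));
        System.out.println("ok=" + ok + ", catalog=" + catalog);
    }
}
```

The flow is non-atomic because a concurrent reader can see the empty table between step 1 and step 3, and cleanup in step 4 only happens if the driver is still alive to run it.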

Spark (DataSource v2; not yet complete, Hive Catalog not yet supported): optionally atomic

Non-atomic

The non-atomic implementation follows the same logic as DataSource v1. For details, see CreateTableAsSelectExec.

Atomic

The atomic implementation (for details, see AtomicCreateTableAsSelectExec) is supported by StagingTableCatalog and StagedTable.

...

  1. Create a StagedTable based on the schema of the query result; it is not yet visible in the catalog.
  2. Execute the Spark task and write the result into the StagedTable.
  3. If all Spark tasks succeed, call StagedTable#commitStagedChanges(); the table then becomes visible in the catalog.
  4. If the execution fails, call StagedTable#abortStagedChanges().
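
The staged flow can be sketched as below. This is an illustration only: the StagedTable interface here is a simplified stand-in that borrows the commitStagedChanges() and abortStagedChanges() method names mentioned above, and the write() method, stageCreate() helper, and in-memory catalog are hypothetical. It does not reproduce Spark's actual interfaces.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

class StagedCtasSketch {
    // Simplified stand-in for a staged table; not Spark's real interface.
    interface StagedTable {
        void write(List<String> rows);   // hypothetical helper for this sketch
        void commitStagedChanges();
        void abortStagedChanges();
    }

    // Hypothetical in-memory catalog: table name -> rows.
    static final Map<String, List<String>> catalog = new HashMap<>();

    // Step 1: stage the table; nothing is put into the catalog yet.
    static StagedTable stageCreate(String tableName) {
        List<String> staged = new ArrayList<>();
        return new StagedTable() {
            @Override public void write(List<String> rows) { staged.addAll(rows); }
            @Override public void commitStagedChanges() { catalog.put(tableName, staged); }
            @Override public void abortStagedChanges() { staged.clear(); }
        };
    }

    static boolean createTableAsSelect(String tableName, Supplier<List<String>> job) {
        StagedTable table = stageCreate(tableName);
        try {
            table.write(job.get());          // Step 2: write into the staged table.
            table.commitStagedChanges();     // Step 3: publish to the catalog.
            return true;
        } catch (RuntimeException e) {
            table.abortStagedChanges();      // Step 4: discard staged data.
            return false;
        }
    }

    public static void main(String[] args) {
        boolean ok = createTableAsSelect("MyTable", () -> Arrays.asList("1,alice", "2,bob"));
        System.out.println("ok=" + ok + ", catalog=" + catalog);
    }
}
```

Compared with the DataSource v1 flow, the catalog entry appears only at commit time, so a failed job leaves no table behind even if no cleanup code runs.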

Research summary

Because we want to unify the semantics and implementation of streaming and batch, we decided to follow the Spark DataSource v1 implementation.

...