...

Proposing a public interface CreateOrReplaceTable, used by CTAS (CREATE TABLE AS SELECT) for the Table API.

/**
* A table to help the Table API implement the Create Table As Select (CTAS) feature.
* The Create Or Replace Table and Replace Table features may be supported in the future,
* so it is named CreateOrReplaceTable.
*/
@PublicEvolving
public interface CreateOrReplaceTable {

/**
* Adds table options to the CreateOrReplaceTable.
*
* <p>Example:
*
* <pre>{@code
* tab.option("connector", "filesystem");
* }</pre>
*/
CreateOrReplaceTable option(String key, String value);

/**
* Create the table under the specified path and write the pipeline data to this table.
*
* <p>Example:
*
* <pre>{@code
* table.saveAs("my_ctas_table")
* .option("connector", "filesystem")
* .option("format", "testcsv")
* .option("path", "/tmp/my_ctas_table/")
* .create();
* }</pre>
*/
TablePipeline create();

/**
* Create the table under the specified path if it does not exist and write the pipeline data to this table.
*
* <p>Example:
*
* <pre>{@code
* table.saveAs("my_ctas_table")
* .option("connector", "filesystem")
* .option("format", "testcsv")
* .option("path", "/tmp/my_ctas_table/")
* .createIfNotExist();
* }</pre>
*/
TablePipeline createIfNotExist();
}

The CreateOrReplaceTable interface is introduced as a new interface because if we added the create/createIfNotExist APIs to the Table interface, users would have to call the saveAs API before calling them, which would add usage cost for the user.

The reason it is called CreateOrReplaceTable is to prepare for future support of the Replace Table feature.

The recommended usage is:

...

@PublicEvolving
public interface Catalog {

/**
* When the Create Table As Select (CTAS) feature is used, the Catalog, combined with {@link Catalog#supportsManagedTable},
* infers whether to add some options to the {@link CatalogBaseTable}.
*
* For example:
* {@link JdbcCatalog} can add connector = 'jdbc', user, password, url and other options to the {@link CatalogBaseTable};
* {@link HiveCatalog} can add connector = 'hive' to the {@link CatalogBaseTable} to help generate the TableSink.
* The tables in {@link GenericInMemoryCatalog} already exist externally, so the options must be filled in
* manually by the user and cannot be inferred automatically by the Catalog.
*/
default CatalogBaseTable inferTableOptions(ObjectPath tablePath, CatalogBaseTable table) {
       throw new UnsupportedOperationException();
}
}
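The inference contract above can be illustrated with plain Java maps. The class below is a hypothetical stand-in, not Flink's actual JdbcCatalog; it only shows how a JDBC-style catalog might merge the connection settings it already knows into the options of the table being created, sparing the user from repeating them.

```java
import java.util.HashMap;
import java.util.Map;

public class InferTableOptionsSketch {

    /**
     * Hypothetical stand-in for a JDBC-backed catalog inferring CTAS table options.
     * Options the user set explicitly take precedence over the inferred ones.
     */
    static Map<String, String> inferTableOptions(Map<String, String> tableOptions,
                                                 String url, String user, String password) {
        Map<String, String> inferred = new HashMap<>(tableOptions);
        // The catalog knows its own connection settings, so it can fill them in
        // for the table being created; putIfAbsent keeps user-provided values.
        inferred.putIfAbsent("connector", "jdbc");
        inferred.putIfAbsent("url", url);
        inferred.putIfAbsent("username", user);
        inferred.putIfAbsent("password", password);
        return inferred;
    }

    public static void main(String[] args) {
        Map<String, String> options =
                inferTableOptions(new HashMap<>(), "jdbc://localhost:5432/db", "flink", "secret");
        System.out.println(options.get("connector")); // jdbc
    }
}
```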

...

  1. The Flink client compiles the SQL and generates an execution plan. In this process, the hook that needs to be executed on the JM side is generated, and the hook, Catalog, and CatalogBaseTable are serialized.
  2. Submit the job to the cluster; if it is in detached mode, the client can exit.
  3. When the job starts, deserialize the hook, Catalog, and CatalogBaseTable, then call the Catalog#createTable method through the hook to create the CatalogBaseTable.
  4. Start task execution.
  5. If the final status of the job is failed or canceled, drop the created CatalogBaseTable by calling the Catalog#dropTable method through the hook.
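Steps 3–5 can be sketched with a minimal hook interface. Both the JobStatusHook interface and the toy Catalog below are simplified stand-ins for illustration; Flink's actual hook and Catalog interfaces differ.

```java
import java.util.HashSet;
import java.util.Set;

public class CtasHookSketch {

    /** Simplified stand-in for a JM-side job status hook (not Flink's actual interface). */
    interface JobStatusHook {
        void onCreated();   // job starts: create the target table
        void onFinished();  // job succeeded: nothing to undo
        void onFailed();    // job failed: drop the created table
        void onCanceled();  // job canceled: drop the created table
    }

    /** Toy catalog: just a set of existing table names. */
    static class Catalog {
        final Set<String> tables = new HashSet<>();
        void createTable(String name) { tables.add(name); }
        void dropTable(String name) { tables.remove(name); }
    }

    /** CTAS hook: creates the table when the job starts, drops it on failure/cancel. */
    static class CtasJobStatusHook implements JobStatusHook {
        final Catalog catalog;
        final String tableName;
        CtasJobStatusHook(Catalog catalog, String tableName) {
            this.catalog = catalog;
            this.tableName = tableName;
        }
        public void onCreated()  { catalog.createTable(tableName); }
        public void onFinished() { /* table and data both remain committed */ }
        public void onFailed()   { catalog.dropTable(tableName); }
        public void onCanceled() { catalog.dropTable(tableName); }
    }

    public static void main(String[] args) {
        Catalog catalog = new Catalog();
        JobStatusHook hook = new CtasJobStatusHook(catalog, "my_ctas_table");
        hook.onCreated();  // step 3: table is created when the job starts
        hook.onFailed();   // step 5: failure rolls the table creation back
        System.out.println(catalog.tables.contains("my_ctas_table")); // false
    }
}
```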

In streaming mode, data is usually written and visible in real time, so streaming mode does not provide an atomicity guarantee, and there is therefore no need to use the JobStatusHook mechanism.

Batch mode requires the use of the JobStatusHook mechanism to ensure atomicity. In the Planner we will keep streaming mode (without JobStatusHook) and batch mode (with JobStatusHook) compatible, so that batch mode achieves full atomicity.

Planner

Provide a method for the planner to register a JobStatusHook with the StreamGraph.
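A minimal sketch of what such a registration point could look like. StreamGraphLike and its method name are assumptions for illustration, not Flink's actual StreamGraph API.

```java
import java.util.ArrayList;
import java.util.List;

public class RegisterHookSketch {

    /** Stand-in marker interface for a JM-side job status hook. */
    interface JobStatusHook { }

    /** Hypothetical stand-in for the StreamGraph carrying hooks to the JM. */
    static class StreamGraphLike {
        private final List<JobStatusHook> hooks = new ArrayList<>();

        /** The planner calls this to attach a hook before the job is submitted. */
        void registerJobStatusHook(JobStatusHook hook) {
            hooks.add(hook);
        }

        List<JobStatusHook> getJobStatusHooks() {
            return hooks;
        }
    }

    public static void main(String[] args) {
        StreamGraphLike graph = new StreamGraphLike();
        graph.registerJobStatusHook(new JobStatusHook() { });
        System.out.println(graph.getJobStatusHooks().size()); // 1
    }
}
```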

...

Note: This solution only works if we create the Catalog using DDL, because we can only get the Catalog properties from the WITH clause. If a Catalog is registered via the TableEnvironment#registerCatalog method, we cannot get these properties. Therefore, CTAS does not currently support jobs that use TableEnvironment#registerCatalog.

In the HiveCatalog solution, since hive-conf-dir is configured as a local path, make sure that all nodes in the cluster place the Hive configuration files under the same path. Flink's current Application mode has the same problem.

Runtime

Provide a job status change hook mechanism on the JM side.

...