...

/**
* This interface is used for the {@code Create Table As Select} (CTAS) syntax at the Table API level. This interface
* is created by the {@code Table#saveAs} method, and provides methods such as {@code option} and {@code create}
* that help to fill the table options and produce a {@link TablePipeline}. Currently we only support the
* Create Table As Select (CTAS) syntax, but the {@code Replace Table AS SELECT} and {@code Create Or Replace Table AS SELECT}
* syntax may be supported in the future.
*/
@PublicEvolving
public interface CreateOrReplaceTable {

/**
* Adds table options to the CreateOrReplaceTable.
*
* <p>Example:
*
* <pre>{@code
* tab.option("connector", "filesystem");
* }</pre>
*/
CreateOrReplaceTable option(String key, String value);

/**
* Creates the table in the specified path and writes the pipeline data to this table.
*
* <p>Example:
*
* <pre>{@code
* table.saveAs("my_ctas_table")
* .option("connector", "filesystem")
* .option("format", "testcsv")
* .option("path", "/tmp/my_ctas_table/")
* .create();
* }</pre>
*/
TablePipeline create();

/**
* Creates the table under the specified path if it does not exist, and writes the pipeline data to this table.
*
* <p>Example:
*
* <pre>{@code
* table.saveAs("my_ctas_table")
* .option("connector", "filesystem")
* .option("format", "testcsv")
* .option("path", "/tmp/my_ctas_table/")
* .createIfNotExist();
* }</pre>
*/
TablePipeline createIfNotExist();
}

The CreateOrReplaceTable interface is newly introduced because, if we added the create/createIfNotExist APIs to the Table interface, the user would have to call the saveAs API before calling these APIs, which would cause additional usage costs for the user. The interface is called CreateOrReplaceTable to prepare for future support of the Replace Table feature: this API only supports the Create Table As Select syntax currently, but in the future we may support the Replace Table As Select and Create Or Replace Table As Select syntax, which is also supported by some other batch compute engines.

The recommended way to use CreateOrReplaceTable is as follows:

TablePipeline tablePipeline = table.saveAs("my_ctas_table")
.option("connector", "filesystem")
.option("format", "testcsv")
.option("path", "/tmp/my_ctas_table/")
.create();
tablePipeline.execute();

...

Provide a method that is used to infer the options of the CatalogBaseTable; these options will be used to compile the SQL to a JobGraph successfully.

@PublicEvolving
public interface Catalog {

    /**
     * This method is used to infer the default options for {@link CatalogBaseTable} through
     * {@link Catalog} options, so that the planner can compile the SQL successfully when using
     * the {@code Create Table As Select} syntax.
     *
     * <p>Assume a user wants to select data from a kafka table and then insert the result into a
     * mysql table. If the mysql table does not exist in the physical mysql storage, and the user
     * does not want to create the table manually on the mysql side because of the complex type
     * mapping, the user can first create a {@link JdbcCatalog} which connects to the mysql
     * instance, and then use the
     * {@code CREATE TABLE `mysql`.`user_db`.`order_cnt` AS SELECT * FROM `KafkaTable`} syntax; it
     * is convenient to load the data from kafka to mysql. Because the {@link JdbcCatalog}
     * provides the user, password, url and other options to the {@link CatalogBaseTable}, the
     * user does not need to fill the options in the query. If the user does not specify the
     * required options of the target table, the planner will call this method to fill the options
     * into the {@link CatalogBaseTable} that are needed to compile the SQL.
     *
     * <p>The tables in {@link GenericInMemoryCatalog} already exist externally, so the options
     * must be filled in manually by the user and cannot be inferred by the Catalog automatically.
     *
     * <p>{@link JdbcCatalog} example:
     *
     * <pre>{@code
     * // If the user does not set any Table's options,
     * // then the implementation of JdbcCatalog#inferTableOptions
     * // can be like this to avoid the execution failure.
     * public void inferTableOptions(ObjectPath tablePath, CatalogBaseTable table) {
     *     Map<String, String> tableOptions = table.getOptions();
     *     tableOptions.put("connector", "jdbc");
     *     tableOptions.put("url", getBaseUrl());
     *     tableOptions.put("table-name", tablePath.getObjectName());
     *     tableOptions.put("username", getUsername());
     *     tableOptions.put("password", getPassword());
     * }
     * }</pre>
     */
    default void inferTableOptions(ObjectPath tablePath, CatalogBaseTable table) {
        throw new UnsupportedOperationException();
    }

}
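
For illustration, the following is a minimal sketch of how a planner could call inferTableOptions while compiling a CTAS statement. The CtasOptionInference class and inferOptionsIfAbsent method names are hypothetical, used only for this sketch:

import java.util.Map;

import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;

// Hypothetical planner-side helper: complete the target table's options
// via the catalog before compiling the CTAS statement to a JobGraph.
public class CtasOptionInference {

    static void inferOptionsIfAbsent(
            Catalog catalog, ObjectPath targetPath, CatalogBaseTable targetTable) {
        Map<String, String> options = targetTable.getOptions();
        // Only infer options when the user did not specify any in the query.
        if (options.isEmpty()) {
            try {
                catalog.inferTableOptions(targetPath, targetTable);
            } catch (UnsupportedOperationException e) {
                // Catalogs such as GenericInMemoryCatalog cannot infer options;
                // the user has to fill them in manually.
                throw new ValidationException(
                        "Cannot infer options for table " + targetPath
                                + ", please specify the options explicitly.", e);
            }
        }
        // ... continue compiling the SQL to a JobGraph with the completed options.
    }
}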

...

In streaming mode, data is usually written and visible in real time, so streaming mode does not provide an atomicity guarantee, and there is no need to use the JobStatusHook mechanism. However, batch mode requires the JobStatusHook mechanism to ensure atomicity. In the Planner we will be compatible with both stream mode without the JobStatusHook and batch mode with the JobStatusHook, to achieve the atomicity of batch mode.
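
As an illustration of how batch-mode atomicity could be achieved, below is a minimal sketch of a hook that creates the target table once the job starts and drops it again if the job fails or is canceled, so a failed batch job leaves no half-written table behind. The CtasJobStatusHook class name is hypothetical, and the exact JobStatusHook callback signatures are assumptions for this sketch rather than a confirmed API:

import org.apache.flink.api.common.JobID;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;

// Hypothetical hook for CTAS atomicity in batch mode: the callback names
// (onCreated/onFinished/onFailed/onCanceled) are assumed for this sketch.
public class CtasJobStatusHook implements JobStatusHook {

    private final Catalog catalog;
    private final ObjectPath tablePath;
    private final CatalogBaseTable table;

    public CtasJobStatusHook(Catalog catalog, ObjectPath tablePath, CatalogBaseTable table) {
        this.catalog = catalog;
        this.tablePath = tablePath;
        this.table = table;
    }

    @Override
    public void onCreated(JobID jobId) {
        try {
            // Create the target table only once the job has actually started.
            catalog.createTable(tablePath, table, true);
        } catch (Exception e) {
            throw new RuntimeException("Failed to create table for CTAS job " + jobId, e);
        }
    }

    @Override
    public void onFinished(JobID jobId) {
        // Nothing to do: the table exists and all data has been written.
    }

    @Override
    public void onFailed(JobID jobId, Throwable throwable) {
        dropTableQuietly();
    }

    @Override
    public void onCanceled(JobID jobId) {
        dropTableQuietly();
    }

    private void dropTableQuietly() {
        try {
            // Best-effort rollback: drop the partially written table.
            catalog.dropTable(tablePath, true);
        } catch (Exception e) {
            // Ignore: cleanup is best-effort.
        }
    }
}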

...