...

To keep Flink SQL semantically consistent between streaming mode and batch mode, and given the current state of Flink and the needs of our business, we choose LEVEL-2 atomicity as the default behavior for both modes. Users who require LEVEL-3 atomicity can enable it with the table.cor-atomicity.enabled option. In general, batch mode usually requires LEVEL-3 atomicity. In short, Flink provides two levels of atomicity guarantee, with LEVEL-2 as the default.

...

The resulting table is equivalent to:

CREATE TABLE ctas_hudi
(
    id BIGINT,
    name STRING,
    age INT
)
WITH ('connector.type' = 'hudi');

INSERT INTO ctas_hudi SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;

...

/**
 * The CREATE TABLE AS SELECT (CTAS) syntax requires CREATE/DROP TABLE operations via the Catalog on the JM side,
 * so this interface should extend the Serializable interface so that it can be serialized as part of the {@link JobGraph}.
 */

@PublicEvolving
public interface Catalog extends Serializable {

    /**
     * Infers the default options for a {@link CatalogBaseTable} from the {@link Catalog} options, so that the
     * planner can compile the SQL successfully when the {@code Create Table As Select} syntax is used.
     *
     * <p>Assume a user wants to select data from a Kafka table and insert the result into a MySQL table. The
     * MySQL table does not exist yet in the physical MySQL storage, and the user does not want to create it
     * manually on the MySQL side because of the complex type mapping. The user can first create a
     * {@link JdbcCatalog} that connects to the MySQL instance, and then use the
     * {@code CREATE TABLE `mysql`.`user_db`.`order_cnt` AS SELECT * FROM `KafkaTable`} syntax, which makes it
     * convenient to load data from Kafka into MySQL. Because the {@link JdbcCatalog} already provides the user,
     * password, url and other options, the user does not need to fill them in the query. If the user does not
     * specify the required options of the target table, the planner calls this method to fill the
     * {@link CatalogBaseTable} with the options that are needed to compile the SQL.
     *
     * <p>{@link JdbcCatalog} example:
     * <pre>{@code
     * // If the user does not set any table options,
     * // the implementation of JdbcCatalog#inferTableOptions
     * // can look like this to avoid an execution failure.
     * public CatalogBaseTable inferTableOptions(ObjectPath tablePath, CatalogTable table) {
     *     // Copy the options so that the input table is not mutated.
     *     Map<String, String> tableOptions = new HashMap<>(table.getOptions());
     *     tableOptions.put("connector", "jdbc");
     *     tableOptions.put("url", getBaseUrl());
     *     tableOptions.put("table-name", tablePath.getObjectName());
     *     tableOptions.put("username", getUsername());
     *     tableOptions.put("password", getPassword());
     *     return table.copy(tableOptions);
     * }
     * }</pre>
     */
    default CatalogBaseTable inferTableOptions(ObjectPath tablePath, CatalogTable table) {
        throw new UnsupportedOperationException();
    }

}
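The option-filling idea behind inferTableOptions can be sketched without any Flink dependency. The following is only an illustration using plain maps: the class InferOptionsSketch and its helper methods are hypothetical names, not part of Flink, and the choice to let user-specified options take precedence over catalog defaults is an assumption of this sketch (the Javadoc example above simply overwrites the keys).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Flink-free sketch: table options are modelled as plain string maps.
public class InferOptionsSketch {

    // Stand-in for the connection settings that a JDBC catalog already holds.
    static Map<String, String> catalogDefaults(
            String baseUrl, String username, String password, String tableName) {
        Map<String, String> defaults = new HashMap<>();
        defaults.put("connector", "jdbc");
        defaults.put("url", baseUrl);
        defaults.put("table-name", tableName);
        defaults.put("username", username);
        defaults.put("password", password);
        return defaults;
    }

    // Fills in every option the user did not set. Assumption of this sketch:
    // a value given by the user is never overridden by a catalog default.
    static Map<String, String> inferTableOptions(
            Map<String, String> userOptions, Map<String, String> defaults) {
        Map<String, String> merged = new HashMap<>(userOptions); // do not mutate the input
        defaults.forEach(merged::putIfAbsent);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> defaults =
                catalogDefaults("jdbc:mysql://localhost:3306/user_db", "flink", "secret", "order_cnt");

        // The CTAS query carried no WITH clause, so the user options are empty.
        Map<String, String> inferred = inferTableOptions(new HashMap<>(), defaults);
        System.out.println(inferred);
    }
}
```

With an empty user option map, the merged result contains exactly the catalog defaults, which is the situation the Javadoc describes for a CTAS statement without a WITH clause.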

TableConfigOptions

Add the configuration option table.cor-atomicity.enabled to allow users to enable atomicity when using the create table as select syntax.

...

@PublicEvolving
public class TableConfigOptions {

    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
    public static final ConfigOption<Boolean> TABLE_COR_ATOMICITY_ENABLED =
            key("table.cor-atomicity.enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Specifies if the create table as select operation is executed atomically. "
                                    + "By default, the operation is non-atomic. The target table is created on the client side, and it will not be dropped even if the job fails or is cancelled. "
                                    + "If this option is set to true, the target table is created on the JM side, and it will also be dropped when the job fails or is cancelled.");
}

Implementation Plan

We provide two semantics for Flink CTAS: non-atomic and atomic. The non-atomic implementation is the default behavior in both streaming and batch modes.
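The difference between the two semantics can be illustrated with a small, Flink-free simulation. This is a sketch under stated assumptions, not the proposed implementation: CtasAtomicitySketch, InMemoryCatalog and runCtas are hypothetical names, the catalog is just a set of table names, and the sketch does not model where the table is created (client versus JM), only whether it is dropped when the job fails.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical simulation of non-atomic versus atomic CTAS; not Flink code.
public class CtasAtomicitySketch {

    // Minimal in-memory stand-in for a catalog: it only tracks table names.
    static final class InMemoryCatalog {
        private final Set<String> tables = new HashSet<>();

        void createTable(String name) {
            if (!tables.add(name)) {
                throw new IllegalStateException("Table already exists: " + name);
            }
        }

        void dropTable(String name) {
            tables.remove(name);
        }

        boolean tableExists(String name) {
            return tables.contains(name);
        }
    }

    // Creates the target table, runs the insert job, and drops the table on
    // failure only when table.cor-atomicity.enabled is set to true.
    // Returns true if the job succeeded.
    static boolean runCtas(
            InMemoryCatalog catalog, Map<String, String> config, String table, Runnable insertJob) {
        boolean atomic =
                Boolean.parseBoolean(config.getOrDefault("table.cor-atomicity.enabled", "false"));
        catalog.createTable(table);
        try {
            insertJob.run();
            return true;
        } catch (RuntimeException e) {
            if (atomic) {
                catalog.dropTable(table); // atomic: a failed job leaves no table behind
            }
            return false; // non-atomic: the table stays in the catalog
        }
    }

    public static void main(String[] args) {
        Runnable failingJob = () -> { throw new RuntimeException("simulated job failure"); };

        InMemoryCatalog nonAtomic = new InMemoryCatalog();
        runCtas(nonAtomic, Map.of(), "ctas_hudi", failingJob);
        System.out.println("non-atomic, table exists after failure: "
                + nonAtomic.tableExists("ctas_hudi"));

        InMemoryCatalog atomic = new InMemoryCatalog();
        runCtas(atomic, Map.of("table.cor-atomicity.enabled", "true"), "ctas_hudi", failingJob);
        System.out.println("atomic, table exists after failure: "
                + atomic.tableExists("ctas_hudi"));
    }
}
```

In this simulation the default (non-atomic) run keeps the table after a failed job, while the run with the option enabled drops it, mirroring the behavior described for the table.cor-atomicity.enabled option.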

...