...
To keep Flink SQL semantically consistent between streaming and batch modes, and taking into account both the current state of Flink and the needs of our business, we choose LEVEL-2 atomicity as the default behavior for both streaming and batch modes. Users who require LEVEL-3 atomicity can enable it with the table.cor-atomicity.enabled option. Batch mode usually requires LEVEL-3 atomicity. In short, Flink provides two levels of atomicity guarantees, with LEVEL-2 as the default.
...
The resulting table is equivalent to:
```sql
CREATE TABLE ctas_hudi
(
  id BIGINT,
  name STRING,
  age INT
)
WITH ('connector.type' = 'hudi');

INSERT INTO ctas_hudi SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;
```
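To make the equivalence concrete, here is a minimal sketch that expands a CTAS into the CREATE TABLE and INSERT INTO statements shown above. The class `CtasExpansion`, its `Column` record and its methods are hypothetical illustrations, not Flink APIs. In Flink the planner derives the target schema from the query; here the schema is passed in explicitly for simplicity.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper (not a Flink API): expands a CTAS into CREATE TABLE + INSERT INTO. */
public class CtasExpansion {

    /** One column of the query's output schema: name and SQL type. */
    record Column(String name, String type) {}

    /** Builds the CREATE TABLE statement from the (already derived) schema and the WITH options. */
    static String createTable(String table, List<Column> schema, Map<String, String> options) {
        String columns = schema.stream()
                .map(c -> "  " + c.name() + " " + c.type())
                .collect(Collectors.joining(",\n"));
        String with = options.entrySet().stream()
                .map(e -> "'" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(", "));
        return "CREATE TABLE " + table + " (\n" + columns + "\n) WITH (" + with + ");";
    }

    /** Builds the INSERT INTO statement that writes the SELECT result into the new table. */
    static String insertInto(String table, String selectQuery) {
        return "INSERT INTO " + table + " " + selectQuery + ";";
    }

    public static void main(String[] args) {
        List<Column> schema = List.of(
                new Column("id", "BIGINT"), new Column("name", "STRING"), new Column("age", "INT"));
        // LinkedHashMap keeps the options in insertion order when rendered.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector.type", "hudi");
        String select = "SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0";
        System.out.println(createTable("ctas_hudi", schema, options));
        System.out.println(insertInto("ctas_hudi", select));
    }
}
```

The split mirrors the equivalence: the CREATE TABLE part depends only on the query's output schema and the table options, while the INSERT INTO part reuses the original SELECT unchanged.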
...
```java
/**
 * The CREATE TABLE AS SELECT (CTAS) syntax requires CREATE/DROP TABLE operations via the Catalog on the JM side,
 * so this interface should extend the {@link Serializable} interface so that it can be serialized as part of
 * the {@link JobGraph}.
 */
@PublicEvolving
public interface Catalog extends Serializable {

    /**
     * This method is used to infer the default options for a {@link CatalogBaseTable} from the {@link Catalog}
     * options, so that the planner can compile the SQL successfully when using the {@code CREATE TABLE AS SELECT}
     * syntax.
     *
     * <p>Assume a user wants to select data from a Kafka table and insert the result into a MySQL table that does
     * not yet exist in the physical MySQL storage, and the user does not want to create the table manually on the
     * MySQL side because of the complex type mapping. The user can first create a {@link JdbcCatalog} that connects
     * to the MySQL instance, then use
     * {@code CREATE TABLE `mysql`.`user_db`.`order_cnt` AS SELECT * FROM `KafkaTable`} to conveniently load data
     * from Kafka into MySQL. Because the {@link JdbcCatalog} already provides the user, password, url and other
     * options, the user does not need to specify them in the query. If the user does not specify the options
     * required by the target table, the planner calls this method to fill in the options of the
     * {@link CatalogBaseTable} that it needs to compile the SQL.
     *
     * <p>{@link JdbcCatalog} example:
     * <pre>{@code
     * // If the user does not set any of the table's options,
     * // JdbcCatalog#inferTableOptions can be implemented like this
     * // to avoid an execution failure.
     * public CatalogBaseTable inferTableOptions(ObjectPath tablePath, CatalogTable table) {
     *     // Copy the options: the map returned by getOptions() may be unmodifiable.
     *     Map<String, String> tableOptions = new HashMap<>(table.getOptions());
     *     tableOptions.put("connector", "jdbc");
     *     // The base URL does not include the database, so append it.
     *     tableOptions.put("url", getBaseUrl() + tablePath.getDatabaseName());
     *     tableOptions.put("table-name", tablePath.getObjectName());
     *     tableOptions.put("username", getUsername());
     *     tableOptions.put("password", getPassword());
     *     return table.copy(tableOptions);
     * }
     * }</pre>
     */
    default CatalogBaseTable inferTableOptions(ObjectPath tablePath, CatalogTable table) {
        throw new UnsupportedOperationException();
    }
}
```
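The Javadoc above states two requirements: the catalog must be serializable so it can travel with the job to the JM, and it should fill in connection options the user left out. The following is a minimal, dependency-free sketch of both, under stated assumptions: `SketchJdbcCatalog` is a hypothetical stand-in, not Flink's `JdbcCatalog`, and it uses plain maps and Java serialization in place of Flink's `CatalogTable` and `JobGraph`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical JDBC-like catalog (not Flink's JdbcCatalog), used only to illustrate the idea. */
public class SketchJdbcCatalog implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String baseUrl; // assumed to end with "/", e.g. "jdbc:mysql://localhost:3306/"
    private final String username;
    private final String password;

    public SketchJdbcCatalog(String baseUrl, String username, String password) {
        this.baseUrl = baseUrl;
        this.username = username;
        this.password = password;
    }

    /** Returns a copy of userOptions with missing connection options filled in from the catalog. */
    public Map<String, String> inferTableOptions(String database, String table, Map<String, String> userOptions) {
        Map<String, String> options = new HashMap<>(userOptions);
        // putIfAbsent keeps any option the user specified explicitly in the query.
        options.putIfAbsent("connector", "jdbc");
        options.putIfAbsent("url", baseUrl + database);
        options.putIfAbsent("table-name", table);
        options.putIfAbsent("username", username);
        options.putIfAbsent("password", password);
        return options;
    }

    /** Serializes and deserializes the catalog, standing in for shipping it with the job. */
    static SketchJdbcCatalog roundTrip(SketchJdbcCatalog catalog) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(catalog);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SketchJdbcCatalog) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SketchJdbcCatalog catalog =
                roundTrip(new SketchJdbcCatalog("jdbc:mysql://localhost:3306/", "flink", "secret"));
        Map<String, String> inferred = catalog.inferTableOptions("user_db", "order_cnt", Map.of());
        System.out.println(inferred);
    }
}
```

Unlike the Javadoc example, which overwrites options with `put`, this sketch uses `putIfAbsent` so that options the user did specify take precedence over the catalog's defaults; which behavior is preferable is a design choice, not something the proposal prescribes.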
TableConfigOptions
Add the configuration option table.cor-atomicity.enabled to allow users to enable atomicity when using the CREATE TABLE AS SELECT syntax.
...
...
```java
@PublicEvolving
public class TableConfigOptions {
    // ...

    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
    // ...
    public static final ConfigOption<Boolean> TABLE_COR_ATOMICITY_ENABLED =
            // ...
            key("table.cor-atomicity.enabled")
                    // ...
                            "Specifies whether the CREATE TABLE AS SELECT operation is executed atomically. "
                                    // ...
                                    + "By default, the operation is non-atomic: the target table is created on the client side "
                                    + "and is not dropped even if the job fails or is cancelled. "
                                    // ...
                                    + "If this option is set to true, the target table is created on the JM side "
                                    + "and is dropped when the job fails or is cancelled.");
```
...
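The option description above specifies two behaviors on job failure. The following sketch models only those behaviors, as an assumption-laden illustration rather than Flink's implementation: `CtasAtomicitySketch`, `runCtas` and `isAtomic` are hypothetical names, the catalog is modeled as a set of table names, and "client side" and "JM side" exist only as comments.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the two CTAS semantics selected by table.cor-atomicity.enabled. */
public class CtasAtomicitySketch {

    /** Stand-in for the catalog: the set of existing table names. */
    final Set<String> catalogTables = new HashSet<>();

    /** Reads the option from a plain config map; absent means false (non-atomic). */
    static boolean isAtomic(Map<String, String> config) {
        return Boolean.parseBoolean(config.getOrDefault("table.cor-atomicity.enabled", "false"));
    }

    /** Simulates a CTAS run and returns whether the job succeeded. */
    boolean runCtas(String table, boolean atomic, boolean jobFails) {
        if (!atomic) {
            // Non-atomic (default): the client creates the table before the job is submitted.
            catalogTables.add(table);
        }
        // ... the job is submitted ...
        if (atomic) {
            // Atomic: the table is created on the JM side as part of the job.
            catalogTables.add(table);
        }
        if (jobFails) {
            if (atomic) {
                // Atomic: the table is dropped when the job fails or is cancelled.
                catalogTables.remove(table);
            }
            // Non-atomic: the table stays in the catalog even though the job failed.
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        CtasAtomicitySketch nonAtomic = new CtasAtomicitySketch();
        nonAtomic.runCtas("ctas_hudi", isAtomic(Map.of()), true);
        System.out.println(nonAtomic.catalogTables.contains("ctas_hudi")); // true

        CtasAtomicitySketch atomic = new CtasAtomicitySketch();
        atomic.runCtas("ctas_hudi", isAtomic(Map.of("table.cor-atomicity.enabled", "true")), true);
        System.out.println(atomic.catalogTables.contains("ctas_hudi")); // false
    }
}
```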
Implementation Plan
We provide two semantics for Flink CTAS: non-atomic and atomic. Non-atomic is the default behavior in both streaming and batch modes.
...