Status

...



Discussion thread

...


Vote thread
JIRA

FLINK-26942

Release

1.17


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

These two steps seem straightforward, but if there are many fields, spelling out the DDL statement is tedious, the type mapping of the target table is prone to errors, and all of these columns have to be written out again in the following INSERT statement. Therefore, we can support CTAS (CREATE TABLE AS SELECT) like MySQL, Oracle, Microsoft SQL Server, Hive, Spark, etc., which will be more user friendly. In addition, the Hive dialect already has some support for CTAS. My suggestion would be to support a variation of the optional Feature T172, “AS subquery clause in table definition”, of the SQL standard.

...

  • Flink: The Hive dialect already supports CTAS but does not guarantee atomicity (cannot roll back). ==> LEVEL-1
  • Spark DataSource v1: atomic (can roll back), but not isolated. ==> LEVEL-2
  • Spark DataSource v2: guarantees atomicity and isolation. ==> LEVEL-3
  • Hive MR: guarantees atomicity and isolation. ==> LEVEL-3

Hive SQL and Spark SQL are mainly used in offline (batch mode) scenarios; Flink SQL is suitable for both real-time (streaming mode) and offline (batch mode) scenarios. In a real-time scenario, we assume that the job runs continuously and does not stop, and that data is written and becomes visible in real time, so there is no need to provide atomicity.

To ensure that Flink SQL is semantically consistent in streaming mode and batch mode, and considering the current state of Flink and the needs of our business, we choose LEVEL-1 as the default behavior for both Flink streaming and batch mode. If the user requires LEVEL-2 atomicity, this ability can be achieved by enabling the table.ctas.atomicity-enabled option. In general, batch mode usually requires LEVEL-2 atomicity. In a nutshell, Flink provides two levels of atomicity guarantee, with LEVEL-1 as the default behavior.
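For illustration, with the option proposed later in this document, a user could opt in to LEVEL-2 atomicity from the Table API roughly as follows (a minimal sketch):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
// LEVEL-1 (non-atomic) is the default; enable LEVEL-2 atomicity explicitly.
tEnv.getConfig().getConfiguration().setBoolean("table.ctas.atomicity-enabled", true);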

Public API Changes

...

Code Block (sql): syntax
CREATE TABLE ctas_hudi
 WITH ('connector.type' = 'hudi')
 AS SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;

...

Code Block (sql): syntax
CREATE TABLE ctas_hudi
 (
 	id BIGINT,
 	name STRING,
 	age INT
 )
 WITH ('connector.type' = 'hudi');

INSERT INTO ctas_hudi SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;


Note: Regarding the WITH options of the sink table in the query, the user must specify the 'connector' option and the required options of the specific connector, just as in a CREATE TABLE DDL statement. If these options are not filled in, the table to be created is recognized as a managed table; please see the Managed Table section for details.

Public Interface

Table

Providing methods that are used to execute CTAS (CREATE TABLE AS SELECT) for Table API users.

...

/**
 * The table is used for the {@code Create Table As Select} syntax at the Table API level. This
 * interface is created by the {@code Table#saveAs} method, and provides methods such as
 * {@code option} and {@code create} that help to fill in the table options and produce a
 * {@link TablePipeline}. Currently we only support the {@code Create Table As Select} syntax,
 * but the {@code Replace Table As Select} and {@code Create Or Replace Table As Select} syntax
 * may be supported in the future, so {@code replace} and {@code createOrReplace} methods may be
 * introduced.
 */
@PublicEvolving
public interface CreateOrReplaceTable {

    /**
     * Adding table options to CreateOrReplaceTable.
     *
     * <p>Example:
     *
     * <pre>{@code
     * tab.option("connector", "filesystem");
     * }</pre>
     */
    CreateOrReplaceTable option(String key, String value);

    /**
     * Create the table in the specified path and write the pipeline data to this table.
     *
     * <p>Example:
     *
     * <pre>{@code
     * table.saveAs("my_ctas_table")
     *     .option("connector", "filesystem")
     *     .option("format", "testcsv")
     *     .option("path", "/tmp/my_ctas_table/")
     *     .create(true);
     * }</pre>
     */
    TablePipeline create(boolean ignoreIfExists);
}

The CreateOrReplaceTable interface is newly introduced because, if we added the create API to the Table interface, the user would have to call the saveAs API before calling the create API, which would cause additional usage costs for the user. This API only supports the Create Table As Select syntax currently, but in the future we may support the Replace Table As Select and Create Or Replace Table As Select syntax, which is also supported by some other batch compute engines.

The recommended way to use CreateOrReplaceTable is as follows:

TablePipeline tablePipeline = table.saveAs("my_ctas_table")
        .option("connector", "filesystem")
        .option("format", "testcsv")
        .option("path", "/tmp/my_ctas_table/")
        .create(true);
tablePipeline.execute();


We save the properties set through the option API and set them in the CatalogBaseTable when executing the create API, so as to generate the DynamicTableSink.
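For illustration, that merge could look roughly like the following sketch (catalogTable and optionsFromOptionApi are hypothetical names for the derived table definition and the accumulated options):

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.catalog.CatalogTable;

// Sketch: enrich the derived table definition with the options collected via option(...)
// before the planner uses it to create the table and generate the DynamicTableSink.
Map<String, String> tableOptions = new HashMap<>(catalogTable.getOptions());
tableOptions.putAll(optionsFromOptionApi); // hypothetical accumulator filled by option(...)
CatalogTable enrichedTable = catalogTable.copy(tableOptions);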

TableConfigOptions

Add table.ctas.atomicity-enabled option to allow users to enable atomicity when using create table as select syntax.

@PublicEvolving
public class TableConfigOptions {

    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
    public static final ConfigOption<Boolean> TABLE_CTAS_ATOMICITY_ENABLED =
            key("table.ctas.atomicity-enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Specifies if the create table as select operation is executed atomically. "
                                    + "By default, the operation is non-atomic. The target table is created on the client side, and it will not be dropped even if the job fails or is cancelled. "
                                    + "If this option is set to true, the target table is created on the JM side, and it will also be dropped when the job fails or is cancelled.");
}


Catalog

Regarding the Catalog interface, to support the Create Table As Select syntax (with an atomicity guarantee; if atomicity is not needed, the second change is not required), two changes are needed here:

  1. Provide a new method inferTableOptions that is used to infer the options of the CatalogBaseTable; these options will be used to compile the SQL to a JobGraph successfully. This method throws UnsupportedOperationException by default.
  2. Catalog implementations should implement Java's Serializable interface, so that the Catalog can be serialized as part of the JobGraph and passed to the JM side; this requires that the essential options used to construct a catalog object are serializable.

...

    /**
     * This method is used to infer the default options for {@link CatalogBaseTable} through the
     * {@link Catalog} options, so that the planner can compile the SQL successfully when the
     * {@code Create Table As Select} syntax is used.
     *
     * <p>Assume a user wants to select data from a kafka table and insert the result into a mysql
     * table that does not yet exist in the physical mysql storage, and the user does not want to
     * create the table manually on the mysql side because of the complex type mapping. The user can
     * first create the {@link JdbcCatalog} which connects to the mysql instance, then use the
     * {@code CREATE TABLE `mysql`.`user_db`.`order_cnt` AS SELECT * FROM `KafkaTable`} syntax; it is
     * a convenient way to load data from kafka to mysql. Because the {@link JdbcCatalog} already
     * provides the user, password, url and other options, the user does not need to fill in these
     * options in the query. If the user does not specify the required options of the target table,
     * the planner will call this method to fill the options into the {@link CatalogBaseTable},
     * which are needed to compile the SQL.
     *
     * <p>{@link JdbcCatalog} example:
     *
     * <pre>{@code
     * // If the user does not set any of the Table's options,
     * // then the implementation of JdbcCatalog#inferTableOptions
     * // can be like this to avoid the execution failure.
     * public CatalogBaseTable inferTableOptions(ObjectPath tablePath, CatalogTable table) {
     *     Map<String, String> tableOptions = table.getOptions();
     *     tableOptions.put("connector", "jdbc");
     *     tableOptions.put("url", getBaseUrl());
     *     tableOptions.put("table-name", tablePath.getObjectName());
     *     tableOptions.put("username", getUsername());
     *     tableOptions.put("password", getPassword());
     *     return table.copy(tableOptions);
     * }
     * }</pre>
     */
    default CatalogBaseTable inferTableOptions(ObjectPath tablePath, CatalogTable table) {
        throw new UnsupportedOperationException();
    }

...


We will update Catalog's javadocs to add the following description:

If a Catalog needs to support the atomicity feature of CTAS, then the Catalog must implement Serializable, so that Catalog instances can be serialized/deserialized using Java serialization.
When atomicity support for CTAS is enabled, the Planner will check whether the Catalog instance can be serialized using Java serialization.
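For illustration, such a check could be as simple as attempting Java serialization and surfacing a clear error (a sketch; validateCatalogSerializability is a hypothetical helper, not a proposed API):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import org.apache.flink.table.catalog.Catalog;

// Hypothetical helper the planner could use to fail fast when atomic CTAS is enabled.
static void validateCatalogSerializability(Catalog catalog) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
        out.writeObject(catalog);
    } catch (IOException e) {
        throw new UnsupportedOperationException(
                "Catalog " + catalog.getClass().getName()
                        + " cannot be serialized, so it does not support atomic CTAS.",
                e);
    }
}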

Implementation Plan

We provide two semantics for Flink CTAS: Non-atomic and Atomic. Non-atomic is the default behavior in both streaming and batch modes.

Non-atomic (default)

The overall execution process is shown in the following figure.

[Image: overall execution process of non-atomic CTAS]

The non-atomic implementation is basically the same as the existing INSERT data process, except that the sink table is first created on the client side via the Catalog before performing the insert.

Compile the SQL, derive the schema of the sink table from the query, create the table, and finally submit the job that writes data to the sink table. This needs no further introduction.
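In code terms, the client-side flow is roughly the following sketch (deriveSinkTable is a hypothetical helper standing in for the planner's schema derivation):

import org.apache.flink.table.catalog.CatalogBaseTable;

// Non-atomic CTAS (sketch): the table is created on the client before the job is
// submitted, and it is not dropped even if the job later fails or is cancelled.
CatalogBaseTable sinkTable = deriveSinkTable(query); // hypothetical: schema derived from the query
catalog.createTable(tablePath, sinkTable, false);
tableEnv.executeSql("INSERT INTO ctas_hudi SELECT id, name, age FROM hive_catalog.default.test");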

Atomic

The atomicity implementation of Flink CTAS requires two parts:

  • Enabling the atomicity option.
  • The Catalog can be serialized (Catalog providers need to implement Java's Serializable interface so that instances can be serialized/deserialized), ensuring atomicity by performing the create/drop table actions on the JM side.

The following describes the process when the user enables the atomicity support option. The overall execution process is shown in the following figure.

[Image: overall execution process of atomic CTAS]


Because the client process may exit soon (for example, in detached mode) while the job execution may take a long time, the table creation (before the job starts) and dropping (when the job fails or is cancelled) should be executed on the JM side instead of on the client. In addition, the creation and dropping of the table are executed through the Catalog, which is consistent with existing behavior. Therefore, a new hook mechanism should be introduced for the JM to execute the corresponding actions; the hook depends on the Catalog to complete these functions. In summary, the overall execution process of a CTAS job is as follows:
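The hook interface itself is elided here; a sketch consistent with the callbacks referenced in the Runtime section below might look like the following (the exact signatures, e.g. whether a JobID is passed, are an assumption):

import java.io.Serializable;
import org.apache.flink.api.common.JobID;

// Sketch of the hook interface implied by this FLIP. The callback names follow the
// onCreated/onFailed/onCanceled/onFinished behavior described in the Runtime section.
public interface JobStatusHook extends Serializable {

    /** Called when the job is created on the JM side; CTAS creates the target table here. */
    void onCreated(JobID jobId);

    /** Called when the job finishes successfully; CTAS needs no extra action here. */
    void onFinished(JobID jobId);

    /** Called when the job fails; CTAS drops the target table here. */
    void onFailed(JobID jobId, Throwable throwable);

    /** Called when the job is canceled; CTAS drops the target table here. */
    void onCanceled(JobID jobId);
}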

...

Providing a method for the planner to register a JobStatusHook with the StreamGraph.

public class StreamGraph implements Pipeline {

    private final List<JobStatusHook> jobStatusHooks = new ArrayList<>();

    ... ...

    /** Registers the JobStatusHook. */
    void registerJobStatusHook(JobStatusHook hook) {
        ...
    }
}

The final tasks of the job are all generated by the Planner. We want to complete the create/drop table actions through a hook on the JM side, so we need an API to register the hook on the JM side.

Introducing the process of CTAS in Planner:

step1:

Compile SQL to generate CatalogBaseTable (The sink table to be created) and CreateTableASOperation.

step2:

Use the Catalog#inferTableOptions method to fill in the required options of the CatalogBaseTable. The specific implementation is determined by the Catalog.

For example, when using JdbcCatalog, if the user does not fill in any table options, JdbcCatalog will set the connector to 'jdbc' and fill in the username, password and base-url. When using HiveCatalog, if the user does not fill in any table options, HiveCatalog will set the connector to 'hive'. User-implemented catalogs can also use this mechanism to fill in required options.

It should be noted that in the InMemoryCatalog case, the tables saved in it all exist in external systems, so the table options have to be filled in manually by the user; the Catalog cannot infer them automatically. If the Catalog does not support ManagedTable and the user does not set the connector information, the execution will fail.
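For illustration, the HiveCatalog behavior described above could be implemented roughly like this (a sketch, mirroring the JdbcCatalog example shown earlier):

import java.util.Map;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;

// Sketch: default the connector to 'hive' when the user supplied no table options.
public CatalogBaseTable inferTableOptions(ObjectPath tablePath, CatalogTable table) {
    Map<String, String> tableOptions = table.getOptions();
    tableOptions.put("connector", "hive");
    return table.copy(tableOptions);
}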


step3:

Using the CatalogBaseTable and Catalog objects to construct the JobStatusHook. Because the JobStatusHook is ultimately executed on the JM side, and the CatalogBaseTable needs to be created/dropped through the Catalog in the hook, the Catalog and CatalogBaseTable are member variables of the hook; they also need to be serializable so that they can be passed to the JM.

step4:

The Planner registers the JobStatusHook with the StreamGraph; the JobStatusHook is then serialized and passed to the JM through the serialization of the JobGraph.
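For example (a sketch; CTASJobStatusHook is the hypothetical hook implementation sketched in the Runtime section below):

// Planner side (sketch): attach the hook so the JM can create/drop the table via the Catalog.
streamGraph.registerJobStatusHook(
        new CTASJobStatusHook(catalog, tablePath, catalogBaseTable, /* ignoreIfExists */ false));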

For the CatalogBaseTable, we use CatalogPropertiesUtil to serialize/deserialize it; these are tools that Flink already provides.

For the Catalog, Catalog providers need to implement Java's Serializable interface so that the Catalog can be serialized/deserialized directly. The planner will attempt to pre-serialize the Catalog; if serialization fails, an exception will be thrown indicating to the user that the Catalog does not support atomicity semantics because it cannot be serialized.

The purpose of doing so is:

  1. In this way, Catalogs registered via DDL and via the Table API can both be serialized.
  2. It reduces the cost for user-defined catalogs, which do not need to give much consideration to serialization (if atomic CTAS support is required, the catalog must be serializable).

Key points for Catalog serializability to support atomic semantics:

Built-in Catalog:

  • InMemoryCatalog: Because CatalogDatabase, CatalogBaseTable, etc. cannot be serialized by the Java serialization mechanism directly, InMemoryCatalog does not support serialization, which means it cannot support atomic semantics.
  • JdbcCatalog: The member variables required to construct the Catalog object are directly serializable, such as the username, password, base url, etc. The JdbcDialectTypeMapper interface needs to extend Serializable.
  • HiveCatalog: All member variables can be serialized directly, except for the HiveConf object, which cannot be serialized directly. We can refer to JobConfWrapper to solve the serialization problem of HiveConf; see the sketch after this list.
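For illustration, a wrapper in the spirit of JobConfWrapper could serialize the HiveConf entries by hand (a sketch; HiveConfWrapper is a hypothetical name, and it relies on Hadoop Configuration's Writable support):

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.hadoop.hive.conf.HiveConf;

// Sketch: HiveConf is not Java-serializable itself, but as a Hadoop Configuration it
// can write/read its key-value content via write(DataOutput)/readFields(DataInput).
public class HiveConfWrapper implements Serializable {

    private transient HiveConf hiveConf;

    public HiveConfWrapper(HiveConf hiveConf) {
        this.hiveConf = hiveConf;
    }

    public HiveConf conf() {
        return hiveConf;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        hiveConf.write(out); // persist all configuration entries
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        hiveConf = new HiveConf();
        hiveConf.readFields(in); // rebuild the configuration entries
    }
}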

User-defined Catalog:

  • User-defined catalogs that require support for the CREATE TABLE AS SELECT (CTAS) syntax should ensure that the catalog can be serialized.

The atomicity of the Create Table As Select (CTAS) feature depends on the serializability of the catalog. To quickly see whether a catalog supports atomic CTAS, we try to serialize the catalog when compiling the SQL in the planner; if serialization fails, an exception is thrown to indicate to the user that the catalog does not support atomic CTAS because it cannot be serialized.

...


Runtime

Provide a job status change hook mechanism on the JM side.

...

  1. When the job is starting, the JobGraph will be deserialized, and then the JobStatusHook can also be deserialized.
  2. When deserializing the JobStatusHook, the Catalog and CatalogBaseTable will also be deserialized.
    • Deserialize the CatalogBaseTable using the CatalogPropertiesUtil#deserializeCatalogTable method.
    • After deserializing the Catalog, call Catalog#open in the JobStatusHook#onCreated method to ensure that the Catalog can be used.
  3. When the job starts and the job status changes, the corresponding JobStatusHook method will be called by the JM:

For example, suppose our JobStatusHook implementation is called CTASJobStatusHook and uses JdbcCatalog; how JdbcCatalog is serialized by the Planner has been covered in the previous section and will not be repeated.

Then, when the job status changes, the corresponding CTASJobStatusHook method is called:

  • When the job status is CREATED, the runtime module will call the CTASJobStatusHook#onCreated method, which will call the JdbcCatalog#createTable method to create the table.
  • When the final status of the job is FAILED, the runtime module will call the CTASJobStatusHook#onFailed method, which will call the JdbcCatalog#dropTable method to drop the table.
  • When the final status of the job is CANCELED, the runtime module will call the CTASJobStatusHook#onCanceled method, which will call the JdbcCatalog#dropTable method to drop the table.
  • When the final status of the job is FINISHED, the runtime module will call the CTASJobStatusHook#onFinished method, and we do not need to do any additional operations.
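Putting the pieces together, the CTASJobStatusHook described above could look roughly like this sketch (illustrative only; the constructor and field shapes are assumptions, not the final design):

import org.apache.flink.api.common.JobID;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;

// Sketch of a CTAS hook: create the table when the job is created on the JM,
// drop it when the job fails or is cancelled.
public class CTASJobStatusHook implements JobStatusHook {

    private final Catalog catalog;          // serialized together with the hook
    private final ObjectPath tablePath;
    private final CatalogBaseTable table;   // serialized via CatalogPropertiesUtil
    private final boolean ignoreIfExists;

    public CTASJobStatusHook(
            Catalog catalog, ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) {
        this.catalog = catalog;
        this.tablePath = tablePath;
        this.table = table;
        this.ignoreIfExists = ignoreIfExists;
    }

    @Override
    public void onCreated(JobID jobId) {
        try {
            catalog.open(); // make sure the deserialized catalog is usable on the JM side
            catalog.createTable(tablePath, table, ignoreIfExists);
        } catch (Exception e) {
            throw new RuntimeException("Failed to create table for CTAS", e);
        }
    }

    @Override
    public void onFinished(JobID jobId) {
        // Nothing to do: the table was created in onCreated and the data is committed.
    }

    @Override
    public void onFailed(JobID jobId, Throwable throwable) {
        dropTable();
    }

    @Override
    public void onCanceled(JobID jobId) {
        dropTable();
    }

    private void dropTable() {
        try {
            catalog.dropTable(tablePath, true); // ignore if the table was never created
        } catch (Exception e) {
            throw new RuntimeException("Failed to drop table for CTAS", e);
        }
    }
}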

Data Visibility

Regarding data visibility, it is determined by the TableSink and runtime-mode:

Stream mode:

If the external storage system supports transactions or two-phase commit, then data visibility is bound to the checkpoint cycle. Otherwise, data is visible immediately after writing, which is consistent with the current Flink behavior.

Batch mode:

  • FileSystem Sink: Data should be written to a temporary directory first and becomes visible after the job finally succeeds (final visibility).
  • Two-phase commit Sink: Data becomes visible after the job finally succeeds (final visibility).
  • Transaction-supporting Sink: Commit the transaction after the job finally succeeds (final visibility), or commit transactions periodically or after a fixed number of records (incremental visibility).
  • Other Sinks: Data is visible immediately after writing (write-visible).

Managed Table

Since FLIP-188, Flink has introduced managed tables: table options that do not contain the 'connector' key represent a managed table, and CTAS follows this principle. If the specific Catalog supports managed tables and the table created through that Catalog does not fill in the 'connector' option, the table is recognized as a managed table; likewise, if the target table of a CTAS statement does not fill in any options, it is treated as a managed table. If you do not want to create a managed table, you must fill in the 'connector' option and the required options of the specific connector. For details, please refer to the Table Store docs: https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/development/create-table.

...


Compatibility, Deprecation, and Migration Plan

...