Status: ...

Discussion thread: ...
Vote thread: ...
JIRA: FLINK-26942
Release: 1.17
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Flink SQL already provides rich syntax and features in both stream and batch mode, but there is still room to improve usability. For example, if the user wants to insert data into a new table, two steps are required:

  • First, prepare the DDL statement of the table named t1;
  • Second, insert the data into t1;

These two steps seem simple, but if there are many fields, spelling out the DDL statement is tedious, the type mapping of the target table is prone to errors, and the columns must be written out again in the following insert statement. Therefore, we can support CTAS (CREATE TABLE AS SELECT) like MySQL, Oracle, Microsoft SQL Server, Hive, Spark, etc., which will be more user friendly. In addition, the Hive dialect already has some support for CTAS.


My suggestion would be to support a variation of the optional feature T172, "AS subquery clause in table definition", of the SQL standard.

Based on the research summary and analysis in the appendix, the current status of CREATE TABLE AS SELECT (CTAS) in the big data field is:

  • Flink: the Hive dialect already supports CTAS but does not guarantee atomicity (cannot roll back) ==> LEVEL-1
  • Spark DataSource v1: atomic (can roll back), but not isolated ==> LEVEL-2
  • Spark DataSource v2: guarantees atomicity and isolation ==> LEVEL-3
  • Hive MR: guarantees atomicity and isolation ==> LEVEL-3

Hive SQL and Spark SQL are mainly used in offline (batch mode) scenarios; Flink SQL is suitable for both real-time (streaming mode) and offline (batch mode) scenarios. In a real-time scenario, we assume the job is always running and does not stop, with data written and visible in real time, so there is no need to provide atomicity.

To keep Flink SQL semantically consistent between streaming and batch mode, and combining the current situation of Flink with the needs of our business, we choose LEVEL-1 as the default behavior for both streaming and batch mode. If the user requires LEVEL-2 atomicity, it can be achieved by enabling the table.ctas.atomicity-enabled option. In general, batch mode usually requires LEVEL-2 atomicity. In a nutshell, Flink provides two levels of atomicity guarantee, with LEVEL-1 as the default behavior.

Public API Changes

Syntax

We propose the CREATE TABLE AS SELECT (CTAS) clause as follows:


Code Block
language: sql
title: syntax
CREATE TABLE [ IF NOT EXISTS ] table_name 
[ WITH ( table_properties ) ]
[ AS query_expression ]

The optional AS query_expression is our newly proposed change; it is the SELECT query expression that defines the schema and data of the table.


Assuming the following CTAS query:

Code Block
language: sql
title: syntax
CREATE TABLE ctas_hudi
 WITH ('connector' = 'hudi')
 AS SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;


Resulting table equivalent to:

Code Block
language: sql
title: syntax
CREATE TABLE ctas_hudi
 (
 	id BIGINT,
 	name STRING,
 	age INT
 )
 WITH ('connector' = 'hudi');

INSERT INTO ctas_hudi SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;


Note: Regarding the WITH options of the sink table in the query, the user must specify the 'connector' option and the required options of the specific connector, just as in the CREATE TABLE DDL. If these options are not filled in, the table to be created is recognized as a managed table; please see the Managed Table section for details.
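
For illustration, here is a minimal Java sketch of running the proposed statement through the Table API; the datagen/print connectors and table names are placeholders chosen for this example, not part of the proposal:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CtasSyntaxExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Source table, using the existing built-in 'datagen' connector.
        tEnv.executeSql(
                "CREATE TABLE source_t (id BIGINT, name STRING, age INT) "
                        + "WITH ('connector' = 'datagen', 'number-of-rows' = '10')");

        // The proposed CTAS statement: the schema of ctas_t is derived from the query.
        tEnv.executeSql(
                "CREATE TABLE ctas_t WITH ('connector' = 'print') "
                        + "AS SELECT id, name FROM source_t WHERE mod(id, 10) = 0");
    }
}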

Public Interface

Table

Provides a method used to execute CTAS (CREATE TABLE AS SELECT) for Table API users.

@PublicEvolving
public interface Table extends Explainable<Table>, Executable {

    

    /**
     * Declare the pipeline defined by the given {@link Table} object
     * to create the table at the specified path.
     */
    CreateOrReplaceTable saveAs(String tablePath);
}


CreateOrReplaceTable

Proposing a public interface CreateOrReplaceTable used by CTAS (CREATE TABLE AS SELECT) for Table API users.

/**
 * The table is used for the {@code Create Table As Select} syntax in Table API level. This interface
 * is created by the {@code Table#saveAs} method, and provides methods such as {@code option} and
 * {@code create} that help to fill the table options and produce a {@link TablePipeline}. Currently we
 * only support the {@code Create Table As Select} syntax, but the {@code Replace Table AS SELECT} and
 * {@code Create Or Replace Table AS SELECT} syntax may be supported in the future, so {@code replace}
 * and {@code createOrReplace} methods may be introduced.
 */
@PublicEvolving
public interface CreateOrReplaceTable {

    /**
     * Adding table options to CreateOrReplaceTable.
     *
     * <p>Example:
     *
     * <pre>{@code
     * tab.option("connector", "filesystem");
     * }</pre>
     */
    CreateOrReplaceTable option(String key, String value);

    /**
     * Create the table in the specified path and write the pipeline data to this table.
     *
     * <p>Example:
     *
     * <pre>{@code
     * table.saveAs("my_ctas_table")
     *     .option("connector", "filesystem")
     *     .option("format", "testcsv")
     *     .option("path", "/tmp/my_ctas_table/")
     *     .create(true);
     * }</pre>
     */
    TablePipeline create(boolean ignoreIfExists);
}


The CreateOrReplaceTable interface is newly introduced because, if we added the create API to the Table interface, the user would have to call the saveAs API before calling these APIs, which would cause additional usage costs to the user. This API currently only supports the Create Table As Select syntax, but in the future we may support the Replace Table As Select and Create Or Replace Table As Select syntax, which are also supported by some other batch compute engines.

The recommended way to use CreateOrReplaceTable is as follows:

TablePipeline tablePipeline = table.saveAs("my_ctas_table")
        .option("connector", "filesystem")
        .option("format", "testcsv")
        .option("path", "/tmp/my_ctas_table/")
        .create(true);
tablePipeline.execute();

We save the properties set through the option API and set them in the CatalogBaseTable when executing the create API, so as to generate the DynamicTableSink.
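
Conceptually, the collected options could be merged into the table metadata roughly as in the following sketch. This is an illustrative assumption, not the actual planner code; schemaFromQuery stands for the schema derived from the SELECT query, and CatalogTable.of is an existing Flink factory method:

Map<String, String> options = new HashMap<>();
options.put("connector", "filesystem");
options.put("format", "testcsv");
options.put("path", "/tmp/my_ctas_table/");

// Build the sink table metadata from the derived schema plus the user's options.
CatalogTable sinkTable =
        CatalogTable.of(schemaFromQuery, null, Collections.emptyList(), options);
// The planner then creates the table from 'sinkTable' and builds the DynamicTableSink from it.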

TableConfigOptions

Add table.ctas.atomicity-enabled option to allow users to enable atomicity when using create table as select syntax.

@PublicEvolving
public class TableConfigOptions {

    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
    public static final ConfigOption<Boolean> TABLE_CTAS_ATOMICITY_ENABLED =
            key("table.ctas.atomicity-enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Specifies if the create table as select operation is executed atomically. "
                                    + "By default, the operation is non-atomic. The target table is created on the client side, and it will not be dropped even if the job fails or is cancelled. "
                                    + "If this option is set to true, the target table is created on the JM side, and it will also be dropped if the job fails or is cancelled.");
}
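
A hedged usage sketch of this option; the configuration key comes from this proposal, while the surrounding calls are the existing configuration API:

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

// Opt in to LEVEL-2 atomicity for subsequent CTAS statements.
tEnv.getConfig().getConfiguration().setString("table.ctas.atomicity-enabled", "true");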

Catalog

We will update Catalog's javadocs to add the following description:

If a Catalog needs to support the atomicity feature of CTAS, the Catalog must implement Serializable so that Catalog instances can be serialized/deserialized using Java serialization.
When atomicity support for CTAS is enabled, the Planner will check whether the Catalog instance can be serialized using Java serialization.

Implementation Plan

We provide two semantics for Flink CTAS: non-atomic and atomic. The non-atomic implementation is the default behavior in both streaming and batch modes.

Non-atomic (default)

The overall execution process is shown in the following figure.

[Figure: execution flow of non-atomic CTAS]

The non-atomic implementation is basically the same as the existing insert-data process, except that the sink table is first created on the client side via the Catalog before performing the insert.

Compile the SQL, derive the schema of the sink table from the query, create the table, and finally submit the job that writes data into the sink table. This needs no further introduction.

Atomic

The atomicity implementation of Flink CTAS requires two parts:

  • Enabling the atomicity option.
  • A serializable Catalog (Catalog providers need to implement Java's Serializable interface so instances can be serialized/deserialized); atomicity is then ensured by creating/dropping the table on the JM side.

The following describes the process when the user enables the atomicity option. The overall execution process is shown in the following figure.

[Figure: execution flow of atomic CTAS]

Because the client process may exit soon (for example, in detached mode) while job execution may take a long time, table creation (before the job starts) and table dropping (when the job fails or is cancelled) should be executed on the JM side instead of the client. In addition, creating and dropping the table is done through the Catalog, which is consistent with existing behavior. Therefore, a new hook mechanism should be introduced, which the JM uses to execute the corresponding actions; the hook depends on the Catalog to complete its function. In summary, the overall execution process of a CTAS job is as follows:

  1. The Flink client compiles the SQL and generates an execution plan. In this process, the hook that needs to be executed on the JM side is generated; the Catalog and CatalogBaseTable are member variables of the hook and must be serializable.
  2. Submit the job to the cluster through the client.
  3. When the job starts, construct the hook object and deserialize the Catalog and CatalogBaseTable in the hook. Then call the Catalog#createTable method through the hook to create the CatalogBaseTable.
  4. Tasks start to execute.
  5. If the final status of the job is FAILED or CANCELED, the created CatalogBaseTable is dropped by the hook calling the Catalog#dropTable method.

The following describes the details of the implementation.

Planner

Provide a method for the planner to register a JobStatusHook with the StreamGraph.

public class StreamGraph implements Pipeline {

    private final List<JobStatusHook> jobStatusHooks = new ArrayList<>();

    ... ...

    /** Registers the JobStatusHook. */
    void registerJobStatusHook(JobStatusHook hook) {
        ...
    }
}

The final tasks of the job are all generated by the Planner. We want to complete the create/drop table actions through the hook on the JM side, so we need an API to register the hook there.

Introducing the process of CTAS in the Planner:

step1:

Compile the SQL to generate the CatalogBaseTable (the sink table to be created) and the CreateTableASOperation.

step2:

Use the CatalogBaseTable and Catalog objects to construct the JobStatusHook. Because the JobStatusHook is ultimately executed on the JM side, and the CatalogBaseTable needs to be created/dropped through the Catalog inside the hook, the Catalog and CatalogBaseTable are member variables of the hook; they also need to be serializable so they can be passed to the JM.

step3:

The Planner registers the JobStatusHook with the StreamGraph; the JobStatusHook is then serialized and passed to the JM through the serialization of the JobGraph.

For the CatalogBaseTable, we use CatalogPropertiesUtil to serialize/deserialize it; this is a tool that Flink already provides.
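
The round trip relies on these existing utility methods, roughly as sketched below; resolvedTable stands for the ResolvedCatalogTable derived from the CTAS query:

// Client side: flatten the sink table metadata into plain string properties.
Map<String, String> props = CatalogPropertiesUtil.serializeCatalogTable(resolvedTable);

// ... the properties travel to the JM inside the serialized JobStatusHook ...

// JM side: rebuild the table metadata before calling Catalog#createTable.
CatalogTable restored = CatalogPropertiesUtil.deserializeCatalogTable(props);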

For the Catalog, Catalog providers need to implement Java's Serializable interface so that instances can be serialized/deserialized directly. The planner will attempt to pre-serialize the Catalog; if serialization fails, an exception is thrown indicating to the user that the Catalog cannot be serialized and therefore does not support atomicity semantics (a sketch of such a check follows the list below).

The purpose of doing so is:

  1. In this way, Catalogs registered via DDL and via the Table API can both be serialized.
  2. It reduces the cost for user-defined catalogs, which do not need much extra consideration for serialization (if the CREATE TABLE AS SELECT (CTAS) syntax with atomicity is to be supported, the catalog must be serializable).
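
A minimal sketch of such a pre-serialization check, assuming InstantiationUtil (an existing Flink utility); the class name, method name, and error message are illustrative, not the actual planner code:

import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.util.InstantiationUtil;

import java.io.IOException;

public final class CatalogSerializationCheck {

    /** Fails fast if the given catalog cannot be shipped to the JM via Java serialization. */
    public static void checkSerializable(Catalog catalog) {
        try {
            InstantiationUtil.serializeObject(catalog);
        } catch (IOException e) {
            throw new ValidationException(
                    "Catalog " + catalog.getClass().getName()
                            + " cannot be serialized, so it does not support atomic CTAS semantics.",
                    e);
        }
    }
}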

Key Points for Catalog serializability to support atomic semantic:

Built-in Catalog:

  • InMemoryCatalog: Because CatalogDatabase, CatalogBaseTable, etc. cannot be serialized by the Java serialization mechanism directly, the InMemoryCatalog does not support serialization, which means it cannot support atomic semantics.
  • JdbcCatalog: The member variables required to construct the Catalog object are directly serializable, such as username, password, base url, etc. The JdbcDialectTypeMapper interface needs to extend Serializable; then this Catalog can implement the Serializable interface.
  • HiveCatalog: All member variables can be serialized directly, except for the HiveConf object. We can refer to JobConfWrapper to solve the serialization problem of HiveConf (a sketch of such a wrapper follows this list), so this Catalog can also implement the Serializable interface.
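
A minimal sketch of the JobConfWrapper-style workaround for HiveConf, assuming the Writable methods (write/readFields) that HiveConf inherits from Hadoop's Configuration; the wrapper name is illustrative:

import org.apache.hadoop.hive.conf.HiveConf;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class HiveConfWrapper implements Serializable {

    private transient HiveConf hiveConf; // not serializable itself

    public HiveConfWrapper(HiveConf hiveConf) {
        this.hiveConf = hiveConf;
    }

    public HiveConf conf() {
        return hiveConf;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        hiveConf.write(out); // Configuration is Writable: dump all key/value pairs
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        hiveConf = new HiveConf();
        hiveConf.readFields(in); // restore the key/value pairs on the JM side
    }
}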

User-defined Catalog:

  • User-defined catalogs that need to support the CREATE TABLE AS SELECT (CTAS) syntax with atomic semantics should implement the Serializable interface.

Runtime

Provide a job-status-change hook mechanism on the JM side.

/**
 * Hooks provided by users on job status changing.
 */
@Internal
public interface JobStatusHook extends Serializable {

    /** When the job transitions to CREATED status. It will only be called once. */
    void onCreated(JobID jobId);

    /** When the job finishes successfully. */
    void onFinished(JobID jobId);

    /** When the job finally fails. */
    void onFailed(JobID jobId, Throwable throwable);

    /** When the job gets canceled by users. */
    void onCanceled(JobID jobId);
}

Flink's current hook design cannot meet the needs of CTAS. For example, the JobListener is executed on the client side, and the JobStatusListener runs on the JM side but cannot be serialized and its functionality does not meet CTAS needs. Thus we propose a new interface, JobStatusHook, which can be attached to the JobGraph and executed in the JobMaster. The interface will also be marked as Internal.

The process of CTAS at runtime

  1. When the job starts, the JobGraph will be deserialized, and then the JobStatusHook can also be deserialized.
  2. When deserializing the JobStatusHook, the Catalog and CatalogBaseTable will also be deserialized.
    • Deserialize the CatalogBaseTable using the CatalogPropertiesUtil#deserializeCatalogTable method.
    • After deserializing the Catalog, call Catalog#open in the JobStatusHook#onCreated method to ensure that the Catalog is working.
  3. When the job starts and the job status changes, the JobStatusHook methods will be called by the JM:

For example, suppose our JobStatusHook implementation is called CTASJobStatusHook and uses the JdbcCatalog; how the catalog is serialized by the Planner has been covered in the previous section and will not be repeated.

Then, when the job status changes, the CTASJobStatusHook methods are called:

  • When the job status is CREATED, the runtime module will call the CTASJobStatusHook#onCreated method, which will call the JdbcCatalog#createTable method to create the table.
  • When the final status of the job is FAILED, the runtime module will call the CTASJobStatusHook#onFailed method, which will call the JdbcCatalog#dropTable method to drop the table.
  • When the final status of the job is CANCELED, the runtime module will call the CTASJobStatusHook#onCanceled method, which will call the JdbcCatalog#dropTable method to drop the table.
  • When the final status of the job is FINISHED, the runtime module will call the CTASJobStatusHook#onFinished method, and we do not need to do any additional operations. A condensed sketch of such a hook follows this list.
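
Putting the pieces together, here is a condensed sketch of what such a hook could look like; the class layout and helper names are illustrative, not the final implementation:

import org.apache.flink.api.common.JobID;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;

public class CTASJobStatusHook implements JobStatusHook {

    private final Catalog catalog;        // must be Java-serializable, see above
    private final ObjectPath tablePath;   // database and table name of the sink table
    private final CatalogBaseTable table; // shipped as properties via CatalogPropertiesUtil
    private final boolean ignoreIfExists;

    public CTASJobStatusHook(
            Catalog catalog, ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) {
        this.catalog = catalog;
        this.tablePath = tablePath;
        this.table = table;
        this.ignoreIfExists = ignoreIfExists;
    }

    @Override
    public void onCreated(JobID jobId) {
        try {
            catalog.open(); // make sure the deserialized catalog is usable on the JM
            catalog.createTable(tablePath, table, ignoreIfExists);
        } catch (Exception e) {
            throw new RuntimeException("Failed to create CTAS sink table " + tablePath, e);
        }
    }

    @Override
    public void onFinished(JobID jobId) {
        // nothing to clean up on success
    }

    @Override
    public void onFailed(JobID jobId, Throwable throwable) {
        dropTableQuietly();
    }

    @Override
    public void onCanceled(JobID jobId) {
        dropTableQuietly();
    }

    private void dropTableQuietly() {
        try {
            catalog.dropTable(tablePath, true); // ignoreIfNotExists = true
        } catch (Exception e) {
            // best effort: the job has already reached a terminal state
        }
    }
}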

Managed Table

Since FLIP-188, Flink has introduced managed tables. If the specific Catalog supports managed tables and the table created through that Catalog does not fill in the 'connector' option, the table is recognized as a managed table. The Create Table As Select syntax also supports creating managed tables, so if the target table in a CTAS statement does not fill in any options, it will be treated as a managed table. If you don't want to create a managed table, you must fill in the 'connector' option and the required options of the specific connector.

For managed table details, please refer to the Table Store docs: https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/development/create-table.

Data Visibility

Regarding data visibility, it is determined by the TableSink and runtime-mode:

Stream mode:

If the external storage system supports transactions or two-phase commit, then data visibility is bound to the checkpoint cycle. Otherwise, data is visible immediately after writing, which is consistent with current Flink behavior.

Batch mode:

  • FileSystem sink: data is written to a temporary directory first and becomes visible after the job finally succeeds (final visibility).
  • Two-phase commit sink: data becomes visible after the job finally succeeds (final visibility).
  • Transaction-supporting sink: commit transactions after the job finally succeeds (final visibility), or commit transactions periodically or after a fixed number of records (incremental visibility).
  • Other sinks: data is visible immediately after writing (write-visible).

Compatibility, Deprecation, and Migration Plan

It is a new feature with no implication for backwards compatibility.

Test Plan

Changes will be verified by unit tests.

Rejected Alternatives

Catalog serialize

Option 1: Add serialize/deserialize API to catalog

If we added serialize and deserialize APIs, each Catalog would have to implement serialization and deserialization itself. We would save the class name of the Catalog together with the serialized content, like this:

Catalog ClassName
Catalog serialized data

Since the Catalog class may not have a parameterless constructor, we can't use Class#newInstance to instantiate an object; we can use the objenesis framework to solve this (see the sketch below). After using objenesis to get a Catalog object (an empty Catalog instance), we would get the real Catalog instance through the Catalog#deserialize API. This solves the serialization/deserialization problem of the Catalog.
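
A sketch of this rejected flow; objenesis is a real library, while Catalog#deserialize is the proposed-and-rejected API from this option, and catalogClass/catalogInputStream are illustrative variables:

import org.objenesis.Objenesis;
import org.objenesis.ObjenesisStd;

// Instantiate an "empty" catalog without calling any constructor ...
Objenesis objenesis = new ObjenesisStd();
Catalog emptyCatalog = objenesis.newInstance(catalogClass);

// ... then restore its real state through the (rejected) Catalog#deserialize API.
Catalog realCatalog = emptyCatalog.deserialize(catalogInputStream);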

For example, JdbcCatalog#serialize can save catalogName, defaultDatabase, username, pwd, baseUrl, and JdbcCatalog#deserialize can re-initialize a JdbcCatalog object through these parameters; HiveCatalog#serialize can save catalogName, defaultDatabase, hiveConf, hiveVersion, and HiveCatalog#deserialize can re-initialize a HiveCatalog object through these parameters; InMemoryCatalog#serialize only needs to save the catalogName and defaultDatabase, and InMemoryCatalog#deserialize can re-initialize an InMemoryCatalog object through these two parameters.

The tables in the InMemoryCatalog already exist in the external system. The metadata information held in the InMemoryCatalog is only used by the job itself, and is held only in memory. Therefore, the metadata information in the InMemoryCatalog does not need to be serialized and passed to the JM; on the JM side, we only need to initialize a new InMemoryCatalog.

The serialization tool for this solution is more complex to implement and makes user-defined Catalogs more expensive to implement, so it was abandoned.

Option 2: Serialize the options in the Create Catalog DDL

We need to serialize the catalog name and the options used in the catalog DDL; the JM side can then use these options to re-initialize the catalog via Flink's ServiceLoader mechanism (using FactoryUtil#createCatalog to get the catalog; a sketch of the JM-side re-initialization follows the example below). For the InMemoryCatalog there are some special cases. Since the tables in the InMemoryCatalog already exist in the external system, the metadata information in the InMemoryCatalog is only used by the job itself and is only stored in memory. However, the database-related information in the InMemoryCatalog needs to be serialized and passed to the JM, otherwise the database may not exist when the JM creates the table. Other objects do not need to be serialized.

Here is an example of the catalog serialization process for a catalog created via DDL.

CREATE CATALOG my_catalog WITH(
'type' = 'jdbc',
'default-database' = '...',
'username' = '...',
'password' = '...',
'base-url' = '...'
);

1) The Planner registers the catalog to the CatalogManager; it also registers the properties from the WITH clause to the CatalogManager.

2) When serializing the catalog, we only need to serialize and save the catalog name (my_catalog) and the properties, like this:

my_catalog

{'type'='jdbc', 'default-database'='...', 'username'='...', 'password'='...', 'base-url'='...'}
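
On the JM side, the catalog could then be rebuilt from the saved name and options via the existing factory discovery, roughly as follows; the variable names are illustrative, and the option values stay elided exactly as in the example above:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.FactoryUtil;

import java.util.HashMap;
import java.util.Map;

Map<String, String> options = new HashMap<>();
options.put("type", "jdbc");
options.put("default-database", "...");
options.put("username", "...");
options.put("password", "...");
options.put("base-url", "...");

// Re-create the catalog through Flink's factory discovery (ServiceLoader) mechanism.
Catalog catalog = FactoryUtil.createCatalog(
        "my_catalog",
        options,
        new Configuration(), // ReadableConfig for the factory
        Thread.currentThread().getContextClassLoader());
catalog.open();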

The advantages of this solution are a simple design, ease of compatibility, and reduced implementation complexity for users; it does not require complex serialization and deserialization tools.

The disadvantage of this solution is that it does not cover the TableEnvironment#registerCatalog usage scenario, and database-related information is missing in the InMemoryCatalog scenario.

Note: This solution only works if we create the Catalog using DDL, because we can only get the Catalog properties from the WITH clause. If we use a Catalog registered by the TableEnvironment#registerCatalog method, we cannot get these properties. Therefore, CTAS currently does not support jobs that use TableEnvironment#registerCatalog.


The following issues require attention:

  1. HiveCatalog:
    1. If hive-conf-dir is specified, since hive-conf-dir is a local path, please make sure that all nodes in the cluster put the Hive configuration files under the same path; otherwise the JM will not find the files and the job will fail. This problem also exists in Flink's current application mode.
    2. If hive-conf-dir is not specified, the HiveCatalog will look for hive-site.xml on the Java classpath. We then have to solve the hive-site.xml upload problem and make sure that both the Flink client and the JM classpath can find it; otherwise the job will fail.


References

  1. Support SELECT clause in CREATE TABLE(CTAS)
  2. MySQL CTAS syntax
  3. Microsoft Azure Synapse CTAS
  4. LanguageManual DDL#Create/Drop/ReloadFunction
  5. Spark Create Table Syntax

Appendix

Program research

I have investigated other big data engine implementations such as Hive and Spark:

...

The non-atomic implementation is consistent with the DataSource v1 logic. For details, see CreateTableAsSelectExec.

Atomic

The atomic implementation (for details, see AtomicCreateTableAsSelectExec) is supported by StagingTableCatalog and StagedTable.

...