...

It will be more user-friendly. In addition, the Hive dialect already has some support for CTAS.

...


Public API Changes

Syntax

I suggest introducing a CTAS clause with the following syntax:

Code Block (sql)
CREATE TABLE [ IF NOT EXISTS ] table_name 
[ WITH ( table_properties ) ]
[ AS query_expression ]


Example:

Code Block (sql)
CREATE TABLE ctas_hudi
 WITH ('connector.type' = 'hudi')
 AS SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;


The resulting table is equivalent to:

Code Block (sql)
CREATE TABLE ctas_hudi
 (
 	id BIGINT,
 	name STRING,
 	age INT
 )
 WITH ('connector.type' = 'hudi');

INSERT INTO ctas_hudi SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;

Table Environment

Provides a method that Table API users can use to execute CTAS.

@PublicEvolving
public interface TableEnvironment {

    /**
     * Registers the given {@link Table}'s result as a catalog table with {@link TableDescriptor}'s options.
     *
     * <p>CTAS for Table API.
     *
     * <p>Examples:
     *
     * <pre>{@code
     * Map<String, String> options = new HashMap<String, String>();
     * options.put("connector.type", "hudi");
     * tEnv.createTable("MyTable", options, tEnv.sqlQuery("select id, name from user_table"));
     * }</pre>
     *
     * @param path The path under which the table will be registered. See also the {@link
     *     TableEnvironment} class description for the format of the path.
     * @param options Table options.
     * @param query The {@link Table} object describing the pipeline for further transformations.
     */
    void createTable(String path, Map<String, String> options, Table query);
}
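
For illustration, the Table API equivalent of the SQL example above could look like the following sketch. It assumes the proposed createTable(String, Map, Table) overload from this FLIP; the catalog, table, and column names are taken from the earlier example.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class CtasTableApiExample {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Table options for the table to be created, same as the WITH clause in SQL.
        Map<String, String> options = new HashMap<>();
        options.put("connector.type", "hudi");

        // The query whose schema and result define the new table.
        Table query =
                tEnv.sqlQuery(
                        "SELECT id, name, age FROM hive_catalog.`default`.test WHERE mod(id, 10) = 0");

        // Proposed API: register the query result as the catalog table 'ctas_hudi'.
        tEnv.createTable("ctas_hudi", options, query);
    }
}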

Catalog

Provides a method that is used to create a stage table.

@PublicEvolving
public interface Catalog {

    /**
     * Creates a new stage table for CTAS. This table will be recorded in the Catalog's memory
     * and will be created in the backend storage when the job status changes to CREATED; it
     * will be dropped when the job status changes to FAILED or CANCELED.
     *
     * <p>The framework will make sure to call this method with a fully validated {@link
     * ResolvedCatalogTable}. Those instances are easy to serialize for a durable catalog
     * implementation.
     *
     * @param tablePath path of the table to be created
     * @param table the table definition
     * @param ignoreIfExists flag to specify behavior when a table already exists at the
     *     given path: if set to false, it throws a TableAlreadyExistException, if set to true,
     *     do nothing.
     * @throws TableAlreadyExistException if table already exists and ignoreIfExists is false
     * @throws DatabaseNotExistException if the database in tablePath doesn't exist
     * @throws CatalogException in case of any runtime exception
     */
    void createStageTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;
}


Proposed Changes

First of all, to make it clear: the table created by the CTAS command must go through the Catalog.

Implementation Plan

We will introduce a new concept: the stage table.

Stage table: created through the Catalog#createStageTable API and stored in the memory of the Catalog, so it is visible during the SQL compilation stage.

The Catalog keeps a collection of stage tables. A stage table is created in the catalog backend storage when the job status is CREATED, and dropped from the catalog backend storage when the job status is FAILED or CANCELED.
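
As a minimal sketch of this bookkeeping (a hypothetical helper class, not part of the proposed public API), a catalog implementation could track its stage tables roughly like this:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;

// Hypothetical in-memory bookkeeping for stage tables inside a Catalog implementation.
public class StageTableRegistry {

    private final Map<ObjectPath, CatalogBaseTable> stageTables = new HashMap<>();

    /** Called from Catalog#createStageTable: record the table in memory only. */
    public void register(ObjectPath tablePath, CatalogBaseTable table) {
        stageTables.put(tablePath, table);
    }

    /** Used during SQL compilation so the CTAS sink table can be resolved. */
    public CatalogBaseTable lookup(ObjectPath tablePath) {
        return stageTables.get(tablePath);
    }

    /** Removed once the table has been created in, or dropped from, the backend storage. */
    public void remove(ObjectPath tablePath) {
        stageTables.remove(tablePath);
    }
}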


Based on the research and analysis above, the overall implementation process is as follows:

Execution Flow

[Figure: CTAS overall execution flow]

The overall execution process is shown in the figure above.


Detailed execution process

Steps:

  1. Create the sink table (stage table) through the Catalog's new createStageTable API.
  2. Construct a CTASJobStatusHook with the Catalog as a constructor parameter; CTASJobStatusHook is an implementation of the JobStatusHook interface.
  3. Register the CTASJobStatusHook with the StreamGraph; it is then passed to the JobGraph and serialized (this requires implementing serialization/deserialization of the Catalog and the JobStatusHook).
  4. When the job starts and its status is CREATED, the runtime module calls the JobStatusHook#onCreated method, and we call Catalog#createTable in CTASJobStatusHook#onCreated.
  5. When the final status of the job is FAILED, the runtime module calls the JobStatusHook#onFailed method, and we call Catalog#dropTable in CTASJobStatusHook#onFailed.
  6. When the final status of the job is CANCELED, the runtime module calls the JobStatusHook#onCanceled method, and we call Catalog#dropTable in CTASJobStatusHook#onCanceled.
  7. When the final status of the job is FINISHED, the runtime module calls the JobStatusHook#onFinished method, and no additional operation is needed.

Hook design

Definition of JobStatusHook

/** Hooks provided by users on job status changing. */
public interface JobStatusHook {

    /** When the job becomes CREATED. It will only be called once. */
    default void onCreated(JobID jobId) {}

    /** When the job finishes successfully. */
    default void onFinished(JobID jobId) {}

    /** When the job finally fails. */
    default void onFailed(JobID jobId, Throwable throwable) {}

    /** When the job is canceled by the user. */
    default void onCanceled(JobID jobId) {}
}
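
To illustrate steps 2 and 4-6 of the execution flow, CTASJobStatusHook could look roughly like the sketch below; the constructor shape and field names are assumptions, not a finalized design:

import org.apache.flink.api.common.JobID;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;

// Sketch of a JobStatusHook implementation for CTAS: it carries the Catalog and the
// table definition, creates the table when the job is CREATED, and drops it again
// if the job finally FAILED or was CANCELED.
public class CTASJobStatusHook implements JobStatusHook {

    private final Catalog catalog;
    private final ObjectPath tablePath;
    private final ResolvedCatalogTable table;

    public CTASJobStatusHook(Catalog catalog, ObjectPath tablePath, ResolvedCatalogTable table) {
        this.catalog = catalog;
        this.tablePath = tablePath;
        this.table = table;
    }

    @Override
    public void onCreated(JobID jobId) {
        try {
            // Step 4: materialize the stage table in the catalog backend storage.
            catalog.createTable(tablePath, table, true);
        } catch (Exception e) {
            throw new RuntimeException("Failed to create table for CTAS job " + jobId, e);
        }
    }

    @Override
    public void onFailed(JobID jobId, Throwable throwable) {
        // Step 5: roll the metadata back if the job finally failed.
        dropTable();
    }

    @Override
    public void onCanceled(JobID jobId) {
        // Step 6: roll the metadata back if the job was canceled.
        dropTable();
    }

    // onFinished: the default no-op is enough, no additional operation is needed.

    private void dropTable() {
        try {
            catalog.dropTable(tablePath, true);
        } catch (Exception e) {
            // Best-effort cleanup; a real implementation should at least log this.
        }
    }
}

The planner would then register this hook through StreamGraph#setJobStatusHook (see below) before the JobGraph is generated.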

Register JobStatusHook

public class StreamGraph implements Pipeline {

    ...

    /** Registers the JobStatusHook. */
    void setJobStatusHook(JobStatusHook hook) {
        ...
    }
}


Supported Job Mode

Both streaming and batch mode are supported.

The execution flows of streaming and batch are similar; the main differences are in atomicity and data visibility.

Streaming

Since streaming jobs are long-running, the data is usually consumed downstream in real time. The exact behavior is determined by the specific sink implementation:

  • Data is visible after a checkpoint succeeds, or immediately after writing.
  • Under streaming semantics the data should be as continuous as possible, so strict atomicity is not guaranteed. Therefore, when the job fails, in most cases the sink does not need to drop the table.

Batch

A batch job ends, and checkpointing is disabled, so we want the data to become visible only after the job succeeds and the table to be dropped if the job fails.

Some external storage systems cannot support this, such as Redis.

We will refer to the Spark DataSource V1 implementation:

  • Provide atomicity: if the job fails, drop the table. (This requires runtime module support: when the job finally fails, the sink is notified to clean up.)
  • Data visibility depends on the specific external storage system and can be divided into write-visible, final visibility, and incremental visibility. (Described in the Data Visibility section.)

Dropping the table when the job fails requires some additional support (for both streaming and batch):

  • TableSink needs to provide a cleanup API that developers implement as needed; it does nothing by default. If an exception occurs, this API can be used to drop the table, delete the temporary directory, etc. (a possible shape is sketched after this list).
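
A possible shape for this hook, assuming a mixin interface with a no-op default implementation; the name and its exact placement on the sink interfaces are not decided by this FLIP:

// Hypothetical cleanup capability a TableSink could expose. The framework would call
// cleanUp() when the CTAS job finally fails or is canceled.
public interface CleanUpTableSink {

    /** Cleans up partially written data, temporary directories, metadata, etc. No-op by default. */
    default void cleanUp() {}
}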

Precautions

When the table needs to be dropped:

  1. The user manually cancels the job.
  2. The job reaches a final FAILED status, for example after exceeding the maximum number of task failovers.

Dropping the table is strongly bound to the TableSink:

The framework does not perform the drop-table operation itself; it is implemented in the TableSink according to the needs of the specific sink, because different sinks require different operations.

For example, HiveTableSink needs to delete the temporary directory and drop the metadata in the Metastore, whereas FileSystemTableSink only needs to delete the temporary directory; it is also possible that no operation is required at all. A filesystem-style cleanup is sketched below.
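
For illustration only, a filesystem-style sink could implement the hypothetical CleanUpTableSink interface from the sketch above by deleting its temporary directory:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Illustrative cleanup for a filesystem-style sink: remove the temporary directory and
// everything below it. A Hive-style sink would additionally drop the metadata in the Metastore.
public class FileSystemSinkCleanUp implements CleanUpTableSink {

    private final Path tempDirectory;

    public FileSystemSinkCleanUp(Path tempDirectory) {
        this.tempDirectory = tempDirectory;
    }

    @Override
    public void cleanUp() {
        try (Stream<Path> files = Files.walk(tempDirectory)) {
            // Delete children before their parent directories.
            files.sorted(Comparator.reverseOrder()).forEach(path -> path.toFile().delete());
        } catch (IOException e) {
            // Best-effort cleanup; a real implementation should at least log this.
        }
    }
}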

Atomicity & Data Visibility

Atomicity

CTAS does not provide strict atomicity: we create the table first, and the eventual atomicity is determined by the cleanUp implementation of the TableSink.

This requires runtime module support, as described in the Execution Flow section.

Data Visibility

Data visibility is determined by the TableSink and the runtime mode:

Stream mode:

If the external storage system supports transactions or two-phase commit, data visibility is tied to the checkpoint cycle; otherwise, data is visible immediately after writing. This is consistent with current Flink behavior.

Batch mode:
  • FileSystem sink: data is written to a temporary directory first and becomes visible after the job finally succeeds (final visibility).
  • Two-phase-commit sink: data becomes visible after the job finally succeeds (final visibility).
  • Transactional sink: either commit the transaction after the job finally succeeds (final visibility), or commit transactions periodically or every fixed number of records (incremental visibility).
  • Other sinks: data is visible immediately after writing (write-visible).

Catalog

We can think of Flink as having two types of catalogs: in-memory catalogs and external catalogs.

In-memory catalog:

  1. The metadata is a copy of the external system's metadata, and the user must ensure that the entity exists in the external system and that the metadata is consistent; otherwise, an exception is thrown at runtime. CTAS needs to create the table first, so it is hard to guarantee that the entity exists in the external system and that the metadata is consistent.
  2. The user needs to configure the parameters of the external system through the WITH clause; Flink cannot obtain them from the in-memory catalog.

For example, for a Kafka table, we need the user to tell us the address of the Kafka servers, the name of the topic, and the data serialization format; otherwise, the Flink job will fail.

External catalog:

  1. The metadata refers directly to the external system, so there is no consistency problem. Creating a table also directly calls the external system, so the existence of the entity in the external system is naturally guaranteed.
  2. The WITH clause parameters are optional; Flink can obtain the necessary parameters through the external catalog.

For example, for a Hive table, we can obtain the table information required by the Flink engine through the HiveCatalog.


Both in-memory and external catalogs will support CTAS. If the CTAS command is executed against an in-memory catalog and the target storage does not exist in the external system, the Flink job will fail, which is consistent with current Flink behavior.

With an in-memory catalog, the options of the table depend entirely on user input.

Managed Table

For managed tables, please refer to FLIP-188. Table options that do not contain the ‘connector’ key and value represent a managed table. CTAS also follows this principle.

For details, please refer to the Table Store docs: https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/development/create-table/

CTAS supports both managed and non-managed tables.

Users need to be clear about their business needs and set the table options correctly.


Compatibility, Deprecation, and Migration Plan

...

  • Streaming mode requires the table to be created first (metadata sharing), so that downstream jobs can consume it in real time.
  • In most cases, streaming jobs do not need to be cleaned up even if the job fails. Flink has a rich connector ecosystem, and the capabilities provided by external storage systems differ, yet Flink needs to behave consistently (for example, Redis cannot be cleaned up unless all written keys are recorded).
  • Batch jobs try to ensure eventual atomicity (if the job succeeds, the data is visible; otherwise, the metadata is dropped and the temporary data is deleted).