...

@PublicEvolving
public interface Table extends Explainable<Table>, Executable {

    /**
     * Declare the pipeline defined by the given {@link Table} object
     * to create the table at the specified path.
     */
    CreatingTable saveAs(String tablePath);
}

CreatingTable

Add the CreatingTable interface so that CTAS can also be used via the Table API.

/** A table that needs to be created for CTAS. */
@PublicEvolving
public interface CreatingTable {

    /**
     * Add a table option,
     * such as option("connector", "filesystem").
     */
    CreatingTable option(String key, String value);

    /**
     * Create the table under the specified path
     * and write the pipeline data to this table.
     */
    TablePipeline create();

    /**
     * Create the table under the specified path if it does not exist
     * and write the pipeline data to this table.
     */
    TablePipeline createIfNotExist();
}

The CreatingTable interface is newly introduced because, if we added the create/createIfNotExist API to the Table interface,
the user would have to call the saveAs API before calling these APIs, which adds extra usage cost for the user.

The recommended way to use it is:

TablePipeline tablePipeline = table.saveAs("my_ctas_table")
    .option("connector", "filesystem")
    .option("format", "testcsv")
    .option("path", "/tmp/my_ctas_table/")
    .create();
tablePipeline.execute();

We save the properties set through the option API and set them in the CatalogBaseTable when executing the create/createIfNotExist API, so as to generate the TableSink.

Catalog

Provide methods that are used to serialize/deserialize the Catalog and infer the options of the CatalogBaseTable.

@PublicEvolving
public interface Catalog {

    /**
     * When the CTAS function is used, the Catalog is combined with {@link Catalog#supportsManagedTable}
     * to infer whether to add some options to the {@link CatalogBaseTable}.
     *
     * For example:
     * {@link JdbcCatalog} can add connector = 'jdbc', user, password, url and other options to {@link CatalogBaseTable},
     * {@link HiveCatalog} can add connector = 'hive' to {@link CatalogBaseTable} to help generate the TableSink.
     * The tables in {@link GenericInMemoryCatalog} already exist externally; options must be filled in
     * manually by the user and cannot be inferred automatically by the Catalog.
     */
    default void inferTableOptions(ObjectPath tablePath, CatalogBaseTable table) {}

    /**
     * Assist in the serialization of different catalogs.
     * The Catalog decides for itself which information needs to be serialized.
     */
    default void serialize(OutputStream output) throws IOException {}

    /**
     * Assist in the deserialization of different catalogs.
     * Determine whether to create a new Catalog instance based on
     * the deserialized data and the implementation of the catalog.
     */
    default Catalog deserialize(InputStream input) throws ClassNotFoundException, IOException {
        return this;
    }
}

The client side needs to implement serialization for the Catalog and CatalogBaseTable.

Catalog- and CatalogBaseTable-related objects do not implement the Serializable interface, and most of them do not have parameterless constructors, so implementing serialization is complex.

Catalog#inferTableOptions makes it convenient for users to customize the Catalog: when a Catalog supports the CTAS function, the table options can be inferred automatically so that the job does not fail due to missing information.

The serialize/deserialize API enables serialization and deserialization of the Catalog; the user decides what information needs to be saved during serialization so that it can be used during deserialization.
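To make this concrete, below is a minimal sketch of how a JDBC-style catalog could implement the proposed methods, assuming the inferTableOptions/serialize/deserialize defaults shown above. The MyJdbcCatalog class, its fields, the option keys, and the assumption that the table's option map is mutable at this point are all illustrative, not part of the proposal; the remaining Catalog methods are omitted.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;

// Illustrative sketch only.
public class MyJdbcCatalog extends AbstractCatalog {

    private final String baseUrl;
    private final String username;
    private final String password;

    public MyJdbcCatalog(String name, String defaultDatabase,
                         String baseUrl, String username, String password) {
        super(name, defaultDatabase);
        this.baseUrl = baseUrl;
        this.username = username;
        this.password = password;
    }

    @Override
    public void inferTableOptions(ObjectPath tablePath, CatalogBaseTable table) {
        // Fill in the options a CTAS target needs so that the TableSink can be generated.
        // Assumes the table's option map is mutable at this point of the CTAS flow.
        table.getOptions().put("connector", "jdbc");
        table.getOptions().put("url", baseUrl);
        table.getOptions().put("table-name", tablePath.getObjectName());
        table.getOptions().put("username", username);
        table.getOptions().put("password", password);
    }

    @Override
    public void serialize(OutputStream output) throws IOException {
        // The catalog decides what it needs in order to be rebuilt on the JM side:
        // here, only its identity and connection settings.
        DataOutputStream out = new DataOutputStream(output);
        out.writeUTF(getName());
        out.writeUTF(getDefaultDatabase());
        out.writeUTF(baseUrl);
        out.writeUTF(username);
        out.writeUTF(password);
        out.flush();
    }

    @Override
    public Catalog deserialize(InputStream input) throws IOException {
        DataInputStream in = new DataInputStream(input);
        return new MyJdbcCatalog(in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF());
    }

    // ... remaining Catalog methods omitted ...
}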

Proposed Changes

First of all, make it clear: the CTAS command must create the table through the Catalog.

[Figure: overall CTAS execution flow]

The overall execution process is shown in the figure above.

Because the client process may exit soon (for example, in detached mode), we choose to create/drop the table on the JM side, and both create table and drop table are executed through the Catalog.

Therefore, a new Hook mechanism and a Catalog serialization/deserialization solution need to be introduced.

The overall execution process is as follows:

  1. The Flink client compiles the SQL and generates an execution plan. In this process, the Hook that needs to be executed on the JM side is generated, and the Hook, Catalog, and CatalogBaseTable are serialized.
  2. Submit the job to the cluster; if it is in detached mode, the client can exit.
  3. When the job starts, deserialize the hooks, Catalog, and CatalogBaseTable; call the Catalog#createTable method through the hook to create the CatalogBaseTable.
  4. Start task execution.
  5. If the final status of the job is FAILED or CANCELED, the created CatalogBaseTable needs to be dropped by the hook calling the Catalog#dropTable method.

For details, see Implementation Plan.

Implementation Plan

Based on the research summary and analysis in the appendix, the current status of CTAS in the big data field is:

  • Flink: the Flink dialect does not support CTAS. ==> LEVEL-1
  • Spark DataSource v1: atomic (can roll back), but not isolated. ==> LEVEL-2
  • Spark DataSource v2: guarantees atomicity and isolation. ==> LEVEL-3
  • Hive MR: guarantees atomicity and isolation. ==> LEVEL-3

Combining the current situation of Flink and the needs of our business, we choose a LEVEL-2 implementation for Flink.

Planner

Provide a method for the planner to register the JobStatusHook with the StreamGraph.

public class StreamGraph implements Pipeline {

    ...

    /** Registers the JobStatusHook. */
    void addJobStatusHook(JobStatusHook hook) {
        ...
    }
}

Runtime

Provide a job status change hook mechanism on the JM side.

/**
 * Hooks provided by users on job status changing.
 */
public interface JobStatusHook {

    /** When the job becomes CREATED status. It will only be called one time. */
    default void onCreated(JobID jobId) {}

    /** When the job finishes successfully. */
    default void onFinished(JobID jobId) {}

    /** When the job finally fails. */
    default void onFailed(JobID jobId, Throwable throwable) {}

    /** When the job gets canceled by users. */
    default void onCanceled(JobID jobId) {}
}

On the JM side, deserialize the JobStatusHook,

...

We can integrate create table into the onCreated method, and integrate drop table into the onFailed and onCanceled methods.

If the job is configured with a RestartStrategy, be aware that these methods may be called multiple times.
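As a rough illustration of how the hook and the Catalog could interact, here is a minimal sketch of a CTAS hook, assuming the deserialized Catalog and CatalogBaseTable from the proposal are handed to it on the JM side; the class name and constructor are illustrative, not part of the proposal.

import org.apache.flink.api.common.JobID;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;

/** Illustrative sketch: creates the target table when the job is created and
 *  drops it again if the job finally fails or is canceled. */
public class CtasJobStatusHook implements JobStatusHook {

    private final Catalog catalog;         // deserialized on the JM side
    private final ObjectPath tablePath;
    private final CatalogBaseTable table;  // deserialized on the JM side

    public CtasJobStatusHook(Catalog catalog, ObjectPath tablePath, CatalogBaseTable table) {
        this.catalog = catalog;
        this.tablePath = tablePath;
        this.table = table;
    }

    @Override
    public void onCreated(JobID jobId) {
        try {
            // ignoreIfExists = false: CTAS expects to create a brand-new table.
            catalog.createTable(tablePath, table, false);
        } catch (Exception e) {
            throw new RuntimeException("Failed to create table for CTAS job " + jobId, e);
        }
    }

    @Override
    public void onFailed(JobID jobId, Throwable throwable) {
        dropTableQuietly();
    }

    @Override
    public void onCanceled(JobID jobId) {
        dropTableQuietly();
    }

    private void dropTableQuietly() {
        try {
            // ignoreIfNotExists = true: the table may never have been created.
            catalog.dropTable(tablePath, true);
        } catch (Exception e) {
            // Best effort; log in a real implementation.
        }
    }
}

The planner would then register such a hook on the StreamGraph via addJobStatusHook (see the Planner section above) before the job is submitted.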


Execution Flow


Detailed execution process

...

When the final status of the job is FINISHED, the runtime module will call the JobStatusHook#onFinished method, and we do not need to do any additional operations.

Supported Job Mode

Support both streaming and batch mode.

The execution flows of streaming and batch are similar; the main differences are in atomicity and data visibility.

Streaming

Since streaming jobs are long-running, data is usually consumed downstream in real time. The exact behavior is determined by the specific sink implementation:

  • Data is visible after a checkpoint succeeds, or is visible immediately after writing.
  • In streaming semantics the data should be as continuous as possible, and strict atomicity is not guaranteed. Therefore, when the job fails, in most cases the sink does not need to drop the table.

Batch

A batch job runs with checkpointing disabled and eventually ends, so we want the data to be visible only after the job succeeds, and to drop the table if the job fails.

Some external storage systems cannot support this, such as Redis.

We will refer to the Spark DataSource v1 implementation:

  • Provide atomic capability: if the job fails, drop the table. (This requires runtime module support: when the job finally fails, notify the sink to clean up.)
  • Data visibility depends on the specific external storage and can be divided into write-visible, final visibility, and incremental visibility. (Described in the Data Visibility section.)

Dropping the table when the job fails requires some additional support (for both streaming and batch):

  • The TableSink needs to provide a cleanUp API that developers implement as needed; it does nothing by default. If an exception occurs, this API can be used to drop the table, delete the temporary directory, etc. (see the sketch below)
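A rough sketch of what such a clean-up hook could look like, assuming the proposed "CleanUp API" takes roughly this shape; the interface and class names below are hypothetical and not part of Flink.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical clean-up hook for sinks; does nothing by default. */
interface CleanUpTableSink {
    /** Invoked when the CTAS job finally fails or is canceled. */
    default void cleanUp() {}
}

/** Sketch of a filesystem-style sink that removes its temporary directory on clean-up. */
class TempDirCleanUpSink implements CleanUpTableSink {

    private final Path tempDir;

    TempDirCleanUpSink(Path tempDir) {
        this.tempDir = tempDir;
    }

    @Override
    public void cleanUp() {
        // Best-effort removal of the staging directory written by the failed job.
        try (Stream<Path> files = Files.walk(tempDir)) {
            files.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.deleteIfExists(p);
                } catch (IOException ignored) {
                    // Clean-up is best effort; log in a real implementation.
                }
            });
        } catch (IOException ignored) {
            // The directory may already be gone.
        }
    }
}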

Precautions

When the table needs to be dropped:

  1. The user manually cancels the job.
  2. The job ends in the final FAILED status, for example after exceeding the maximum number of task failovers.

Drop table and TableSink are strongly bound:

Do not perform the drop table operation in the framework; drop table is implemented in the TableSink according to the needs of the specific TableSink, because the operations of different sinks differ.

For example, in HiveTableSink we need to delete the temporary directory and drop the metadata in the Metastore, but FileSystemTableSink only needs to delete the temporary directory;

it is also possible that no operation is required at all.

Atomicity & Data Visibility

Atomicity

CTAS does not provide strict atomicity: we create the table first, and the final atomicity is determined by the cleanUp implementation of the TableSink.

This requires runtime module support, as described in the Execution Flow.

Data Visibility

Data visibility is determined by the TableSink and the runtime mode:

Stream mode:

If the external storage system supports transactions or two-phase commit, data visibility is tied to the checkpoint cycle;

otherwise, data is visible immediately after writing, which is consistent with the current Flink behavior.

Batch mode:
  • FileSystem Sink: data is written to a temporary directory first and becomes visible after the job succeeds (final visibility).
  • Two-phase commit Sink: data becomes visible after the job succeeds (final visibility).
  • Transactional Sink: transactions are committed after the job succeeds (final visibility), or are committed periodically or after a fixed number of records (incremental visibility).
  • Other Sinks: data is visible immediately after writing (write-visible).

Catalog

We can think of Flink as having two types of catalogs: in-memory catalogs and external catalogs.

In-memory catalog:

  1. Metadata is a copy of the metadata of the external system, and the user must ensure that the entity exists in the external system and that the metadata is consistent; otherwise, an exception is thrown at runtime. CTAS needs to create the table first, so it is hard to guarantee that the entity exists in the external system and that the metadata is consistent.
  2. The user needs to configure the parameters of the external system through the WITH syntax, and Flink cannot obtain them from the in-memory catalog.

For example, for a Kafka table the user has to tell us the address of the Kafka server, the name of the topic, and the data serialization format; otherwise the Flink job will fail.
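As a sketch of what that means for the proposed Table API, a CTAS against the in-memory catalog has to spell out every option the Kafka connector needs; sourceTable stands for any existing Table, and the topic, server address, and format values are placeholders.

// With an in-memory catalog, none of these options can be inferred; the user must supply them all.
TablePipeline pipeline = sourceTable.saveAs("kafka_ctas_table")
        .option("connector", "kafka")
        .option("topic", "my_topic")
        .option("properties.bootstrap.servers", "kafka-broker:9092")
        .option("format", "json")
        .create();
pipeline.execute();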

External catalog:

  1. Metadata refers directly to the external system, so there is no consistency problem. Create table also calls the external system directly, so it is naturally guaranteed that the entity exists in the external system.
  2. The WITH syntax parameters are optional; Flink can obtain the necessary parameters through the external catalog.

For example, for a Hive table we can obtain the table information required by the Flink engine through the HiveCatalog.

Both the in-memory catalog and external catalogs will support CTAS. If the CTAS command is executed against the in-memory catalog and the target store does not exist in the external system, the Flink job will fail, which is consistent with the current Flink behavior.

With the in-memory catalog, the options of the table depend completely on user input.

Managed Table

For Managed Table, please refer to FLIP-188. Table options that do not contain the ‘connector’ key and value represent a managed table; CTAS also follows this principle.

For details, please refer to the Table Store docs: https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/development/create-table/

CTAS supports Managed Table and Non-Managed Table.

Users need to be clear about their business needs and set the table options correctly.
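For illustration, a CTAS written through the proposed Table API that sets no ‘connector’ option would create a managed table; the ‘bucket’ option below is just an example of a managed-table option and may not be required.

// No "connector" option is set, so the resulting table is a managed table (FLIP-188).
table.saveAs("my_managed_table")
        .option("bucket", "4")
        .create()
        .execute();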

Compatibility, Deprecation, and Migration Plan

It is a new feature with no implication for backwards compatibility.

Test Plan

Changes will be verified by unit tests.


Rejected Alternatives

Catalog serialize

A simple way to serialize the catalog is to save the options of the catalog in the CatalogManager, so that the JM side only needs to use these options to re-initialize the catalog.

The advantage of this solution is that the implementation is simple and convenient, and does not require complex serialization and deserialization tools.

The disadvantage of this solution is that it cannot cover all scenarios.

This solution is only applicable to catalogs created via DDL, where we can get the Catalog options through the WITH keyword.

If the Catalog was registered through TableEnvironment#registerCatalog, we cannot obtain its options.

There are a large number of users who register the Catalog through TableEnvironment#registerCatalog in production environments.

References

  1. Support SELECT clause in CREATE TABLE(CTAS)
  2. MySQL CTAS syntax
  3. Microsoft Azure Synapse CTAS
  4. LanguageManual DDL#Create/Drop/ReloadFunction
  5. Spark Create Table Syntax

...