...
CreatingTable
Add CreatingTable to use CTAS for the Table API.

@PublicEvolving
/** A table that needs to be created for CTAS. */
public interface CreatingTable {

    /** Add a table option. */
    CreatingTable option(String key, String value);

    /** Create the table and write the pipeline result to it. */
    TablePipeline create();

    /** Create the table if it does not exist, and write the pipeline result to it. */
    TablePipeline createIfNotExist();
}

The CreatingTable interface is newly introduced because, if we added the create/createIfNotExist APIs to the Table interface instead, the user would have to call the saveAs API before calling them, which would cause additional usage cost for the user.
The recommended way to use it is:

TablePipeline tablePipeline = table.saveAs("my_ctas_table")
    .option("path", "/tmp/my_ctas_table/")
    .create();

We save the properties set through the option API and set them on the CatalogBaseTable when the create/createIfNotExist API is executed, so that the TableSink can be generated from them.

Catalog
Provides methods that are used to serialize/deserialize the Catalog and to infer the options of a CatalogBaseTable.

public interface Catalog {

    /**
     * When the CTAS function is used, the Catalog combined with {@link Catalog#supportsManagedTable}
     * infers whether specific options need to be added to the CatalogBaseTable.
     */
    default CatalogBaseTable inferTableOptions(ObjectPath tablePath, CatalogBaseTable table) {
        throw new UnsupportedOperationException();
    }

    ...
}
Catalog#inferTableOptions makes it convenient for users to customize the Catalog: when the catalog supports the CTAS function, the options of the table can be inferred automatically, avoiding job failures caused by missing information.
The serialize/deserialize API realizes the serialization and deserialization of the Catalog; the user can decide what information needs to be saved during serialization so that it can be used during deserialization.
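As an illustration of what option inference buys, a hypothetical catalog might fill in defaults for options the user omitted, so a CTAS job does not fail for lack of information. The catalog logic and default values below are invented for the example and are not Flink's API:

```java
import java.util.HashMap;
import java.util.Map;

public class InferOptionsSketch {

    /**
     * Pretend catalog that knows sensible defaults for its tables.
     * Options supplied by the user always win; missing ones are inferred.
     */
    static Map<String, String> inferTableOptions(Map<String, String> userOptions) {
        Map<String, String> options = new HashMap<>(userOptions);
        options.putIfAbsent("connector", "kafka");                        // assumed default
        options.putIfAbsent("properties.bootstrap.servers", "broker:9092"); // assumed default
        return options;
    }

    public static void main(String[] args) {
        Map<String, String> inferred = inferTableOptions(Map.of("topic", "orders"));
        System.out.println(inferred.get("connector")); // kafka
    }
}
```

A real catalog would derive such defaults from its own configuration or from the external system's metadata rather than hard-coding them.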
Proposed Changes
The overall execution process is shown in the figure above.
Because the client process may exit soon (for example, in detached mode), we choose to create/drop the table on the JM side; create table and drop table are executed through the Catalog.
Therefore, a new hook mechanism and a Catalog serialization/deserialization solution need to be introduced.
The overall execution process is as follows:
- The Flink client compiles the SQL and generates an execution plan. In this process, the hook that needs to be executed on the JM side is generated, and the hook, Catalog, and CatalogBaseTable are serialized.
- The job is submitted to the cluster; if it is in detached mode, the client can exit.
- When the job starts, the hook, Catalog, and CatalogBaseTable are deserialized, and the Catalog#createTable method is called through the hook to create the CatalogBaseTable.
- Task execution starts.
- If the final status of the job is failed or canceled, the created CatalogBaseTable needs to be dropped by the hook calling the Catalog#dropTable method.
For details, see the Implementation Plan.
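The hook-driven lifecycle above can be simulated with a minimal, self-contained sketch. All class names here are simplified stand-ins invented for illustration; the real design uses Flink's Catalog, CatalogBaseTable, and JobStatusHook types:

```java
import java.util.HashMap;
import java.util.Map;

public class CtasFlowSketch {

    /** Stand-in for a catalog: just a named table registry. */
    static class SimpleCatalog {
        final Map<String, Map<String, String>> tables = new HashMap<>();
        void createTable(String path, Map<String, String> options) { tables.put(path, options); }
        void dropTable(String path) { tables.remove(path); }
    }

    /** Stand-in for the JM-side job status hook that owns create/drop. */
    static class CtasHook {
        final SimpleCatalog catalog;
        final String tablePath;
        final Map<String, String> options;
        CtasHook(SimpleCatalog catalog, String tablePath, Map<String, String> options) {
            this.catalog = catalog; this.tablePath = tablePath; this.options = options;
        }
        void onCreated()  { catalog.createTable(tablePath, options); } // job started: create table
        void onFinished() { /* nothing to do, the table stays */ }
        void onFailed()   { catalog.dropTable(tablePath); }            // roll back
        void onCanceled() { catalog.dropTable(tablePath); }            // roll back
    }

    /** Simulates one job run; returns whether the table exists afterwards. */
    static boolean runJob(boolean jobSucceeds) {
        SimpleCatalog catalog = new SimpleCatalog();
        CtasHook hook = new CtasHook(catalog, "my_ctas_table", Map.of("path", "/tmp/t"));
        hook.onCreated();                   // create table when the job starts
        if (jobSucceeds) hook.onFinished(); // final status FINISHED: keep the table
        else hook.onFailed();               // final status FAILED: drop the table
        return catalog.tables.containsKey("my_ctas_table");
    }

    public static void main(String[] args) {
        System.out.println("successful run keeps table: " + runJob(true));
        System.out.println("failed run drops table: " + !runJob(false));
    }
}
```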
Implementation Plan
Through the appendix research summary and analysis, the current status of CTAS in the field of big data is:
- Flink: Flink dialect does not support CTAS. ==> LEVEL-1
- Spark DataSource v1: is atomic (can roll back), but is not isolated. ==> LEVEL-2
- Spark DataSource v2: Guaranteed atomicity and isolation. ==> LEVEL-3
- Hive MR: Guaranteed atomicity and isolation. ==> LEVEL-3
Combining the current situation of Flink and the needs of our business, choosing a Level-2 implementation for Flink.
Planner
Provides a method for the planner to register a JobStatusHook with the StreamGraph.

public class StreamGraph implements Pipeline {

    /** Registers the JobStatusHook. */
    void addJobStatusHook(JobStatusHook hook) {
        ...
    }
}
The client side needs to implement the serialization of the Catalog and CatalogBaseTable.
The Catalog and CatalogBaseTable related objects do not implement the Serializable interface, and most of them do not have parameterless constructors, so the serialization implementation is complex.
Runtime
Provides the JM-side hook mechanism for job status changes.

@PublicEvolving
public interface JobStatusHook {

    /** When the job is created. */
    void onCreated(JobID jobId);

    /** When the job finishes successfully. */
    void onFinished(JobID jobId);

    /** When the job fails. */
    void onFailed(JobID jobId, Throwable throwable);

    /** When the job is canceled. */
    void onCanceled(JobID jobId);
}

On the JM side, the JobStatusHook is deserialized and its methods are called on the corresponding job status changes.
We can integrate create table in the onCreated method and integrate drop table in the onFailed and onCanceled methods.
If the job is configured with a RestartStrategy, be aware that these methods will be called multiple times.
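The restart caveat above suggests making the hook's actions idempotent, so that a repeated onCreated does not attempt a second create. The guard below is an illustrative sketch, not the FLIP's actual implementation:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class IdempotentHookSketch {
    private final AtomicBoolean created = new AtomicBoolean(false);
    private int createCalls = 0;

    /** Creates the table only on the first invocation; later calls are no-ops. */
    public void onCreated() {
        if (created.compareAndSet(false, true)) {
            createCalls++; // stand-in for Catalog#createTable
        }
    }

    public int createCalls() { return createCalls; }

    public static void main(String[] args) {
        IdempotentHookSketch hook = new IdempotentHookSketch();
        hook.onCreated();
        hook.onCreated(); // second call after a restart: no second create
        System.out.println(hook.createCalls()); // prints 1
    }
}
```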
Execution Flow
Detailed execution process
...
When the final status of the job is FINISHED, the runtime module will call the JobStatusHook#onFinished method, and we do not need to do any additional operations.
Supported Job Mode
Both streaming and batch mode are supported.
The execution flow of streaming and batch is similar; the main differences are in atomicity and data visibility.
Streaming
Since streaming jobs are long-running, the data is usually consumed downstream in real time; the exact behavior is determined by the specific sink implementation.
- Data is visible after a checkpoint succeeds, or visible immediately after writing.
- In stream semantics, the data should be as continuous as possible, and strict atomicity is not guaranteed. Therefore, when the job fails, there is a high probability that the sink does not need to drop the table.
Batch
A batch job runs with checkpointing disabled and will end, so we want the data to become visible after the job succeeds, and the table to be dropped if the job fails.
Some external storage systems cannot support this, such as Redis.
We will refer to the Spark DataSource v1 implementation:
- Provide atomicity: if the job fails, drop the table. (This requires runtime module support: when the job finally fails, notify the sink to clean up.)
- Data visibility depends on the specific external storage and can be divided into write-visible, final visibility, and incremental visibility. (Described in the Data Visibility section.)
Dropping the table when the job fails requires some additional support (for both streaming and batch):
- The TableSink needs to provide a CleanUp API, which developers implement as needed and which does nothing by default. If an exception occurs, this API can be used to drop the table, delete the temporary directory, etc.
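A minimal sketch of what such a cleanup callback could look like. The interface and method names below are assumptions for illustration; the text above only specifies a do-nothing-by-default cleanup hook on the sink:

```java
public class CleanUpSketch {

    /** Hypothetical sink-side hook; does nothing unless the sink overrides it. */
    interface CtasTableSink {
        /** Called when the job finally fails or is canceled. No-op by default. */
        default void cleanUp() { }
    }

    /** A file-system-like sink that removes its temporary directory on cleanup. */
    static class TempDirSink implements CtasTableSink {
        boolean tempDirDeleted = false;
        @Override
        public void cleanUp() {
            tempDirDeleted = true; // stand-in for deleting /tmp/... staging directory
        }
    }

    public static void main(String[] args) {
        TempDirSink sink = new TempDirSink();
        sink.cleanUp(); // framework would call this on final failure/cancel
        System.out.println(sink.tempDirDeleted); // true
    }
}
```

The default no-op keeps existing sinks unaffected; only sinks that stage data or metadata need to override it.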
Precautions
When the table needs to be dropped:
- The user manually cancels the job.
- The job ends in the FAILED status, for example after exceeding the maximum number of task failovers.
Drop table and the TableSink are strongly bound:
We do not perform the drop table operation in the framework; it is implemented in the TableSink according to the needs of the specific sink, because the required operations differ between sinks.
For example, HiveTableSink needs to delete the temporary directory and drop the metadata in the Metastore, while FileSystemTableSink only needs to delete the temporary directory; it is also possible that no operation is required at all.
Atomicity & Data Visibility
Atomicity
CTAS does not provide strict atomicity: we create the table first, and the final atomicity is determined by the cleanUp implementation of the TableSink.
This requires runtime module support, as described in the Execution Flow section.
Data Visibility
Data visibility is determined by the TableSink and the runtime mode:
Stream mode:
If the external storage system supports transactions or two-phase commit, data visibility is related to the checkpoint cycle;
otherwise, data is visible immediately after writing, which is consistent with the current Flink behavior.
Batch mode:
- FileSystem sink: data should be written to a temporary directory first and become visible after the job finally succeeds (final visibility).
- Two-phase commit sink: data becomes visible after the job finally succeeds (final visibility).
- Transaction-supporting sink: either commit transactions after the job finally succeeds (final visibility), or commit transactions periodically or after a fixed number of records (incremental visibility).
- Other sinks: data is visible immediately after writing (write-visible).
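The batch-mode visibility categories above can be restated as a small lookup. The sink kind strings are illustrative labels, not Flink identifiers, and the mapping simply mirrors the list in the text (taking the periodic-commit policy for the transactional case):

```java
public class VisibilitySketch {

    /** The three visibility categories named in the text. */
    enum Visibility { WRITE_VISIBLE, FINAL_VISIBILITY, INCREMENTAL_VISIBILITY }

    /** Batch-mode visibility per sink kind, as described above. */
    static Visibility batchVisibility(String sinkKind) {
        switch (sinkKind) {
            case "filesystem":
            case "two-phase-commit":
                return Visibility.FINAL_VISIBILITY;
            case "transactional-periodic-commit":
                return Visibility.INCREMENTAL_VISIBILITY;
            default: // any other sink: rows land as they are written
                return Visibility.WRITE_VISIBLE;
        }
    }

    public static void main(String[] args) {
        System.out.println(batchVisibility("filesystem")); // FINAL_VISIBILITY
        System.out.println(batchVisibility("redis"));      // WRITE_VISIBLE
    }
}
```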
Catalog
We can consider that there are two types of catalogs in Flink, in-memory catalogs and external catalogs:
In-memory catalog:
- The metadata is a copy of the metadata of the external system, and the user must ensure that the entity exists in the external system and that the metadata is consistent with it; otherwise an exception is thrown at runtime. CTAS needs to create the table first, so it is hard to ensure that the entity exists in the external system and that the metadata is consistent.
- The user needs to configure the parameters of the external system through the WITH syntax, and Flink cannot obtain them through the in-memory catalog.
For example, for a Kafka table, the user has to tell us the address of the Kafka server, the name of the topic, and the data serialization format; otherwise the Flink job will fail.
External catalog:
- The metadata directly refers to the external system, so there is no consistency problem. Create table also directly calls the external system, so it is naturally guaranteed that the entity exists in the external system.
- The WITH syntax parameters are optional; Flink can obtain the necessary parameters through the external catalog.
For example, for a Hive table, we can obtain the table information required by the Flink engine through the HiveCatalog.
Both in-memory catalogs and external catalogs will support CTAS. If the CTAS command is executed against an in-memory catalog and the target store does not exist in the external system, the Flink job will fail, which is consistent with the current Flink behavior.
With an in-memory catalog, the options of the table depend completely on user input.
Managed Table
For Managed Table, please refer to FLIP-188. Table options that do not contain the 'connector' key and value represent a managed table. CTAS also follows this principle.
For details, please refer to the Table Store docs: https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/development/create-table/
CTAS supports both Managed Table and Non-Managed Table.
Users need to be clear about their business needs and set the table options correctly.
Compatibility, Deprecation, and Migration Plan
It is a new feature with no implication for backwards compatibility.
Test Plan
The changes will be verified by unit tests.
Rejected Alternatives
Catalog serialize
A simple way to serialize the catalog is to save the options of the catalog in the CatalogManager, so that the JM side only needs to use these options to re-initialize the catalog.
The advantage of this solution is that the implementation is simple and convenient, and it does not require complex serialization and deserialization tools.
The disadvantage is that it cannot cover the whole scenario:
it is only applicable when the catalog is created via DDL, where we can get the catalog options through the WITH keyword.
If the catalog was registered via TableEnvironment#registerCatalog, we cannot get the options, and a large number of users register the Catalog via TableEnvironment#registerCatalog in production environments.
References
- Support SELECT clause in CREATE TABLE(CTAS)
- MySQL CTAS syntax
- Microsoft Azure Synapse CTAS
- LanguageManual DDL#Create/Drop/ReloadFunction
- Spark Create Table Syntax
...