Status
Current state: "Under Discussion"
Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)
JIRA:
Released: 1.16
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The current syntax and features of Flink SQL already work well in both stream and batch mode,
but there are still some usability gaps to close.
For example, if the user wants to insert data into a new table, two steps are required:
First, prepare the DDL statement for the table named t1;
Second, insert the data into t1.
These two steps look simple, but if the table has many fields, writing the DDL statement by hand becomes tedious and error-prone,
and the same columns have to be written out again in the following INSERT statement.
Therefore, we can support CTAS (CREATE TABLE AS SELECT) like MySQL, Oracle, Microsoft SQL Server, Hive, Spark, etc.
It will be more user friendly. In addition, the Hive dialect already has some support for CTAS.
Proposed Changes
First of all, to make it clear: a table created by the CTAS command must go through the catalog.
Syntax
I suggest introducing a CTAS clause with the following syntax:
CREATE TABLE [ IF NOT EXISTS ] table_name [ WITH ( table_properties ) ] [ AS query_expression ]
Example:
CREATE TABLE ctas_hudi
WITH ('connector.type' = 'hudi')
AS SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;
The resulting table is equivalent to:
CREATE TABLE ctas_hudi (
    id BIGINT,
    name STRING,
    age INT
) WITH ('connector.type' = 'hudi');

INSERT INTO ctas_hudi
SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;
Program research
I investigated the implementations of other big data engines such as Hive and Spark:
Hive (MR): atomic
Hive on MR runs in client mode: the client is responsible for parsing, compiling, optimizing, and executing the query, and finally cleaning up.
Hive executes the CTAS command as follows:
- Execute the query first, writing the query result to a temporary directory.
- If all MR tasks are executed successfully, create the table and load the data.
- If the execution fails, the table will not be created.
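The Hive flow above can be sketched as follows. All names here are illustrative, not Hive's real code; the point is that the query result lands in a temporary location first, and the table is only created and loaded after the whole query has succeeded.

```python
# Illustrative sketch of Hive's atomic CTAS flow (hypothetical names,
# not Hive's real code).

catalog = {}  # table name -> loaded data


def hive_style_ctas(table, run_query):
    """Run CTAS the Hive way: on failure the table is never created."""
    try:
        temp_result = run_query()  # step 1: write the result to a temp directory
    except Exception:
        return                     # step 3: failure -> nothing to roll back
    catalog[table] = temp_result   # step 2: create the table and load the data


def failing_query():
    raise RuntimeError("an MR task failed")


hive_style_ctas("t_ok", lambda: ["row1", "row2"])
hive_style_ctas("t_fail", failing_query)  # t_fail never appears in the catalog
```

Because the table is created last, a failed query leaves no trace behind, which is what makes this variant atomic without any clean-up step.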
Spark (DataSource v1): non-atomic
Spark has a role called the driver; the driver is responsible for compiling tasks, applying for resources, scheduling task execution, tracking task progress, etc.
Spark executes CTAS as follows:
- Create a sink table based on the schema of the query result.
- Execute the Spark tasks and write the result to a temporary directory.
- If all Spark tasks are executed successfully, use the Hive API to load the data into the sink table created in the first step.
- If the execution fails, the driver will drop the sink table created in the first step.
Spark (DataSource v2, not yet completed; the Hive catalog is not supported yet): optionally atomic
Non-atomic
The non-atomic implementation is consistent with the DataSource v1 logic. For details, see CreateTableAsSelectExec.
Atomic
The atomic implementation (for details, see AtomicCreateTableAsSelectExec) is supported by StagingTableCatalog and StagedTable.
StagedTable supports commit and abort.
The StagingTableCatalog keeps the staged table in memory; when executing CTAS the steps are as follows:
- Create a StagedTable based on the schema of the query result; it is not yet visible in the catalog.
- Execute the Spark tasks and write the result into the StagedTable.
- If all Spark tasks are executed successfully, call StagedTable#commitStagedChanges(); the table then becomes visible in the catalog.
- If the execution fails, call StagedTable#abortStagedChanges().
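The staged-table contract above can be simulated in a few lines. The method names mirror the FLIP's description of Spark's API, but this is a sketch, not Spark code: the key property is that the staged table stays invisible in the catalog until commit.

```python
# Minimal simulation of the StagedTable / StagingTableCatalog contract
# (hypothetical sketch, not Spark's implementation).

class StagedTable:
    def __init__(self, catalog, name):
        self._catalog = catalog
        self._name = name
        self.rows = []  # the Spark job writes here

    def commit_staged_changes(self):
        # All tasks succeeded: atomically publish the table in the catalog.
        self._catalog.tables[self._name] = self.rows

    def abort_staged_changes(self):
        # A task failed: discard the staged data; the catalog never saw it.
        self.rows = []


class StagingTableCatalog:
    def __init__(self):
        self.tables = {}  # only committed tables are visible here

    def stage_create(self, name):
        return StagedTable(self, name)


catalog = StagingTableCatalog()
staged = catalog.stage_create("ctas_target")
staged.rows.extend(["row1", "row2"])        # the job writes into the staged table
assert "ctas_target" not in catalog.tables  # invisible before commit
staged.commit_staged_changes()              # now visible in the catalog
```

The atomicity comes entirely from the single commit step: observers of the catalog see either no table or the complete table, never a half-written one.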
Implementation Plan
Execution Flow
Steps:
- Create the sink table in the catalog based on the schema of the query result.
- Start the job and write the result to the target table.
- If the job executes successfully, make the data visible.
- If the job execution fails, drop the sink table or delete the data. (This capability requires runtime module support, such as a hook, with SQL passing the relevant parameters to the runtime module.)
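The four steps above can be sketched end to end as follows. The helper names are hypothetical, not Flink APIs; the point is the ordering: the sink table is created in the catalog before the job runs, and the failure path drops it again via a hook.

```python
# End-to-end sketch of the proposed CTAS execution flow (hypothetical
# helper names, not Flink APIs).

catalog = {}  # table name -> schema of the sink table


def execute_ctas(table, schema, run_job):
    catalog[table] = schema        # step 1: create the sink table first
    try:
        run_job()                  # step 2: start the job, write to the target
        return True                # step 3: success -> the data stays visible
    except Exception:
        catalog.pop(table, None)   # step 4: failure hook drops the sink table
        return False


def failing_job():
    raise RuntimeError("job failed")


execute_ctas("t1", {"id": "BIGINT"}, lambda: None)  # t1 survives
execute_ctas("t2", {"id": "BIGINT"}, failing_job)   # t2 is dropped again
```

Unlike the Hive flow, the table must exist before the job starts (the sink needs it), which is why a clean-up hook on failure is required at all.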
Supported Job Mode
Both streaming and batch mode are supported.
The execution flow of streaming and batch is similar; the main differences are in atomicity and data visibility.
Streaming
Since streaming jobs are long-running, the data is usually consumed downstream in real time; the exact behavior is determined by the specific sink implementation.
- Data is visible after a checkpoint succeeds, or immediately after writing.
- In stream semantics the data should be as continuous as possible, and strict atomicity is not guaranteed. Therefore, when the job fails, there is a high probability that the sink does not need to drop the table.
Batch
A batch job ends with checkpointing disabled, so we want the data to become visible after the job succeeds, and the table to be dropped if the job fails.
Some external storage systems, such as Redis, cannot support this.
We will refer to the Spark DataSource v1 implementation:
- Provide atomicity: if the job fails, drop the table. (Requires runtime module support: when the job finally fails, notify the sink to clean up.)
- Data visibility depends on the specific external storage and can be divided into write-visible, final visibility, and incremental visibility. (Described in the Data Visibility section.)
Dropping the table when the job fails requires some additional support (for both streaming and batch):
- TableSink needs to provide a clean-up API, which developers implement as needed; it does nothing by default. If an exception occurs, this API can be used to drop the table, delete the temporary directory, etc.
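The proposed hook could look like the sketch below. The method name `clean_up` is hypothetical (the FLIP only says TableSink should expose such an API): the default implementation does nothing, and each sink overrides it with the rollback it actually needs.

```python
# Sketch of the proposed TableSink clean-up hook (hypothetical names,
# not Flink's actual interface).

class TableSink:
    def clean_up(self):
        pass  # default: no clean-up required


class HiveLikeSink(TableSink):
    def __init__(self):
        self.temp_dir_deleted = False
        self.metadata_dropped = False

    def clean_up(self):
        # Hive-style sink: delete the temp directory AND drop the metastore entry.
        self.temp_dir_deleted = True
        self.metadata_dropped = True


class FileSystemLikeSink(TableSink):
    def __init__(self):
        self.temp_dir_deleted = False

    def clean_up(self):
        # Filesystem sink: deleting the temporary directory is enough.
        self.temp_dir_deleted = True


hive_sink, fs_sink = HiveLikeSink(), FileSystemLikeSink()
for sink in (TableSink(), hive_sink, fs_sink):
    sink.clean_up()  # called by the runtime when the job finally fails
```

Making the default a no-op keeps the framework out of the drop decision entirely, matching the "drop table and TableSink are strongly bound" principle below.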
Precautions
The table needs to be dropped when:
- The user manually cancels the job.
- The job reaches a final FAILED status, e.g. after exceeding the maximum number of task failovers.
Dropping the table and the TableSink are strongly bound:
The framework does not perform the drop-table operation itself; it is implemented in the TableSink according to the needs of the specific sink, because the required operations differ between sinks.
For example, HiveTableSink needs to delete the temporary directory and drop the metadata in the metastore, while FileSystemTableSink only needs to delete the temporary directory;
it is also possible that no operation is required at all.
Atomicity & Data Visibility
Atomicity
CTAS does not provide strict atomicity: we create the table first, and the final atomicity is determined by the cleanUp implementation of the TableSink.
This requires runtime module support, like the description in the Execution Flow.
Data Visibility
Data visibility is determined by the TableSink and the runtime mode:
Stream mode:
If the external storage system supports transactions or two-phase commit, data visibility is tied to the checkpoint cycle;
otherwise, data is visible immediately after writing, which is consistent with the current Flink behavior.
Batch mode:
- FileSystem sink: data should be written to a temporary directory first and become visible after the job succeeds (final visibility).
- Two-phase-commit sink: data becomes visible after the job succeeds (final visibility).
- Transactional sink: commit transactions after the job succeeds (final visibility), or commit transactions periodically or after a fixed number of records (incremental visibility).
- Other sinks: data is visible immediately after writing (write-visible).
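The batch visibility rules above can be written down as a small lookup; the sink-kind keys are illustrative, not real Flink option values.

```python
# The three visibility modes for batch CTAS, as a (hypothetical) lookup table.

FINAL = "final visibility"              # visible only after the job succeeds
INCREMENTAL = "incremental visibility"  # committed periodically during the job
WRITE_VISIBLE = "write-visible"         # visible immediately after each write

BATCH_VISIBILITY = {
    "filesystem": FINAL,           # written to a temp dir, published at the end
    "two-phase-commit": FINAL,     # committed once the job succeeds
    "transactional": INCREMENTAL,  # may also use FINAL if committing only at the end
    "other": WRITE_VISIBLE,        # e.g. plain key-value stores such as Redis
}
```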
Catalog
We can consider that there are two types of catalogs in Flink: in-memory catalogs and external catalogs.
In-memory catalog:
- Metadata is a copy of the metadata of the external system. The user must ensure that the entity exists in the external system and that the metadata is consistent; otherwise, an exception is thrown at runtime. CTAS needs to create the table first, so it is hard to guarantee that the entity exists in the external system and that the metadata is consistent.
- The user needs to configure the parameters of the external system through the WITH clause, since Flink cannot obtain them through the in-memory catalog.
For example, for a Kafka table we need the user to tell us the address of the Kafka server, the name of the topic, and the data serialization format; otherwise the Flink job will fail.
External catalog:
- Metadata directly refers to the external system, so there is no consistency problem. Creating a table also directly calls the external system, so it is naturally guaranteed that the entity exists there.
- The WITH clause parameters are optional; Flink can obtain the necessary parameters through the external catalog.
For example, for a Hive table we can obtain the table information required by the Flink engine through the HiveCatalog.
Both in-memory and external catalogs will support CTAS. If the CTAS command is executed against an in-memory catalog and the target store does not exist in the external system, the Flink job will fail, which is consistent with the current Flink behavior.
With an in-memory catalog, the options of the table depend completely on user input.
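The difference between the two catalog types can be sketched as option resolution. The helper and the option keys below are hypothetical, not Flink code: an external catalog can supply connector options itself, while an in-memory catalog only knows what the user wrote in the WITH clause.

```python
# Sketch of table-option resolution for the two catalog types
# (hypothetical helper, not Flink code).

def resolve_table_options(catalog_kind, with_options, catalog_supplied_options):
    if catalog_kind == "external":
        # e.g. HiveCatalog: the catalog supplies the necessary parameters;
        # the WITH clause is optional and only overrides them.
        options = dict(catalog_supplied_options)
        options.update(with_options)
        return options
    # In-memory catalog: the table options depend entirely on user input.
    return dict(with_options)


# In-memory: the user must spell out everything in WITH.
kafka = resolve_table_options(
    "in-memory",
    {"connector": "kafka", "topic": "t", "properties.bootstrap.servers": "host:9092"},
    {},
)

# External: the catalog fills in the parameters (illustrative values).
hive = resolve_table_options("external", {}, {"connector": "hive", "hive-version": "3.1.2"})
```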
Managed Table
For managed tables, please refer to FLIP-188. Table options that do not contain a ‘connector’ key represent a managed table. CTAS also follows this principle.
For details, please refer to the Table Store docs: https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/development/create-table/
CTAS supports Managed Table and Non-Managed Table.
Users need to be clear about their business needs and set the table options correctly.
Public API Changes
Table Environment
Provide methods for Table API users to execute CTAS.
@PublicEvolving /** |
Compatibility, Deprecation, and Migration Plan
It is a new feature with no implication for backwards compatibility.
Test Plan
Changes will be verified by unit tests.
Rejected Alternatives
N/A