...
These two steps seem normal, but if there are many fields, spelling out the DDL statement is tedious, as is writing all of those columns out again in the following INSERT statement. Therefore, we can support CTAS (CREATE TABLE AS SELECT) like MySQL, Oracle, Microsoft SQL Server, Hive, Spark, etc., which will be more user-friendly. In addition, the Hive dialect already has some support for CTAS. My suggestion would be to support a variation of the optional Feature T172, “AS subquery clause in table definition”, of the SQL standard.
Public API Changes
Table
Provides a method used to execute CTAS for Table API users.
...
/**
 * Declares the pipeline defined by the given {@link Table} object
 * to create the table at the specified path.
 */
CreateOrReplaceTable saveAs(String tablePath);
}
CreateOrReplaceTable
Proposes a public interface CreateOrReplaceTable, used by CTAS, for the Table API.
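The full interface definition is elided here. As a rough illustration of the fluent shape this section describes (the method names option, create, createIfNotExist, and the TablePipeline result are taken from the usage example later in this document; the in-memory implementation below is a toy sketch, not Flink's actual code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative stand-in for Flink's TablePipeline: here it only reports what would run. */
interface TablePipeline {
    String execute();
}

/** Sketch of the proposed CreateOrReplaceTable builder: collects options, then creates. */
interface CreateOrReplaceTable {
    CreateOrReplaceTable option(String key, String value);
    TablePipeline create();            // fails if the table already exists
    TablePipeline createIfNotExist();  // tolerates an existing table
}

/** Toy Table exposing only the proposed saveAs entry point. */
class ToyTable {
    public CreateOrReplaceTable saveAs(String tablePath) {
        Map<String, String> options = new LinkedHashMap<>();
        return new CreateOrReplaceTable() {
            public CreateOrReplaceTable option(String key, String value) {
                options.put(key, value);   // only recorded; applied at create time
                return this;
            }
            public TablePipeline create() {
                // In the proposal, the recorded options are copied into the CatalogBaseTable here.
                return () -> "CREATE TABLE " + tablePath + " WITH " + options
                        + "; INSERT INTO " + tablePath;
            }
            public TablePipeline createIfNotExist() {
                return () -> "CREATE TABLE IF NOT EXISTS " + tablePath + " WITH " + options;
            }
        };
    }
}
```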
...
A new public interface, CreateOrReplaceTable, is introduced because if we added the create/createIfNotExist API directly to the Table interface, the user would have to call the saveAs API before calling those APIs, which would add extra usage cost for the user.
The recommended usage is:
...
TablePipeline tablePipeline = table.saveAs("my_ctas_table")
.option("connector", "filesystem")
.option("format", "testcsv")
.option("path", "/tmp/my_ctas_table/")
.create();
tablePipeline.execute();
The properties set through the option API are saved and applied to the CatalogBaseTable when the create/createIfNotExist API executes, so that the TableSink can be generated from them.
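In other words, option() only records key/value pairs; they are materialized into the table metadata when create()/createIfNotExist() runs. A toy illustration of that deferred application (all names here are illustrative, not Flink's):

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy sketch: options are buffered first and only applied to the table metadata on create(). */
class OptionsBuffer {
    private final Map<String, String> pending = new LinkedHashMap<>();

    OptionsBuffer option(String key, String value) {
        pending.put(key, value);   // nothing touches the catalog yet
        return this;
    }

    /** Stand-in for create(): builds the CatalogBaseTable-like options the TableSink is derived from. */
    Map<String, String> create() {
        return new LinkedHashMap<>(pending);
    }
}
```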
Catalog
Provides methods used to serialize/deserialize the Catalog and to infer the options of the CatalogBaseTable.
@PublicEvolving
...
Catalog#inferTableOptions makes it convenient for users to customize a Catalog: when the Catalog supports CTAS, the table's options can be inferred automatically, avoiding job failures caused by missing information.
The serialize/deserialize API enables serialization and deserialization of the Catalog; the user decides which information is saved during serialization so that it is available again during deserialization.
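A minimal sketch of what such Catalog additions could look like. The method names and signatures here are illustrative assumptions based on the description above, not Flink's actual API:

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative table-metadata stand-in for CatalogBaseTable. */
class ToyCatalogTable {
    final Map<String, String> options = new HashMap<>();
}

/** Sketch of the proposed Catalog additions (illustrative signatures). */
interface ToyCatalog {
    /** Fill in options the user did not provide, so the CTAS job does not fail for lack of information. */
    default ToyCatalogTable inferTableOptions(String tablePath, ToyCatalogTable table) {
        throw new UnsupportedOperationException("this catalog does not support CTAS option inference");
    }

    /** The catalog decides which of its state must survive the trip to the JobManager. */
    byte[] serialize();
    // A matching deserialize(byte[]) factory would restore the catalog on the JM side.
}

/** Example catalog that defaults a 'path' option from its root directory. */
class ToyFsCatalog implements ToyCatalog {
    private final String root;
    ToyFsCatalog(String root) { this.root = root; }

    @Override
    public ToyCatalogTable inferTableOptions(String tablePath, ToyCatalogTable table) {
        table.options.putIfAbsent("path", root + "/" + tablePath);  // inferred, not user-supplied
        return table;
    }

    @Override
    public byte[] serialize() { return root.getBytes(); }

    static ToyFsCatalog deserialize(byte[] bytes) { return new ToyFsCatalog(new String(bytes)); }
}
```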
Proposed Changes
Syntax
I suggest introducing a CTAS clause with the following syntax:
CREATE TABLE [ IF NOT EXISTS ] table_name
[ WITH ( table_properties ) ]
[ AS query_expression ]
Example:
CREATE TABLE ctas_hudi
WITH ('connector.type' = 'hudi')
AS SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;
The resulting table is equivalent to:
CREATE TABLE ctas_hudi
(
  id BIGINT,
  name STRING,
  age INT
)
WITH ('connector.type' = 'hudi');

INSERT INTO ctas_hudi SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;
Implementation Plan
Based on the research summary and analysis in the appendix, the current status of CREATE TABLE AS SELECT (CTAS) among big data systems is:
- Flink: the Flink dialect does not support CTAS. ==> LEVEL-1
- Spark DataSource v1: atomic (can roll back), but not isolated. ==> LEVEL-2
- Spark DataSource v2: guarantees atomicity and isolation. ==> LEVEL-3
- Hive MR: guarantees atomicity and isolation. ==> LEVEL-3
Combining the current situation of Flink and the needs of our business, we choose a LEVEL-2 implementation for Flink in batch execution mode. In streaming mode, however, we do not provide atomicity guarantees, because streaming jobs are long-running; moreover, there currently seems to be no strong need to guarantee atomicity in streaming mode.
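In batch mode, LEVEL-2 behavior (atomic but not isolated) essentially means: create the sink table first, run the insert job, and drop the table again if the job fails. A toy sketch of that control flow (all names here are illustrative):

```java
import java.util.HashSet;
import java.util.Set;

/** Toy driver showing LEVEL-2 CTAS atomicity: roll back the created table when the insert fails. */
class CtasDriver {
    final Set<String> tables = new HashSet<>();  // stands in for the catalog

    /** Returns true if the CTAS job succeeded; on failure the created table is dropped again. */
    boolean runCtas(String tableName, Runnable insertJob) {
        tables.add(tableName);           // step 1: create the sink table from the query schema
        try {
            insertJob.run();             // step 2: run the INSERT INTO ... SELECT job
            return true;
        } catch (RuntimeException e) {
            tables.remove(tableName);    // step 3: roll back -- drop the half-written table
            // Other readers could still have seen the table while the job ran,
            // which is why this level is atomic but NOT isolated.
            return false;
        }
    }
}
```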
...
The overall execution process is shown in the figure above.
Because the client process may exit soon (for example, in detached mode), we choose to create/drop the table on the JM side, where both create table and drop table are executed through the Catalog. Therefore, a new hook mechanism and a Catalog serialization/deserialization solution need to be introduced. The overall execution process of a CTAS job is as follows:
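The hook mechanism could look roughly like this: the JM drives a hook through the job life cycle, and the hook creates or drops the target table through the deserialized Catalog. The interface and method names below are illustrative assumptions, not the final design:

```java
/** Illustrative JM-side job status hook; names are assumptions for the sketch. */
interface ToyJobStatusHook {
    void onJobCreated();   // e.g. create the target table through the deserialized Catalog
    void onJobFinished();  // job succeeded: keep the table
    void onJobFailed();    // job failed: drop the target table again
}

/** Toy stand-in for the JM driving a registered hook through the job life cycle. */
class ToyJobManager {
    void run(ToyJobStatusHook hook, boolean jobSucceeds) {
        hook.onJobCreated();
        if (jobSucceeds) {
            hook.onJobFinished();
        } else {
            hook.onJobFailed();
        }
    }
}

/** Hook that creates the CTAS target table up front and drops it on failure. */
class TableLifecycleHook implements ToyJobStatusHook {
    boolean tableExists = false;
    public void onJobCreated()  { tableExists = true; }   // CREATE TABLE via Catalog
    public void onJobFinished() { /* nothing to clean up */ }
    public void onJobFailed()   { tableExists = false; }  // DROP TABLE via Catalog
}
```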
...