
Status

Current state: "Under Discussion"

Discussion thread: https://mail-archives.apache.org/mod_mbox/flink-dev/

Released: 1.16

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Flink SQL already offers a rich syntax and feature set in both streaming and batch mode.

However, there is still room to improve usability.

For example, if a user wants to insert data into a new table, two steps are required:

First, prepare the DDL statement of the table named t1;

Second, insert the data into t1;

These two steps look routine, but when the table has many fields, writing the DDL statement by hand is tedious and error-prone,

and the same columns must be written out again in the subsequent INSERT statement.

Therefore, we propose supporting CTAS (CREATE TABLE AS SELECT), as MySQL, Oracle, Microsoft SQL Server, Hive, Spark, and others do.

This makes Flink SQL more user-friendly. In addition, the Hive dialect already has some support for CTAS.

Proposed Changes

I suggest introducing the CTAS (CREATE TABLE AS SELECT) statement with the following syntax:

syntax
CREATE TABLE [ IF NOT EXISTS ] table_name 
[ WITH ( table_properties ) ]
[ AS query_expression ]


Example:

syntax
CREATE TABLE ctas_hudi
 WITH ('connector.type' = 'hudi')
 AS SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;


The statement above is equivalent to:

syntax
CREATE TABLE ctas_hudi
 (
 	id BIGINT,
 	name STRING,
 	age INT
 )
 WITH ('connector.type' = 'hudi');

INSERT INTO ctas_hudi SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;


Implementation Plan

Supported Job Mode

CTAS is supported in both streaming and batch mode.

To guarantee atomicity, the implementation details differ between the two modes.

Streaming

Since streaming jobs are long-running, the table needs to be created first.

First, create a table in the catalog according to the schema of the query result of the select statement;

Second, start the job to write the result to the table created in the first step.
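
The two streaming steps above can be pictured from a SQL Client session. This is only a sketch: the CTAS statement uses the syntax proposed in this FLIP and reuses the table names from the earlier example, and the comment on SHOW TABLES describes the behavior implied by the steps above, not a verified result.

```sql
-- Existing SQL Client option: run the session in streaming mode.
SET 'execution.runtime-mode' = 'streaming';

-- Proposed CTAS syntax (not yet available).
-- Step 1: the schema (id, name, age) is derived from the query result
--         and ctas_hudi is created in the catalog.
-- Step 2: the long-running job starts writing into ctas_hudi.
CREATE TABLE ctas_hudi
 WITH ('connector.type' = 'hudi')
 AS SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;

-- Assumption: because the table is created before the job starts,
-- it should already be listed here while the job is still running.
SHOW TABLES;
```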

Batch

A batch job eventually finishes. To guarantee atomicity, the results are usually first written to a temporary directory.

After the job completes successfully, the data is moved to the final directory and the table is created, as Hive and Spark do.

Because Flink jobs run in detached mode most of the time,

the data move and table creation ultimately need to be performed in the JobManager (JM).
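
The batch behavior can be sketched the same way from a SQL Client session. The CTAS statement again uses the syntax proposed in this FLIP; whether CTAS would honor `table.dml-sync` like INSERT INTO does is an assumption, and the comments describe the intended ordering rather than a verified result.

```sql
-- Existing SQL Client options: batch mode, and wait for the job to finish.
SET 'execution.runtime-mode' = 'batch';
SET 'table.dml-sync' = 'true';  -- assumption: CTAS honors this like INSERT INTO

-- Proposed CTAS syntax (not yet available).
-- While the job runs, results go to a temporary directory and
-- ctas_hudi does not yet exist in the catalog.
-- On success, the JobManager moves the data to the final directory
-- and creates the table; on failure, no table is left behind.
CREATE TABLE ctas_hudi
 WITH ('connector.type' = 'hudi')
 AS SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;

-- Assumption: the table is listed only after the job has completed successfully.
SHOW TABLES;
```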

Support in Table API

Supporting this feature in the Table API will require a separate FLIP, as the connect API requires a rework anyway.

Compatibility, Deprecation, and Migration Plan

It is a new feature with no implication for backwards compatibility.

Test Plan

Changes will be verified by unit tests (UT).

Rejected Alternatives

N/A

References

  1. Support SELECT clause in CREATE TABLE(CTAS)
  2. MySQL CTAS syntax
  3. Microsoft Azure Synapse CTAS
  4. LanguageManual DDL#Create/Drop/ReloadFunction
  5. Spark Create Table Syntax


