
Status

Current state: "Under Discussion"

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA:

Released: 1.16

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The current syntax and features of Flink SQL are fairly complete in both streaming and batch mode, but there are still some usability gaps to close.

For example, if a user wants to insert data into a new table, two steps are required:

First, prepare the DDL statement for the table named t1;

Second, insert the data into t1.

These two steps look straightforward, but if the table has many fields, writing out the DDL statement, and then spelling out the same columns again in the following INSERT statement, becomes tedious and error-prone.

Therefore, we can support CTAS (CREATE TABLE AS SELECT) like MySQL, Oracle, Microsoft SQL Server, Hive, Spark, etc.

This would be more user friendly. In addition, the Hive dialect already has some support for CTAS.

Proposed Changes

I suggest extending the CREATE TABLE statement with an AS query clause, with the following syntax:

syntax
CREATE TABLE [ IF NOT EXISTS ] table_name 
[ WITH ( table_properties ) ]
[ AS query_expression ]


Example:

syntax
CREATE TABLE ctas_hudi
 WITH ('connector.type' = 'hudi')
 AS SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;


The resulting table is equivalent to:

syntax
CREATE TABLE ctas_hudi
 (
 	id BIGINT,
 	name STRING,
 	age INT
 )
 WITH ('connector.type' = 'hudi');

INSERT INTO ctas_hudi SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;

Research on Other Engines

I investigated the CTAS implementations of other big data engines, such as Hive and Spark:

Hive (MR): atomic

Hive on MR runs in client mode: the client is responsible for parsing, compiling, optimizing, executing, and finally cleaning up.

Hive executes the CTAS command as follows:

  1. Execute the query first, and write the query result to a temporary directory.
  2. If all MR tasks are executed successfully, create the table and load the data into it.
  3. If the execution fails, the table is not created.

Spark (DataSource v1): non-atomic

Spark has a driver role; the driver is responsible for compiling tasks, applying for resources, scheduling task execution, tracking task progress, etc.

Spark executes CTAS as follows:

  1. Create the sink table based on the schema of the query result.
  2. Execute the Spark job and write the result to a temporary directory.
  3. If all Spark tasks are executed successfully, use the Hive API to load the data into the sink table created in the first step.
  4. If the execution fails, the driver drops the sink table created in the first step.

Spark (DataSource v2, not yet complete; the Hive catalog is not supported yet): optionally atomic

Non-atomic

The non-atomic implementation is consistent with the DataSource v1 logic. For details, see CreateTableAsSelectExec.

Atomic

The atomic implementation (for details, see AtomicCreateTableAsSelectExec) is backed by StagingTableCatalog and StagedTable.

StagedTable supports commit and abort.

StagingTableCatalog keeps the staged table in memory. CTAS then executes as follows (a condensed sketch follows the steps):

  1. Create a StagedTable based on the schema of the query result; it is not yet visible in the catalog.
  2. Execute the Spark job and write the result into the StagedTable.
  3. If all Spark tasks are executed successfully, call StagedTable#commitStagedChanges(); the table then becomes visible in the catalog.
  4. If the execution fails, call StagedTable#abortStagedChanges().
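
Condensed into Java, the flow looks roughly like this. This is a sketch based on the StagingTableCatalog/StagedTable API named above; stagingCatalog, ident, querySchema, partitions, and properties are assumed to be in scope, and writeQueryResultTo stands in for the actual Spark write:

sketch
import org.apache.spark.sql.connector.catalog.StagedTable;
import org.apache.spark.sql.connector.catalog.StagingTableCatalog;

// Condensed sketch of the atomic CTAS path in AtomicCreateTableAsSelectExec.
StagedTable staged =
        stagingCatalog.stageCreate(ident, querySchema, partitions, properties);
// At this point the staged table is not yet visible in the catalog.
try {
    writeQueryResultTo(staged);   // placeholder for the actual Spark write
    staged.commitStagedChanges(); // the table becomes visible atomically
} catch (Exception e) {
    staged.abortStagedChanges();  // nothing is left behind on failure
    throw e;
}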

Implementation Plan

Supported Job Mode

Both streaming and batch mode are supported.

In order to guarantee atomicity, the implementation details differ between the two modes.

Streaming

Since streaming jobs are long-running, the table needs to be created first; a sketch of the flow follows the steps below.

  1. Create the sink table in the catalog based on the schema of the query result.
  2. Start the job and write the result to the sink table.
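
A minimal sketch of this two-step flow using the existing Catalog API; deriveSinkTable is a hypothetical helper that builds the table from the resolved schema of the query:

sketch
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;

// Step 1: create the sink table in the catalog up front,
// derived from the schema of the query result.
CatalogBaseTable sinkTable = deriveSinkTable(queryOperation); // hypothetical helper
catalog.createTable(new ObjectPath("default", "ctas_hudi"), sinkTable, false);

// Step 2: submit the long-running insert job against the new table.
tableEnv.executeSql(
        "INSERT INTO ctas_hudi SELECT id, name, age "
                + "FROM hive_catalog.default.test WHERE mod(id, 10) = 0");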

Batch

A batch job eventually finishes. In order to guarantee atomicity, we first write the results to a temporary directory.

We will follow the Spark DataSource v1 implementation.

Steps:

  1. Create the sink table in the catalog based on the schema of the query result.
  2. Start the job and write the result to a temporary directory.
  3. If the job executes successfully, load the data into the sink table.
  4. If the job execution fails, drop the sink table. (This capability requires runtime module support; SQL passes the relevant parameters to the runtime module.)

Dropping the table when the job fails requires some additional support. The exact mechanism belongs to the runtime module; one possible client-side approximation is sketched below.
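
This is only an illustration, not the final design: a client-side approximation using Flink's existing JobListener interface, assuming env is the execution environment, catalog is the target catalog, and the table name is taken from the earlier example:

sketch
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.table.catalog.ObjectPath;

env.registerJobListener(
        new JobListener() {
            @Override
            public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
                // nothing to clean up at submission time
            }

            @Override
            public void onJobExecuted(
                    JobExecutionResult jobExecutionResult, Throwable throwable) {
                if (throwable != null) {
                    try {
                        // job failed: roll back by dropping the sink table
                        catalog.dropTable(new ObjectPath("default", "ctas_hudi"), true);
                    } catch (Exception e) {
                        // best-effort cleanup; a real implementation would log this
                    }
                }
            }
        });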


Code Changes

parserImpls.ftl

Add syntax support:

SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
{
    final SqlParserPos startPos = s.pos();
    boolean ifNotExists = false;
    SqlIdentifier tableName;
    List<SqlTableConstraint> constraints = new ArrayList<SqlTableConstraint>();
    SqlWatermark watermark = null;
    SqlNodeList columnList = SqlNodeList.EMPTY;
    SqlCharStringLiteral comment = null;
    SqlTableLike tableLike = null;
    SqlNode query = null;

    SqlNodeList propertyList = SqlNodeList.EMPTY;
    SqlNodeList partitionColumns = SqlNodeList.EMPTY;
    SqlParserPos pos = startPos;
}
{
    <TABLE>

    ifNotExists = IfNotExistsOpt()

    tableName = CompoundIdentifier()
    [
        <LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
        TableColumn(ctx)
        (
            <COMMA> TableColumn(ctx)
        )*
        {
            pos = pos.plus(getPos());
            columnList = new SqlNodeList(ctx.columnList, pos);
            constraints = ctx.constraints;
            watermark = ctx.watermark;
        }
        <RPAREN>
    ]
    [ <COMMENT> <QUOTED_STRING> {
        String p = SqlParserUtil.parseString(token.image);
        comment = SqlLiteral.createCharString(p, getPos());
    }]
    [
        <PARTITIONED> <BY>
        partitionColumns = ParenthesizedSimpleIdentifierList()
    ]
    [
        <WITH>
        propertyList = TableProperties()
    ]
    [
        <LIKE>
        tableLike = SqlTableLike(getPos())
    ]
    // add CTAS support
    [
        <AS>
        query = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
    ]
    {
        return new SqlCreateTable(startPos.plus(getPos()),
            tableName,
            columnList,
            constraints,
            propertyList,
            partitionColumns,
            watermark,
            comment,
            tableLike,
            query,
            isTemporary,
            ifNotExists);
    }
}
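
As a rough illustration of what the extended grammar accepts, a parser-level check could look like the following (a sketch only; the getQuery() accessor on SqlCreateTable is an assumption):

sketch
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.flink.sql.parser.ddl.SqlCreateTable;
import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;

SqlParser parser =
        SqlParser.create(
                "CREATE TABLE t WITH ('connector' = 'print') AS SELECT id FROM src",
                SqlParser.config().withParserFactory(FlinkSqlParserImpl.FACTORY));
SqlCreateTable createTable = (SqlCreateTable) parser.parseStmt();
// With the grammar change, the AS query is carried on the node
// (accessor name is an assumption):
assert createTable.getQuery() != null;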

ContextResolvedTable

Add a CTAS flag:

@Internal
public final class ContextResolvedTable {

    // ... existing fields and methods ...

    private boolean isCTAS = false;

    private ContextResolvedTable(
            ObjectIdentifier objectIdentifier,
            @Nullable Catalog catalog,
            ResolvedCatalogBaseTable<?> resolvedTable,
            boolean anonymous,
            boolean isCTAS) {
        this.objectIdentifier = Preconditions.checkNotNull(objectIdentifier);
        this.catalog = catalog;
        this.resolvedTable = Preconditions.checkNotNull(resolvedTable);
        this.anonymous = anonymous;
        this.isCTAS = isCTAS;
    }

    public boolean isCTAS() {
        return isCTAS;
    }
}


When initializing the TableSink, we can then distinguish whether it is a CTAS operation, as sketched below.
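
For illustration, a hypothetical factory method plus the corresponding check during sink creation might look as follows (method and flag names are assumptions; the final API may differ):

sketch
// Hypothetical factory method on ContextResolvedTable:
public static ContextResolvedTable permanent(
        ObjectIdentifier identifier,
        Catalog catalog,
        ResolvedCatalogBaseTable<?> resolvedTable,
        boolean isCTAS) {
    return new ContextResolvedTable(identifier, catalog, resolvedTable, false, isCTAS);
}

// ... and later, when the planner creates the table sink:
if (contextResolvedTable.isCTAS()) {
    // the sink table was created by this statement;
    // on failure it may have to be dropped again
}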


Support in Table API

The existing executeSql method will be reused; a usage example follows the Javadoc below.

TableEnvironment
    /**
     * Executes the given single statement and returns the execution result.
     *
     * <p>The statement can be DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE. For DML and DQL, this method
     * returns {@link TableResult} once the job has been submitted. For DDL and DCL statements,
     * {@link TableResult} is returned once the operation has finished.
     *
     * <p>If multiple pipelines should insert data into one or more sink tables as part of a single
     * execution, use a {@link StatementSet} (see {@link TableEnvironment#createStatementSet()}).
     *
     * <p>By default, all DML operations are executed asynchronously. Use {@link
     * TableResult#await()} or {@link TableResult#getJobClient()} to monitor the execution. Set
     * {@link TableConfigOptions#TABLE_DML_SYNC} for always synchronous execution.
     *
     * @return content for DQL/SHOW/DESCRIBE/EXPLAIN, the affected row count for `DML` (-1 means
     *     unknown), or a string message ("OK") for other statements.
     */
    TableResult executeSql(String statement);
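
For example, a CTAS statement is submitted through the same entry point as any other statement (reusing the example table from above):

example
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class CtasExample {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // CTAS reuses the existing entry point; no new Table API method is needed.
        TableResult result =
                tEnv.executeSql(
                        "CREATE TABLE ctas_hudi WITH ('connector.type' = 'hudi') "
                                + "AS SELECT id, name, age FROM hive_catalog.default.test "
                                + "WHERE mod(id, 10) = 0");
        result.await(); // optional: block until the insert job has finished
    }
}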


Compatibility, Deprecation, and Migration Plan

It is a new feature with no implication for backwards compatibility.

Test Plan

The changes will be verified by unit tests.

Rejected Alternatives

N/A

References

  1. Support SELECT clause in CREATE TABLE (CTAS)
  2. MySQL CTAS syntax
  3. Microsoft Azure Synapse CTAS
  4. LanguageManual DDL#Create/Drop/ReloadFunction
  5. Spark Create Table Syntax



