
Status

Current state: [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

FLIP-218 added support for the CREATE TABLE AS SELECT statement, which lets users create a new table from existing tables or a query result. This is convenient for data analysts and data scientists managing their data. However, Flink does not currently support the REPLACE TABLE AS SELECT statement, which replaces an existing table with new data. Without it, users must first drop the table and then run CREATE TABLE AS SELECT. With it, a single REPLACE TABLE AS SELECT statement is enough.

This FLIP therefore aims to support the REPLACE TABLE AS SELECT statement in Flink.

Public Interfaces

Syntax

We propose adding the following syntax for the REPLACE TABLE AS SELECT statement:

REPLACE TABLE table_identifier
[(
 [ < physical_column_definition >, ... ]
 [ < table_constraint >, ... ])]
[ COMMENT table_comment ]
[ PARTITIONED BY ( col_name1, col_name2, ...) ]
[ WITH ( key1=val1, key2=val2, ... ) ]
AS <table subquery>

<physical_column_definition>:
  column_name column_type [ <column_constraint> ] [COMMENT column_comment]
  
<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED


We also propose a CREATE OR REPLACE TABLE AS statement that combines CREATE TABLE AS SELECT and REPLACE TABLE AS SELECT: it replaces the table if it exists and creates it otherwise.

CREATE OR REPLACE TABLE table_identifier
[(
 [ < physical_column_definition >, ... ]
 [ < table_constraint >, ... ])]
[ COMMENT table_comment ]
[ PARTITIONED BY ( col_name1, col_name2, ...) ]
[ WITH ( key1=val1, key2=val2, ... ) ]
AS <table subquery>

<physical_column_definition>:
  column_name column_type [ <column_constraint> ] [COMMENT column_comment]
  
<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
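As an illustration of the two grammars above, the following sketch holds one example statement of each kind as Java string constants. The table names, columns, and connector options are hypothetical and chosen only for illustration. The statements use the syntax proposed here, so they would only parse on a Flink version that implements this FLIP. In a Flink program such strings would typically be submitted with TableEnvironment#executeSql.

```java
/** Example statements in the proposed syntax; all table and column names are hypothetical. */
final class ReplaceTableAsExamples {
    private ReplaceTableAsExamples() {}

    /** Replaces the existing table daily_orders with the result of the query. */
    static final String REPLACE_TABLE_AS =
            "REPLACE TABLE daily_orders\n"
                    + "COMMENT 'orders placed today'\n"
                    + "WITH ('connector' = 'filesystem', 'path' = '/tmp/daily_orders', 'format' = 'csv')\n"
                    + "AS SELECT order_id, amount FROM orders WHERE order_date = CURRENT_DATE";

    /** Same query, but the table is created if it does not exist yet. */
    static final String CREATE_OR_REPLACE_TABLE_AS =
            "CREATE OR REPLACE TABLE daily_orders\n"
                    + "WITH ('connector' = 'filesystem', 'path' = '/tmp/daily_orders', 'format' = 'csv')\n"
                    + "AS SELECT order_id, amount FROM orders WHERE order_date = CURRENT_DATE";
}
```

Both statements omit the optional column list, so the schema of the new table would be derived from the query.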


Public Interfaces Change

We propose the following public interface change for the REPLACE TABLE AS SELECT statement:

@PublicEvolving
public interface Catalog {
  
   /** Prepare replace table. */
   Optional<TwoPhaseCommitCatalogTable> prepareReplaceTable(xxx, xxxx, xxxx, xx);

}


Proposed Changes

For the REPLACE TABLE AS statement:

1:  Construct the table to be created.

 If column definitions are specified, Flink uses them to create the new table. Otherwise, it derives the column definitions from the sub-query.

2: Check whether the table exists. If it doesn't, throw TableException(String.format("The table %s to be replaced doesn't exist. You may want to use CREATE TABLE AS statement or CREATE OR REPLACE TABLE AS statement.",tableIdentifier)).

3: Call Catalog#prepareReplaceTable to try to obtain a TwoPhaseCommitCatalogTable.

    a: If the result is empty, atomicity cannot be guaranteed. Flink performs the Replace Table operations one by one without any atomicity guarantee: it drops the old table, creates the new table, and then inserts the data into the new table.

    b: Otherwise, atomicity is expected to be guaranteed by the external connector implementation. Flink generates an insert job from the table subquery in the REPLACE TABLE AS statement, calls TwoPhaseCommitCatalogTable#beginTransaction before the insert job starts, calls TwoPhaseCommitCatalogTable#commit after the job finishes, and calls TwoPhaseCommitCatalogTable#abort if the job fails or is canceled.
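The control flow of steps 2 and 3 can be sketched roughly as follows. This is a minimal, self-contained illustration and not the proposed implementation: StagedTable and SketchCatalog are hypothetical stand-ins for TwoPhaseCommitCatalogTable and Catalog, the single String parameter replaces the unspecified prepareReplaceTable arguments, beginTransaction is the assumed spelling of the begin method, and IllegalStateException stands in for TableException so that the example has no Flink dependency. Step 1 (building the table schema) is left out.

```java
import java.util.Optional;

/** Hypothetical stand-in for TwoPhaseCommitCatalogTable. */
interface StagedTable {
    void beginTransaction();
    void commit();
    void abort();
}

/** Hypothetical, simplified catalog; the real method parameters are not specified in this FLIP. */
interface SketchCatalog {
    boolean tableExists(String table);
    Optional<StagedTable> prepareReplaceTable(String table);
    void dropTable(String table);
    void createTable(String table);
}

final class ReplaceTableAsSketch {
    private ReplaceTableAsSketch() {}

    static void execute(SketchCatalog catalog, String table, Runnable insertJob) {
        // Step 2: the table to be replaced must already exist.
        if (!catalog.tableExists(table)) {
            throw new IllegalStateException(
                    String.format("The table %s to be replaced doesn't exist.", table));
        }
        // Step 3: ask the catalog whether it can replace the table atomically.
        Optional<StagedTable> staged = catalog.prepareReplaceTable(table);
        if (!staged.isPresent()) {
            // Step 3a: no atomicity guarantee; drop, create, then insert.
            catalog.dropTable(table);
            catalog.createTable(table);
            insertJob.run();
            return;
        }
        // Step 3b: the connector guarantees atomicity through begin/commit/abort.
        StagedTable txn = staged.get();
        txn.beginTransaction();
        try {
            insertJob.run();
        } catch (RuntimeException e) {
            txn.abort(); // job failed or was canceled
            throw e;
        }
        txn.commit();
    }
}
```

In the non-atomic branch a failure of the insert job leaves the old table dropped and the new table possibly incomplete, which is the behavior a catalog can avoid by returning a staged table.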


For the CREATE OR REPLACE TABLE AS statement: if the table exists, it is treated as a REPLACE TABLE AS statement. Otherwise, it is treated as a CREATE TABLE AS statement.
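The dispatch rule for CREATE OR REPLACE TABLE AS amounts to a single branch on table existence. The sketch below shows it in isolation. The class and enum names are hypothetical and used only for illustration, not part of the proposed interfaces.

```java
/** Hypothetical helper showing how CREATE OR REPLACE TABLE AS picks its execution path. */
final class CreateOrReplaceDispatch {
    private CreateOrReplaceDispatch() {}

    /** The statement kind whose execution path is reused. */
    enum Plan { CREATE_TABLE_AS, REPLACE_TABLE_AS }

    /** Existing table: behave like REPLACE TABLE AS; otherwise like CREATE TABLE AS. */
    static Plan choose(boolean tableExists) {
        return tableExists ? Plan.REPLACE_TABLE_AS : Plan.CREATE_TABLE_AS;
    }
}
```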


Compatibility, Deprecation, and Migration Plan

There are no compatibility problems.

Test Plan

Unit tests (UT) and integration tests (IT).

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
