Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.


Page properties


Discussion thread
Vote thread
JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-17001

Release1.11

Status

Current state: Under discussion

Discussion thread:

JIRA:

...


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Often users need to adjust existing tables slightly. This is especially useful when users need to enhance a table created from an external tool (e.g. HIVE) with Flink's specific information such as e.g watermarks. It can also be a useful tool for ETL processes, e.g. merging two tables into a single one with a different connector.  My suggestion would be to support a variation of an optional Feature T171, “LIKE clause in table definition” of SQL standard 2008.

...

Code Block
languagesql
CREATE [ TEMPORARY ] TABLE [ IF NOT EXISTS ] table_name ( [
  {   column
    | table_constraint
    [, ...  |]
] )
[ WITH ( table_properties ) ]
[ LIKE parent_table [ like_options ] }
    [, ... ]
] )

where like_options are:

  { INCLUDING | EXCLUDING } ALL
| [{ 
     { INCLUDING | EXCLUDING } { CONSTRAINTS }
   | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS } }
   [, ...]
  ]

...

Code Block
languagesql
CREATE [TEMPORARY] TABLE base_table (
    id BIGINT,
    name STRING,
    tstmp TIMESTAMP,
	PRIMARY KEY(id)
) WITH (
    ‘connector.type’: ‘kafka’
)

CREATE [TEMPORARY] TABLE derived_table (
    LIKE base_table,
    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND
)
LIKE base_table;

Resulting table equivalent to:

...

CONSTRAINTS | INDEXES | STORAGE | COMMENTS | ALL

I suggest supporting INCLUDING/EXCLUDINGlike-options:

  • ALL
  • CONSTRAINTS: primary keys, unique key, does not include NOT NULL constraint (in Flink it's part of the type)
  • GENERATED: computed columns and watermarks
  • OPTIONS: connector properties in WITH (...) clause
  • PARTITIONS: partitions definition

Additionally I suggest supporting different INCLUDING/EXCLUDING like-options strategies:

  • INCLUDING (default -> fail on duplicate keys)
  • EXCLUDING
  • OVERWRITING (any explicit options/columns overwrite options/columns from base source, overwriting happens in the order the LIKE clauses were declared)

The supported combinations would be:


INCLUDINGEXCLUDINGOVERWRITING
ALL(tick)(tick)(error)
CONSTRAINTS(tick)(tick)(error)
PARTITIONS(tick)(tick)(error)
GENERATED(tick)(tick)(tick)
OPTIONS(tick)(tick)(tick)

The reason why I suggest not to support OVERWRITING CONSTRAINTS is that usually it's rather hard to overwrite only some of the constraints. If the constraint(primary key, unique key) from the base table does not apply to the derived table, in my opinion it is safer to exclude all constraints and redefine them.  We could revisit that in the future, if we see a need to support also OVERWRITING for constraints. The same applies for PARTITIONS.

Default options:

    INCLUDING ALL = INCLUDING CONSTRAINTS

                                       INCLUDING GENERATED

                                       INCLUDING PARTITIONS

                                       INCLUDING OPTIONS

...

Code Block
languagesql
CREATE [TEMPORARY] TABLE base_table_1 (
    id BIGINT,
    name STRING,
    tstmp TIMESTAMP,
	PRIMARY KEY(id)
) WITH (
    ‘connector’: ‘kafka’,
    ‘connector.starting-offset’: ‘12345’,
    ‘format’: ‘json’
)

CREATE [TEMPORARY] TABLE base_table_2 (
    tstmp TIMESTAMP,
	PRIMARY KEY(tstmp)
) WITH (
    ‘connector’: ‘filesystem’,
    ‘format’: ‘csv’,
    ‘format.delimiter’: ‘\t’   
)

CREATE [TEMPORARY] TABLE derived_table (
    LIKE base_table_1 (
		OVERWRITING OPTIONS),
    LIKE base_table_2 (
		EXCLUDING OPTIONS, 
		EXCLUDING CONSTRAINTS),
    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND
) 
WITH (
    ‘connector.starting-offset’: ‘0’
)
LIKE base_table (OVERWRITING OPTIONS, EXCLUDING CONSTRAINTS)

The derived_table will be equivalent to:

Code Block
languagesql
CREATE [TEMPORARY] TABLE derived_table (
    id BIGINT,
    name STRING,
	    tstmp TIMESTAMP,
    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND,
    PRIMARY KEY(id)
) WITH (
    ‘connector’: ‘kafka’,
    ‘connector.starting-offset’: ‘0’,
    ‘format’: ‘json’
)

Merging columns of the original table

The columns of the original table shall be appended to the columns defined in the current statement. If the are columns with the same name in both tables an exception will be thrown.

Support in Table API

Support of that feature in Table API will require a separate FLIP, as we the connect API requires a rework anyway.

...

It is a new feature with no implication for backwards compatibility.

Rejected Alternatives

Original suggestion was to put the LIKE clause in the schema part. During the discussion it was mentioned that it is a bit weird that a clause in the schema part affects options in the WITH ( /* connector properties */) clause. We moved the LIKE clause out of the schema part.


Originally the suggestion was to allow multiple LIKE clauses. It was suggested to drop that for the first version and support it, if such requirement shows up.