Discussion thread
Vote thread
JIRA

Unable to render Jira issues macro, execution error.

Release1.11

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Often users need to adjust existing tables slightly. This is especially useful when users need to enhance a table created from an external tool (e.g. HIVE) with Flink's specific information such as e.g watermarks. It can also be a useful tool for ETL processes, e.g. merging two tables into a single one with a different connector.  My suggestion would be to support a variation of an optional Feature T171, “LIKE clause in table definition” of SQL standard 2008.

Proposed Changes

I suggest introducing a LIKE clause with a following syntax:

CREATE [ TEMPORARY ] TABLE [ IF NOT EXISTS ] table_name ( [
  {   column
    | table_constraint
    [, ... ]
] )
[ WITH ( table_properties ) ]
[ LIKE parent_table [ like_options ] } ]

where like_options are:

  { INCLUDING | EXCLUDING } ALL
| [{ 
     { INCLUDING | EXCLUDING } { CONSTRAINTS }
   | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS } }
   [, ...]
  ]


Example:

CREATE [TEMPORARY] TABLE base_table (
    id BIGINT,
    name STRING,
    tstmp TIMESTAMP,
	PRIMARY KEY(id)
) WITH (
    ‘connector.type’: ‘kafka’
)

CREATE [TEMPORARY] TABLE derived_table (
    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND
)
LIKE base_table;

Resulting table equivalent to:

CREATE [TEMPORARY] TABLE derived_table (
    id BIGINT,
    name STRING,
    tstmp TIMESTAMP,
	PRIMARY KEY(id),
	WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND
) WITH (
    ‘connector’: ‘kafka’
)

Configuring behaviour of LIKE:

SQL standard defines a way to configure the behaviour of LIKE clause. SQL standards describes options:

  • INCLUDING IDENTITY | EXCLUDING IDENTITY
  • INCLUDING DEFAULTS | EXCLUDING DEFAULTS
  • INCLUDING GENERATED | EXCLUDING GENERATED

PostgresSQL additionally allows for 

CONSTRAINTS | INDEXES | STORAGE | COMMENTS | ALL

I suggest supporting like-options:

  • ALL
  • CONSTRAINTS: primary keys, unique key, does not include NOT NULL constraint (in Flink it's part of the type)
  • GENERATED: computed columns and watermarks
  • OPTIONS: connector properties in WITH (...) clause
  • PARTITIONS: partitions definition

Additionally I suggest supporting different like-options strategies:

  • INCLUDING (default -> fail on duplicate keys)
  • EXCLUDING
  • OVERWRITING (any explicit options/columns overwrite options/columns from base source)

The supported combinations would be:


INCLUDINGEXCLUDINGOVERWRITING
ALL(tick)(tick)(error)
CONSTRAINTS(tick)(tick)(error)
PARTITIONS(tick)(tick)(error)
GENERATED(tick)(tick)(tick)
OPTIONS(tick)(tick)(tick)

The reason why I suggest not to support OVERWRITING CONSTRAINTS is that usually it's rather hard to overwrite only some of the constraints. If the constraint(primary key, unique key) from the base table does not apply to the derived table, in my opinion it is safer to exclude all constraints and redefine them.  We could revisit that in the future, if we see a need to support also OVERWRITING for constraints. The same applies for PARTITIONS.

Default options:

    INCLUDING ALL = INCLUDING CONSTRAINTS

                                       INCLUDING GENERATED

                                       INCLUDING PARTITIONS

                                       INCLUDING OPTIONS

It is different from the SQL standard. Nevertheless I think it is worth the change as the primary use case for the clause will be slight adjustments to the original schema rather than rewriting the table completely.

Example:

CREATE [TEMPORARY] TABLE base_table_1 (
    id BIGINT,
    name STRING,
    tstmp TIMESTAMP,
	PRIMARY KEY(id)
) WITH (
    ‘connector’: ‘kafka’,
    ‘connector.starting-offset’: ‘12345’,
    ‘format’: ‘json’
)

CREATE [TEMPORARY] TABLE derived_table (
    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND
)
WITH (
    ‘connector.starting-offset’: ‘0’
)
LIKE base_table (OVERWRITING OPTIONS, EXCLUDING CONSTRAINTS)

The derived_table will be equivalent to:

CREATE [TEMPORARY] TABLE derived_table (
    id BIGINT,
    name STRING,
    tstmp TIMESTAMP,
    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND
) WITH (
    ‘connector’: ‘kafka’,
    ‘connector.starting-offset’: ‘0’,
    ‘format’: ‘json’
)

Merging columns of the original table

The columns of the original table shall be appended to the columns defined in the current statement. If the are columns with the same name in both tables an exception will be thrown.

Support in Table API

Support of that feature in Table API will require a separate FLIP, as the connect API requires a rework anyway.

Compatibility, Deprecation, and Migration Plan

It is a new feature with no implication for backwards compatibility.

Rejected Alternatives

Original suggestion was to put the LIKE clause in the schema part. During the discussion it was mentioned that it is a bit weird that a clause in the schema part affects options in the WITH ( /* connector properties */) clause. We moved the LIKE clause out of the schema part.


Originally the suggestion was to allow multiple LIKE clauses. It was suggested to drop that for the first version and support it, if such requirement shows up.