Versions Compared

...

Page properties


Discussion thread: https://lists.apache.org/thread/zon967w7synby8z6m1s7dj71dhkh9ccy
Vote thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Release: <Flink Version>


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Many SQL vendors expose additional metadata via so-called "pseudo columns" or "system columns" next to the physical columns.

...

Especially for platform teams and connector/catalog implementers, it would be beneficial to offer additional system columns for commonly used properties such as the row's timestamp or row number.

However, those columns should not be selected by default when expanding SELECT * and should not conflict with physical columns, also for the sake of backward compatibility.

...

This proposal suggests evolving the existing column design slightly to make it more useful for platform providers.

Public Interfaces

  • SELECT *
  • SHOW CREATE TABLE

Proposed Changes

One option would be to introduce the concept of system columns.

System columns are a special case of a metadata column:

  • They should be read-only.

  • They should not influence the query-to-sink schema.

  • Thus, only METADATA VIRTUAL columns qualify as system columns.

  • They start with a reserved prefix.

  • Metadata columns with a different alias (using METADATA FROM) do not qualify as system columns.

Metadata Key Prefix Constraint

Depending on the catalog implementation, system columns might be available in every table, thus conflicts with physical columns must be avoided.

We suggest applying constraints to the prefix of a system column so that it is immediately visible when a system column is used.

The prefix constraint is on the metadata key, not on the column name itself. This avoids backwards compatibility issues.
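The qualification rules and the key-based prefix constraint can be illustrated with a small, self-contained Java sketch. The `MetadataColumn` class and the `isSystemColumn` method are hypothetical names and not part of any Flink API, and the `$` prefix is only an assumption, since the sign is still an open question.

```java
/** Sketch only: hypothetical types, not part of the Flink API. */
class SystemColumnSketch {

    // Assumption: "$" serves as the reserved prefix; the actual sign is undecided.
    static final String RESERVED_PREFIX = "$";

    /** Simplified stand-in for a declared metadata column. */
    static final class MetadataColumn {
        final String name;
        final String aliasedKey; // key given via METADATA FROM, or null if absent
        final boolean virtual;   // true for METADATA VIRTUAL (read-only)

        MetadataColumn(String name, String aliasedKey, boolean virtual) {
            this.name = name;
            this.aliasedKey = aliasedKey;
            this.virtual = virtual;
        }

        /** Without METADATA FROM, the column name doubles as the metadata key. */
        String metadataKey() {
            return aliasedKey != null ? aliasedKey : name;
        }
    }

    /** Applies the rules listed above: virtual, not aliased, reserved key prefix. */
    static boolean isSystemColumn(MetadataColumn column) {
        return column.virtual
                && column.aliasedKey == null
                && column.metadataKey().startsWith(RESERVED_PREFIX);
    }

    public static void main(String[] args) {
        // `$rowtime TIMESTAMP_LTZ(3) METADATA VIRTUAL`
        System.out.println(isSystemColumn(new MetadataColumn("$rowtime", null, true))); // true
        // `custom_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL FROM '$rowtime'`
        System.out.println(isSystemColumn(new MetadataColumn("custom_ts", "$rowtime", true))); // false
    }
}
```

Because the check looks at the metadata key, a physical column can only collide with a system column if its name itself starts with the reserved prefix.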

Open question: Which sign should indicate a system column?
Current preference: Option 3

Option 1: Underscore

Pro:

  • Similar to BigQuery
  • Queries are still nicely readable
    • SQL: SELECT _rowtime FROM t
    • Table API: tableEnv.from("t").select(Expressions.$("_rowtime"))

Con:

  • Underscore is a common character for "private"/"internal" fields

  • Likelihood of conflicts might be higher as it is used quite often in physical columns (allowed in Avro, Protobuf, JSON)

Option 2: Double Underscore

Pro:

  • Queries are also nicely readable

Con:

  • Underscore is a common character for "private"/"internal" fields

  • Still allowed in Avro, Protobuf, JSON but with less likelihood of causing conflicts

Option 3: Dollar sign

Pro:

  • Fewer conflicts with existing physical columns
  • Uncommon in JSON, not allowed in Avro and Protobuf

  • Dollar signs are used for other generated things already (window properties, Paimon side tables) so users know that those come from the system

Con:

  • Queries are not so nicely readable
    • SQL: SELECT $rowtime FROM t
    • Table API: tableEnv.from("t").select(Expressions.$("$rowtime"))

      But maybe we can advertise Expressions.col("$rowtime") in the future.

Option 4: Configurable

Pro:

  • Provides highest flexibility

Con:

  • Could be confusing for the overall Flink experience
  • Query semantics would depend on the connector+system configuration

SQL Examples: SELECT / INSERT INTO

Instead of introducing a new concept around system columns, we propose to leverage the virtual metadata column concept and adjust its expansion behavior.

A new ConfigOption configures the expanding behavior:

    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
    public static final ConfigOption<List<ColumnExpansionStrategy>>
            TABLE_COLUMN_EXPANSION_STRATEGY =
                    key("table.column-expansion-strategy")
                            .enumType(ColumnExpansionStrategy.class)
                            .asList()
                            .defaultValues()
                            .withDescription(
                                    "Configures the default expansion behavior of 'SELECT *'. "
                                            + "By default, all top-level columns of the table's "
                                            + "schema are selected and nested fields are retained.");

    enum ColumnExpansionStrategy {
        // Excludes `c METADATA VIRTUAL FROM k` from SELECT *
        EXCLUDE_ALIASED_VIRTUAL_METADATA_COLUMNS,
        
        // Excludes `c METADATA VIRTUAL` from SELECT *
        EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS
    }

Notes:

  • The ConfigOption is prepared for future extensions (e.g. excluding computed columns, system columns, or default columns)
  • Since the ConfigOption is a list, multiple strategies can be defined.


Excluded columns are not expanded in a SELECT * FROM t or tableEnv.from("t").select(Expressions.$("*")).

Users need to select them explicitly. This allows for adding more system columns in the future without breaking existing SQL queries.
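The expansion behavior can be sketched in plain Java as follows. This is a minimal model under stated assumptions, not planner code: the `Column` class, `expandStar`, and `exampleSchema` are hypothetical names, and the schema assumes a table with columns i and s plus rt1 (METADATA VIRTUAL), rt2 (METADATA VIRTUAL FROM 'timestamp'), rt3 (METADATA), and rt4 (METADATA FROM 'timestamp').

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/** Sketch only: models SELECT * expansion with hypothetical types. */
class ColumnExpansionSketch {

    enum ColumnExpansionStrategy {
        EXCLUDE_ALIASED_VIRTUAL_METADATA_COLUMNS,
        EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS
    }

    /** Simplified stand-in for a column of the table schema. */
    static final class Column {
        final String name;
        final boolean metadata; // declared with METADATA
        final boolean virtual;  // declared with VIRTUAL
        final boolean aliased;  // declared with FROM 'key'

        Column(String name, boolean metadata, boolean virtual, boolean aliased) {
            this.name = name;
            this.metadata = metadata;
            this.virtual = virtual;
            this.aliased = aliased;
        }
    }

    /** Returns the column names that SELECT * expands to under the given strategies. */
    static List<String> expandStar(List<Column> schema, Set<ColumnExpansionStrategy> strategies) {
        List<String> expanded = new ArrayList<>();
        for (Column c : schema) {
            boolean virtualMetadata = c.metadata && c.virtual;
            ColumnExpansionStrategy relevant = c.aliased
                    ? ColumnExpansionStrategy.EXCLUDE_ALIASED_VIRTUAL_METADATA_COLUMNS
                    : ColumnExpansionStrategy.EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS;
            // Only virtual metadata columns can be excluded; all others are always expanded.
            if (!(virtualMetadata && strategies.contains(relevant))) {
                expanded.add(c.name);
            }
        }
        return expanded;
    }

    /** The assumed example schema (i, s, rt1, rt2, rt3, rt4). */
    static List<Column> exampleSchema() {
        return Arrays.asList(
                new Column("i", false, false, false),
                new Column("s", false, false, false),
                new Column("rt1", true, true, false),
                new Column("rt2", true, true, true),
                new Column("rt3", true, false, false),
                new Column("rt4", true, false, true));
    }

    public static void main(String[] args) {
        // Default (empty list): every top-level column is expanded.
        System.out.println(expandStar(exampleSchema(),
                EnumSet.noneOf(ColumnExpansionStrategy.class)));
        // prints [i, s, rt1, rt2, rt3, rt4]
        System.out.println(expandStar(exampleSchema(),
                EnumSet.of(ColumnExpansionStrategy.EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS)));
        // prints [i, s, rt2, rt3, rt4]
    }
}
```

Modelling the option as a set of independent strategies mirrors the list-typed ConfigOption: each strategy removes one category of columns, so combining both strategies leaves only i, s, rt3, and rt4.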

DESCRIBE works as before and will list the column. SHOW CREATE TABLE will skip the column.

SQL Example: SELECT / INSERT INTO

Code Block
-- Given a table t with schema
-- (
--   i INT,
--   s STRING,
--   rt1 TIMESTAMP_LTZ(3) METADATA VIRTUAL,
--   rt2 TIMESTAMP_LTZ(3) METADATA VIRTUAL FROM 'timestamp',
--   rt3 TIMESTAMP_LTZ(3) METADATA,
--   rt4 TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
-- )

SELECT * FROM t;
-- returns (i, s, rt1, rt2, rt3, rt4)


-- TableEnvironment is configured as
-- table.column-expansion-strategy = EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS

SELECT * FROM t;
-- returns (i, s, rt2, rt3, rt4)

SELECT rt1, * FROM t;
-- rt1 must be selected explicitly and returns (rt1, i, s, rt2, rt3, rt4)


-- TableEnvironment is configured as
-- table.column-expansion-strategy = EXCLUDE_ALIASED_VIRTUAL_METADATA_COLUMNS

SELECT * FROM t;
-- returns (i, s, rt1, rt3, rt4)

INSERT INTO t SELECT * FROM t;
-- always works
Code Block
-- Given a table t (i INT, s STRING).
-- Catalog additionally exposes ($rowtime TIMESTAMP_LTZ(3)) when queried for t.

SELECT * FROM t;
-- returns (i INT, s STRING)

SELECT $rowtime, * FROM t;
-- returns ($rowtime TIMESTAMP_LTZ(3), i INT, s STRING)

INSERT INTO t SELECT * FROM t;
-- works

INSERT INTO t SELECT $rowtime, * FROM t;
-- will error because system column is not writable

DESCRIBE t
-- will show (i INT, s STRING, $rowtime TIMESTAMP_LTZ(3))

SHOW CREATE TABLE
-- will only show (i INT, s STRING)
Code Block
-- Given a table t ($rowtime TIMESTAMP_LTZ(3), i INT, s STRING).
-- Catalog should not expose system column when queried for t. It should expose the physical column.
-- Ideally, we should support two columns with the same name but this is currently not supported in Flink.
-- If we support two columns with the same name in the future, the physical column should have precedence during a SELECT *.

SELECT * FROM t;
-- returns ($rowtime TIMESTAMP_LTZ(3), i INT, s STRING)
Code Block
-- Given a table t (custom_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL FROM '$rowtime', i INT, s STRING).

SELECT * FROM t;
-- returns (custom_ts TIMESTAMP_LTZ(3), i INT, s STRING) because "custom_ts"
-- uses an alias and potentially a different data type to which the system
-- column is casted.


Compatibility, Deprecation, and Migration Plan

Major compatibility issues should not arise, as the whole behavior depends on the ConfigOption.

No metadata key currently starts with "_" or "$", and no system column is added by existing catalogs yet.

Test Plan

Unit tests on table environment and maybe one ITCase should be sufficient.

Rejected Alternatives

Completely new concept of System Columns

New concept of system columns with a reserved prefix such as `$`.

Metadata columns fit nicely and require no additional interfaces for connectors and/or changes within planner rules.

→ The concept of system columns, including columns that can be exposed per operation, is future work.