Status

Current state: Under Discussion

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-70-Support-Computed-Column-for-Flink-SQL-td33126.html

...

Page properties

JIRA: FLINK-14386 (ASF JIRA)

...

Release: 1.10


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Computed columns are supported by SQL Server 2016 [1], MySQL 5.7 [2], and Oracle 11g [3].

Public Interfaces

Computed Column Syntax

Drawing on the grammars of SQL Server 2016 [1] and MySQL 5.7 [2], we propose the following computed column grammar:

```sql
col_name [data_type] AS expr
  [VIRTUAL | STORED] [NOT NULL | NULL]
  [COMMENT 'string']
```
  • The data_type is optional: based on review comments, the data type can be deduced from the expression.
  • Such a column is known as a VIRTUAL column.
  • To force the data type of the computed column, specify the optional data_type as a type hint that Flink will use. For example, for a DECIMAL expression the user can coerce the result to the precision and scale they want.
  • The VIRTUAL keyword: column values are not stored but are evaluated when rows are read, so computation is triggered when the column is output. For processing time in particular, we may do some materialization eagerly (see RelTimeIndicatorConverter for details). A VIRTUAL column is not written to the table sink. See the "Column Computation and Storage" section for how a VIRTUAL column is handled when the table is used as a source or sink.
  • The STORED keyword: column values are evaluated and stored when rows are inserted or updated. A STORED column requires storage space and can be indexed. (We may support the STORED/VIRTUAL keywords in the future if they are needed.)
  • If neither keyword is specified, the default is VIRTUAL, and the user has no way to specify the keyword explicitly.
  • It can be combined with a NULL attribute or a comment; however, the nullability is decided by the expression and cannot be specified explicitly.
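The type and nullability rules above can be sketched in plain Java. This is not Flink code. The class ComputedColumnTypes and its methods are hypothetical, and types are plain strings instead of Flink DataType objects. An explicit data_type overrides the deduced type. The nullability rule is simplified to strict operators such as + and -, where the result is nullable if any operand is nullable.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical helper illustrating how a computed column's type and nullability could be resolved. */
final class ComputedColumnTypes {
    private ComputedColumnTypes() {}

    /** The optional data_type acts as a hint; without it, the type deduced from the expression is used. */
    static String effectiveType(Optional<String> typeHint, String deducedType) {
        return typeHint.orElse(deducedType);
    }

    /**
     * Simplified rule for strict operators such as '+' or '-':
     * the result is nullable if any operand is nullable.
     */
    static boolean isNullable(List<Boolean> operandNullability) {
        return operandNullability.contains(Boolean.TRUE);
    }
}
```

For example, a DECIMAL(10, 2) hint wins over whatever precision and scale the expression would have produced.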
Restrictions:
  • Literals, deterministic built-in functions, and operators are permitted. A function may be non-deterministic only when the column strategy is VIRTUAL. Both user-defined functions and built-in functions are allowed.
  • Subqueries are not supported.
  • A STORED column can only be inserted into explicitly with the DEFAULT value from a ROW values expression; see example #1 below.
  • The expression can only refer to columns defined in the same table.
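The restrictions can be checked as in the minimal sketch below. It assumes a parser has already extracted three facts from the expression: the referenced columns, whether it contains a subquery, and whether it is deterministic. The class ComputedColumnRestrictions and its ExprInfo holder are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical check of the computed column restrictions. */
final class ComputedColumnRestrictions {
    enum Strategy { VIRTUAL, STORED }

    /** Simplified stand-in for what a parser/validator would extract from the expression. */
    static final class ExprInfo {
        final Set<String> referencedColumns;
        final boolean containsSubquery;
        final boolean deterministic;

        ExprInfo(Set<String> referencedColumns, boolean containsSubquery, boolean deterministic) {
            this.referencedColumns = referencedColumns;
            this.containsSubquery = containsSubquery;
            this.deterministic = deterministic;
        }
    }

    private ComputedColumnRestrictions() {}

    /** Returns the violated restrictions; an empty list means the definition is accepted. */
    static List<String> check(ExprInfo expr, Strategy strategy, Set<String> tableColumns) {
        List<String> violations = new ArrayList<>();
        if (expr.containsSubquery) {
            violations.add("sub-queries are not supported");
        }
        if (!expr.deterministic && strategy != Strategy.VIRTUAL) {
            violations.add("non-deterministic functions are only allowed in VIRTUAL columns");
        }
        for (String column : expr.referencedColumns) {
            if (!tableColumns.contains(column)) {
                violations.add("column not defined in this table: " + column);
            }
        }
        return violations;
    }
}
```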

InitializerExpressionFactory

InitializerExpressionFactory defines the value generation strategy for computed columns. It generates a RexNode for each VIRTUAL column, which is used in the logical plan to derive the column value. The following figure illustrates how it works for a SELECT statement:

(Figure: InitializerExpressionFactory in a SELECT statement)
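Calcite's own InitializerExpressionFactory (in org.apache.calcite.sql2rel) reports a generation strategy for each column index. The sketch below mimics that split, but with hypothetical names (InitializerSketch, ExpressionFactory, generatingExpression) and plain strings standing in for RexNodes. It is neither the Calcite API nor the Flink API.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical, simplified analogue of an initializer expression factory. */
final class InitializerSketch {
    /** NULLABLE marks an ordinary physical column here. */
    enum Strategy { NULLABLE, VIRTUAL, STORED }

    interface ExpressionFactory {
        /** How the value of the column at this index is produced. */
        Strategy generationStrategy(int columnIndex);

        /** The generating expression, present only for computed columns. */
        Optional<String> generatingExpression(int columnIndex);
    }

    /** One column of a table definition; a null expression means a physical column. */
    static final class Column {
        final String name;
        final String expression;
        final Strategy strategy;

        Column(String name, String expression, Strategy strategy) {
            this.name = name;
            this.expression = expression;
            this.strategy = strategy;
        }
    }

    private InitializerSketch() {}

    /** Builds a factory that answers per-column questions from the declared columns. */
    static ExpressionFactory forColumns(List<Column> columns) {
        return new ExpressionFactory() {
            @Override
            public Strategy generationStrategy(int columnIndex) {
                return columns.get(columnIndex).strategy;
            }

            @Override
            public Optional<String> generatingExpression(int columnIndex) {
                return Optional.ofNullable(columns.get(columnIndex).expression);
            }
        };
    }
}
```

During SQL-to-rel conversion, the planner would ask such a factory, for each column, whether it is VIRTUAL and which expression to project.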

TableColumn

TableColumn describes a column definition of a table, including the column name, the column data type, and the column strategy.
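Such a column descriptor can be sketched as follows. TableColumnSketch is hypothetical, not Flink's actual class. The data type is a plain string where a real implementation would hold a Flink DataType, and the column strategy is left out for brevity. The class is Serializable because TableColumn is meant to be persisted with the catalog table.

```java
import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;

/** Hypothetical stand-in for the proposed TableColumn; not Flink's actual class. */
final class TableColumnSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String name;
    private final String dataType;   // plain string here, e.g. "INT"
    private final String expression; // null for a physical (non-computed) column

    private TableColumnSketch(String name, String dataType, String expression) {
        this.name = Objects.requireNonNull(name, "name");
        this.dataType = Objects.requireNonNull(dataType, "dataType");
        this.expression = expression;
    }

    /** A regular column declared as "name type". */
    static TableColumnSketch physical(String name, String dataType) {
        return new TableColumnSketch(name, dataType, null);
    }

    /** A computed column declared as "name AS expression", with its resolved type. */
    static TableColumnSketch computed(String name, String dataType, String expression) {
        return new TableColumnSketch(name, dataType, Objects.requireNonNull(expression, "expression"));
    }

    String getName() { return name; }

    String getDataType() { return dataType; }

    Optional<String> getExpression() { return Optional.ofNullable(expression); }

    boolean isGenerated() { return expression != null; }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TableColumnSketch)) {
            return false;
        }
        TableColumnSketch other = (TableColumnSketch) o;
        return name.equals(other.name)
                && dataType.equals(other.dataType)
                && Objects.equals(expression, other.expression);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, dataType, expression);
    }

    @Override
    public String toString() {
        return expression == null ? name + " " + dataType : name + " " + dataType + " AS " + expression;
    }
}
```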

TableSchema

TableSchema describes a table schema for internal use and for use by current connectors.
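A schema that knows its computed columns can derive the row type a connector sees. The minimal sketch below uses the columns of table T2 from the Proposed Changes section. SchemaSketch is hypothetical and tracks names only, not types. VIRTUAL columns are excluded and STORED columns are kept, matching the TableSource and TableSink row types described in the plan rewrite.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, simplified schema that knows which columns are computed. */
final class SchemaSketch {
    enum Kind { PHYSICAL, VIRTUAL, STORED }

    static final class Column {
        final String name;
        final Kind kind;

        Column(String name, Kind kind) {
            this.name = name;
            this.kind = kind;
        }
    }

    private final List<Column> columns;

    SchemaSketch(List<Column> columns) {
        this.columns = Collections.unmodifiableList(new ArrayList<>(columns));
    }

    /** All declared columns, as seen by queries. */
    List<String> fieldNames() {
        List<String> names = new ArrayList<>();
        for (Column c : columns) {
            names.add(c.name);
        }
        return names;
    }

    /** Columns the connector actually reads or writes: every column except VIRTUAL ones. */
    List<String> physicalFieldNames() {
        List<String> names = new ArrayList<>();
        for (Column c : columns) {
            if (c.kind != Kind.VIRTUAL) {
                names.add(c.name);
            }
        }
        return names;
    }
}
```

For T2, queries see (a, b, c, d, e), while the connector reads and writes (a, c, d, e).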

Proposed Changes

Plan Rewrite

...

```sql
create table T2(
  a int,
  b as a + 1 virtual,
  c bigint,
  d as c - 1 stored,
  e varchar
) with (
  'k1' = 'v1',
  'k2' = 'v2'
);
```

...

If T2 is used as a table scan (table source), column b is computed in a projection right above the scan, while column d is read directly from table T2 because it is stored. In other words, we perform an equivalence conversion from the scan node to a projection above the scan.

(Figure: the scan of T2 converted into a projection above the scan)

We would add projection expressions for all the VIRTUAL computed columns, and the TableSource row type should exclude these VIRTUAL columns. Note that a STORED column is treated as a normal column by the table source.
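The projection above the scan of T2 can be sketched in plain Java, with rows as Object[]. ScanProjectionSketch is hypothetical; a real plan expresses this as a projection of RexNodes, not Java lambdas. The source delivers only the physical columns (a, c, d, e). The projection inserts b = a + 1 and returns NULL when a is NULL, as SQL arithmetic does.

```java
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch of the projection placed right above the scan of T2. */
final class ScanProjectionSketch {
    private ScanProjectionSketch() {}

    // Physical row produced by the source: (a INT, c BIGINT, d BIGINT, e VARCHAR).
    // d is STORED, so it is read from the source like any normal column.
    private static final List<Function<Object[], Object>> PROJECTION = List.of(
            row -> row[0],                                          // a: pass through
            row -> row[0] == null ? null : (Integer) row[0] + 1,    // b: VIRTUAL, a + 1
            row -> row[1],                                          // c: pass through
            row -> row[2],                                          // d: STORED, read as-is
            row -> row[3]);                                         // e: pass through

    /** Turns a physical source row into the full row type of T2: (a, b, c, d, e). */
    static Object[] project(Object[] physicalRow) {
        Object[] out = new Object[PROJECTION.size()];
        for (int i = 0; i < out.length; i++) {
            out[i] = PROJECTION.get(i).apply(physicalRow);
        }
        return out;
    }
}
```

For example, the physical row (41, 10, 9, 'x') becomes (41, 42, 10, 9, 'x').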

The Write

If T2 is used as an insert target table (table sink), column b is ignored entirely because it is VIRTUAL (not stored). Column d can only be inserted into explicitly with the DEFAULT value. Whether or not it is specified explicitly, its value is derived from the expression that InitializerExpressionFactory pre-defines. Here, it is derived from the expression `c - 1` in another projection right before the insert.

(Figure: the insert into T2 with a projection computing column d before the sink)

We compute the non-VIRTUAL (STORED) computed columns and insert them into the target table.


That is, we add projection expressions for the computed columns with the STORED column strategy, and the TableSink row type excludes the VIRTUAL columns.
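The write path can be sketched the same way. SinkProjectionSketch is hypothetical, and rows are Object[]. The INSERT supplies a, c and e, and the projection derives d = c - 1, with NULL propagating as in SQL. Column b is never written to the sink.

```java
/** Hypothetical sketch of the projection placed right before the sink of T2. */
final class SinkProjectionSketch {
    private SinkProjectionSketch() {}

    /**
     * Input: the values for the physical columns (a INT, c BIGINT, e VARCHAR).
     * Output: the sink row (a, c, d, e). b is VIRTUAL and therefore never written;
     * d is STORED and always derived from c - 1, whatever the INSERT supplied for it.
     */
    static Object[] toSinkRow(Object a, Object c, Object e) {
        Object d = c == null ? null : (Long) c - 1L;
        return new Object[] {a, c, d, e};
    }
}
```

For example, the values (1, 10, 'x') are written as (1, 10, 9, 'x').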

Note: These plan rewrites happen during the SQL-to-rel conversion phase.

Column Computation and Storage

For a table scan, VIRTUAL columns are computed from their expressions; STORED columns are read from the underlying table source.

For a table sink, VIRTUAL columns are ignored; STORED columns are computed from their expressions when rows are inserted into the sink.

The TableSchema Change

We would extend TableSchema to support computed columns. (We should always keep a single TableSchema in the code. In the future, connectors will only see the row type rather than the TableSchema, so computed columns are not a concern for connectors.)

...

We would put the TableColumn information of the InternalTableSchema into the CatalogTable properties, which are used for persistence. For every computed column, three items are persisted:

  • The column name
  • The column data type
  • The column expression, as a SQL-style string that looks like what the user wrote (except that, if the expression contains a UDF, we expand its schema path into a complete reference)
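The three items can be flattened into CatalogTable-style string properties and read back, as in the minimal sketch below. The key layout schema.<i>.name / schema.<i>.data-type / schema.<i>.expr and the class ColumnPropertiesSketch are assumptions for illustration, not necessarily the keys Flink uses. The expression string is assumed to be fully qualified already, with UDF paths expanded.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of flattening column definitions into string properties and back. */
final class ColumnPropertiesSketch {
    static final class Column {
        final String name;
        final String dataType;
        final String expression; // null for a physical column

        Column(String name, String dataType, String expression) {
            this.name = name;
            this.dataType = dataType;
            this.expression = expression;
        }
    }

    private ColumnPropertiesSketch() {}

    /** Writes name, data type and (for computed columns) expression under indexed, assumed keys. */
    static Map<String, String> serialize(List<Column> columns) {
        Map<String, String> props = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            Column c = columns.get(i);
            props.put("schema." + i + ".name", c.name);
            props.put("schema." + i + ".data-type", c.dataType);
            if (c.expression != null) {
                props.put("schema." + i + ".expr", c.expression);
            }
        }
        return props;
    }

    /** Reads columns back in index order until no further name key exists. */
    static List<Column> deserialize(Map<String, String> props) {
        List<Column> columns = new ArrayList<>();
        for (int i = 0; props.containsKey("schema." + i + ".name"); i++) {
            columns.add(new Column(
                    props.get("schema." + i + ".name"),
                    props.get("schema." + i + ".data-type"),
                    props.get("schema." + i + ".expr"))); // null when the column is physical
        }
        return columns;
    }
}
```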

When deserializing, we use SqlParser to parse the expression strings into SqlNodes, then convert them to RexNodes and apply the projections. There may be a utility class named ColumnStrategyParser to parse the string literals into ColumnStrategy values.
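The ColumnStrategyParser idea can be sketched with a hypothetical class name and enum. The fallback to VIRTUAL when no strategy was persisted is an assumption, chosen to match the default described in the grammar section.

```java
import java.util.Locale;

/** Hypothetical sketch of a ColumnStrategyParser-style helper. */
final class ColumnStrategyParserSketch {
    enum ColumnStrategy { VIRTUAL, STORED }

    private ColumnStrategyParserSketch() {}

    /** Parses a persisted strategy string case-insensitively; a missing value falls back to VIRTUAL. */
    static ColumnStrategy parse(String value) {
        if (value == null || value.trim().isEmpty()) {
            return ColumnStrategy.VIRTUAL;
        }
        switch (value.trim().toUpperCase(Locale.ROOT)) {
            case "VIRTUAL":
                return ColumnStrategy.VIRTUAL;
            case "STORED":
                return ColumnStrategy.STORED;
            default:
                throw new IllegalArgumentException("Unknown column strategy: " + value);
        }
    }
}
```

Rejecting unknown strings, rather than silently defaulting, surfaces corrupted or newer catalog entries early.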

Compatibility, Deprecation, and Migration Plan

This is a new feature and is compatible with older Flink versions. We may extend TableSourceTable to support the computed column interfaces, but this is transparent to users.

Implementation Plan

  1. Introduce the InitializerExpressionFactory to handle the initialization of default values and the generation of computation expressions for generated columns.
  2. Make FlinkRelOptTable implement the InitializerExpressionFactory interface, because it is the abstraction of our Flink table for Calcite schema lookup.
  3. Introduce the TableColumn structure in TableSchema to describe the name, type, and expression of the columns declared in the DDL. TableColumn should be kept serializable so that it can be persisted into the catalog within the BaseCatalogTable.
  4. Extend TableSchema to contain the definitions of all TableColumns.

Test Plan

Each new feature can be covered by unit tests. We can also add integration tests for connectors to verify that computed columns work with existing source implementations.


References:

[1] https://docs.microsoft.com/en-us/sql/relational-databases/tables/specify-computed-columns-in-a-table?view=sql-server-2016

[2] https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html

[3] https://oracle-base.com/articles/11g/virtual-columns-11gr1