Status

Current state: "Under Discussion"

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-70-Support-Computed-Column-for-Flink-SQL-td33126.html

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The umbrella task FLINK-10232 introduced the CREATE TABLE grammar in the new module flink-sql-parser. In the design doc FLINK SQL DDL, we proposed using a computed column to describe the processing-time attribute, so that a user can create a table with a processing-time attribute as follows:

create table T1(
  a int,
  b bigint,
  c varchar,
  d as PROCTIME()
) with (
  'k1' = 'v1',
  'k2' = 'v2'
);

The column d would be a processing-time attribute of table T1.

Besides that, computed columns have several other use cases, such as the following [2]:

  • Virtual generated columns can be used as a way to simplify and unify queries. A complicated condition can be defined as a generated column and referred to from multiple queries on the table to ensure that all of them use exactly the same condition.
  • Stored generated columns can be used as a materialized cache for complicated conditions that are costly to calculate on the fly.
  • Generated columns can simulate functional indexes: use a generated column to define a functional expression and index it. This can be useful for working with columns of types that cannot be indexed directly, such as JSON columns. For stored generated columns, the disadvantage of this approach is that values are stored twice: once as the value of the generated column and once in the index.
  • If a generated column is indexed, the optimizer recognizes query expressions that match the column definition and uses indexes from the column as appropriate during query execution (not supported yet).


Computed columns are available in SQL Server 2016 [1], MySQL 5.7 [2] and Oracle 11g [3].

Public Interfaces

Computed Column Syntax

Combining the grammars of SQL Server [1] and MySQL 5.7 [2], we propose the following computed column grammar:

col_name [data_type] AS expr
  [VIRTUAL | STORED] [NOT NULL | NULL]
  [COMMENT 'string']

  • The optional data_type: to force the data type of the computed column, specify a data type as a type hint for Flink to use. For example, for a DECIMAL result, users can coerce the precision/scale they want.
  • The VIRTUAL keyword: column values are not stored but are evaluated when rows are read. Computation is triggered when the column is output, which matters specifically for proc time (we may do some materialization eagerly, see RelTimeIndicatorConverter for details). A virtual column is not written to the table sink.
  • The STORED keyword: column values are evaluated and stored when rows are inserted or updated. A stored column requires storage space and can be indexed. (We may support this in the future if it is needed.)
  • VIRTUAL is the default if neither keyword is specified.
  • A computed column can be combined with a NULL attribute or a comment. The nullability is decided by the expression and cannot be specified.


Restrictions:

  • Literals, operators, built-in functions and user-defined functions are permitted. A function may be non-deterministic only when the column strategy is VIRTUAL; otherwise it must be deterministic.
  • Sub-queries are not supported.
  • A STORED column can only be inserted into explicitly with the DEFAULT value in a ROW values expression, see example#1 below.
  • The expression can only refer to columns defined in the same table.


Proposed Changes

Plan Rewrite

Assume we have a table named T2 with the following schema:

create table T2(
  a int,
  b as a + 1 virtual,
  c bigint,
  d as c - 1 stored,
  e varchar
) with (
  'k1' = 'v1',
  'k2' = 'v2'
);

The Read

If T2 is used as a table scan (table source), column b is computed in a projection right above the scan, while column d comes directly from table T2 because it is stored. In other words, we convert the scan node into an equivalent projection above the scan.

Concretely, we add projection expressions for all VIRTUAL computed columns, and the TableSource row type excludes these VIRTUAL columns. Note that a STORED column is treated as a normal column by the table source.
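
As a rough illustration of the read-side rewrite, the sketch below models the columns of T2 and derives both the fields the source would produce and the projection placed above the scan. It is not planner code: the names ReadRewriteSketch, Column, Strategy, scanFields and projectionAboveScan are hypothetical, and expressions are kept as plain strings for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the read-side rewrite; not Flink's planner API.
final class ReadRewriteSketch {
    enum Strategy { PHYSICAL, VIRTUAL, STORED }

    static final class Column {
        final String name;
        final String expr; // null for ordinary (non-computed) columns
        final Strategy strategy;

        Column(String name, String expr, Strategy strategy) {
            this.name = name;
            this.expr = expr;
            this.strategy = strategy;
        }
    }

    // Fields the table source actually produces: everything except VIRTUAL columns.
    static List<String> scanFields(List<Column> schema) {
        List<String> fields = new ArrayList<>();
        for (Column c : schema) {
            if (c.strategy != Strategy.VIRTUAL) {
                fields.add(c.name);
            }
        }
        return fields;
    }

    // Projection placed right above the scan: VIRTUAL columns are expanded to
    // their expressions, all other columns are forwarded unchanged.
    static List<String> projectionAboveScan(List<Column> schema) {
        List<String> exprs = new ArrayList<>();
        for (Column c : schema) {
            exprs.add(c.strategy == Strategy.VIRTUAL ? c.expr + " AS " + c.name : c.name);
        }
        return exprs;
    }

    // Schema of table T2 from the DDL above.
    static List<Column> t2() {
        return Arrays.asList(
            new Column("a", null, Strategy.PHYSICAL),
            new Column("b", "a + 1", Strategy.VIRTUAL),
            new Column("c", null, Strategy.PHYSICAL),
            new Column("d", "c - 1", Strategy.STORED),
            new Column("e", null, Strategy.PHYSICAL));
    }

    public static void main(String[] args) {
        System.out.println("scan:    " + scanFields(t2()));
        System.out.println("project: " + projectionAboveScan(t2()));
    }
}
```

For T2 the scan yields a, c, d, e, and the projection adds `a + 1 AS b` between a and c. The stored column d is forwarded like any ordinary column.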

The Write

If T2 is used as an insert target table (table sink), column b is ignored entirely because it is virtual (not stored). Column d can only be inserted explicitly with the DEFAULT value. Whether or not it is specified explicitly, its value is derived from the expression that the InitializerExpressionFactory pre-defines. For example, it may be derived from the expression `c - 1` in another projection right before the insert into the target table.

Concretely, we add projection expressions for the computed columns with the STORED column strategy, and the TableSink row type excludes the VIRTUAL columns.

These plan rewrites happen during the sql-to-rel conversion phase.
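
The write-side behaviour can be sketched in the same spirit. The snippet below is only an assumption-laden model, not the planner: WriteRewriteSketch, Column, Strategy and projectForSink are hypothetical names, rows are plain maps, and column expressions are Java lambdas standing in for the expressions that the InitializerExpressionFactory would supply.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of the write-side rewrite; not Flink's planner API.
final class WriteRewriteSketch {
    enum Strategy { PHYSICAL, VIRTUAL, STORED }

    static final class Column {
        final String name;
        final Strategy strategy;
        // Computes the column value from the incoming row; null for ordinary columns.
        final Function<Map<String, Object>, Object> expr;

        Column(String name, Strategy strategy, Function<Map<String, Object>, Object> expr) {
            this.name = name;
            this.strategy = strategy;
            this.expr = expr;
        }
    }

    // Schema of table T2 from the DDL above.
    static List<Column> t2() {
        return Arrays.asList(
            new Column("a", Strategy.PHYSICAL, null),
            new Column("b", Strategy.VIRTUAL, row -> (Integer) row.get("a") + 1),
            new Column("c", Strategy.PHYSICAL, null),
            new Column("d", Strategy.STORED, row -> (Long) row.get("c") - 1L),
            new Column("e", Strategy.PHYSICAL, null));
    }

    // Projection applied right before the sink: VIRTUAL columns are dropped,
    // STORED columns are always derived from their expression, and ordinary
    // columns are copied from the incoming row.
    static Map<String, Object> projectForSink(List<Column> schema, Map<String, Object> row) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Column c : schema) {
            if (c.strategy == Strategy.VIRTUAL) {
                continue; // virtual columns are never written
            }
            out.put(c.name, c.strategy == Strategy.STORED ? c.expr.apply(row) : row.get(c.name));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("a", 1);
        row.put("c", 10L);
        row.put("e", "x");
        System.out.println(projectForSink(t2(), row)); // prints {a=1, c=10, d=9, e=x}
    }
}
```

The sink row has no entry for b, and d is computed as `c - 1` even though the incoming row does not carry it.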

Column Computation and Storage

For a table scan, virtual columns are computed from their expressions; stored columns are read from the real table source.

For a table sink, virtual columns are ignored; stored columns are computed from their expressions when inserting into the sink.

The TableSchema Change

We would extend the TableSchema to support computed columns. (We should always keep a single TableSchema in the code. In the future, connectors would only see the row type and not the TableSchema, so this should not be a concern.)

We would introduce a new structure named TableColumn to hold the column name, column data type and column strategy info. The TableSchema would hold TableColumn instances instead of the original field names and field data types.

TableSchema Row Type Inference

In TableSchema, we would always ignore the VIRTUAL columns when deducing the row type. This row type corresponds to the physical output type of the table source and the physical input type of the table sink.
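
A minimal sketch of this inference, assuming a simplified TableColumn that keeps its data type as a plain string, might look as follows. The class layout, the NONE strategy for ordinary columns and the physicalRowType helper are illustrative assumptions, not the proposed API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical shapes for TableColumn and row type inference; the real classes may differ.
final class TableSchemaSketch {
    // NONE marks an ordinary column; this constant is an assumption of the sketch.
    enum ColumnStrategy { NONE, VIRTUAL, STORED }

    static final class TableColumn {
        final String name;
        final String dataType; // simplified: SQL type name as a string
        final ColumnStrategy strategy;

        TableColumn(String name, String dataType, ColumnStrategy strategy) {
            this.name = name;
            this.dataType = dataType;
            this.strategy = strategy;
        }
    }

    // Deduce the physical row type by skipping every VIRTUAL column.
    static String physicalRowType(List<TableColumn> columns) {
        List<String> fields = new ArrayList<>();
        for (TableColumn c : columns) {
            if (c.strategy != ColumnStrategy.VIRTUAL) {
                fields.add(c.name + " " + c.dataType);
            }
        }
        return "ROW<" + String.join(", ", fields) + ">";
    }

    // Columns of table T2; the types of b and d follow from their expressions.
    static List<TableColumn> t2() {
        return Arrays.asList(
            new TableColumn("a", "INT", ColumnStrategy.NONE),
            new TableColumn("b", "INT", ColumnStrategy.VIRTUAL),
            new TableColumn("c", "BIGINT", ColumnStrategy.NONE),
            new TableColumn("d", "BIGINT", ColumnStrategy.STORED),
            new TableColumn("e", "VARCHAR", ColumnStrategy.NONE));
    }

    public static void main(String[] args) {
        System.out.println(physicalRowType(t2())); // prints ROW<a INT, c BIGINT, d BIGINT, e VARCHAR>
    }
}
```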

Persistence

We would put the TableColumn info of the InternalTableSchema into the CatalogTable properties, which are used for persistence.

A utility class named ColumnStrategyParser may be added to parse string literals into ColumnStrategy values.
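
One possible shape of this round trip is sketched below. The property keys (`schema.<index>.name`, `.expr`, `.strategy`) and the helper names putColumn, readStrategy and parseStrategy are assumptions made for illustration; the proposal does not fix a key layout or a parser signature.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical persistence helpers; key names and method names are assumptions.
final class PersistenceSketch {
    enum ColumnStrategy { VIRTUAL, STORED }

    // Stand-in for the proposed ColumnStrategyParser: string literal -> ColumnStrategy.
    // Throws IllegalArgumentException for an unknown literal.
    static ColumnStrategy parseStrategy(String literal) {
        return ColumnStrategy.valueOf(literal.trim().toUpperCase(Locale.ROOT));
    }

    // Flatten one computed column into string properties.
    static void putColumn(Map<String, String> props, int index,
                          String name, String expr, ColumnStrategy strategy) {
        String prefix = "schema." + index + ".";
        props.put(prefix + "name", name);
        props.put(prefix + "expr", expr);
        props.put(prefix + "strategy", strategy.name());
    }

    // Read the strategy of one column back from the properties.
    static ColumnStrategy readStrategy(Map<String, String> props, int index) {
        return parseStrategy(props.get("schema." + index + ".strategy"));
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        putColumn(props, 1, "b", "a + 1", ColumnStrategy.VIRTUAL);
        putColumn(props, 3, "d", "c - 1", ColumnStrategy.STORED);
        System.out.println(props);
        System.out.println(readStrategy(props, 3)); // prints STORED
    }
}
```

Keeping everything as strings matches the idea of storing the column info in the CatalogTable properties, at the cost of re-parsing the expression when the table is loaded.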

Compatibility, Deprecation, and Migration Plan

This is a new feature and is compatible with older Flink versions. We may extend the TableSourceTable to support computed column interfaces, but this is transparent to users.

Implementation Plan

  1. Introduce a ColumnStrategy enum to describe the NULL/NOT NULL/VIRTUAL/STORED attributes of a DDL table column. A table column can have multiple attributes, e.g. a VIRTUAL column can also be declared as NOT NULL.
  2. Introduce the InitializerExpressionFactory to handle the initialization of the default value and generation of the computation expressions for generated columns.
  3. Make FlinkRelOptTable implement the interface InitializerExpressionFactory, because it is the abstraction of our Flink table for Calcite schema lookup.
  4. Introduce the TableColumn structure in InternalTableSchema to describe the name/type/expression of the declared columns from the DDL. TableColumn should be kept serializable in order to be persisted within the BaseCatalogTable into the catalog.
  5. Introduce the InternalTableSchema to describe our internal table schema (not visible to the table source/sink). The InternalTableSchema contains the definitions of all TableColumn instances.
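
To illustrate step 1, a column's attributes could be held as a set of enum constants and checked against the grammar, which allows at most one of VIRTUAL | STORED and at most one of NOT NULL | NULL. The sketch below is one hypothetical encoding: the constant names NULLABLE and NOT_NULL and the validate helper are assumptions, not part of the proposal.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical encoding of column attributes; constant and method names are assumptions.
final class ColumnStrategySketch {
    enum ColumnStrategy { NULLABLE, NOT_NULL, VIRTUAL, STORED }

    // Reject attribute combinations that the grammar rules out:
    // [VIRTUAL | STORED] and [NOT NULL | NULL] are each a single choice.
    static Set<ColumnStrategy> validate(Set<ColumnStrategy> attrs) {
        if (attrs.contains(ColumnStrategy.VIRTUAL) && attrs.contains(ColumnStrategy.STORED)) {
            throw new IllegalArgumentException("A column cannot be both VIRTUAL and STORED");
        }
        if (attrs.contains(ColumnStrategy.NULLABLE) && attrs.contains(ColumnStrategy.NOT_NULL)) {
            throw new IllegalArgumentException("A column cannot be both NULL and NOT NULL");
        }
        return attrs;
    }

    public static void main(String[] args) {
        // A VIRTUAL column that is also declared NOT NULL is accepted.
        System.out.println(validate(EnumSet.of(ColumnStrategy.VIRTUAL, ColumnStrategy.NOT_NULL)));
        try {
            validate(EnumSet.of(ColumnStrategy.VIRTUAL, ColumnStrategy.STORED));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```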

Test Plan

The implementation can be tested with unit tests for every new feature. We can also add integration tests for connectors to verify that it works with existing source implementations.


Reference:

[1] https://docs.microsoft.com/en-us/sql/relational-databases/tables/specify-computed-columns-in-a-table?view=sql-server-2016

[2] https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html

[3] https://oracle-base.com/articles/11g/virtual-columns-11gr1

