...

Current state: "Under Discussion"

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-70-Support-Computed-Column-for-Flink-SQL-td33126.html

...

Code Block
languagesql
col_name [data_type] AS expr
  [VIRTUAL | STORED] [NOT NULL | NULL]
  [COMMENT 'string']
  • Based on the review comment, data type can be deduced from the expression.
  • This column is known as a VIRTUAL column.
  • The optional data_type: to force the data type of the computed column, specify a data type as a type hint for Flink to use. For example, with DECIMAL the user can coerce the value to the precision/scale they want.
  • The VIRTUAL keyword: column values are not stored but are evaluated when rows are read. Computation is triggered on output, specifically for proc time (we may do some materialization eagerly; see RelTimeIndicatorConverter for details). A virtual column is not output to the table sink. See the “Column Computation/Storage” part for how the VIRTUAL column is persisted when the table is used as a source or sink.
  • The STORED keyword: column values are evaluated and stored when rows are inserted or updated. A stored column requires storage space and can be indexed. (We may support the STORED/VIRTUAL keyword in the future if it is needed.)
  • The default is VIRTUAL if neither keyword is specified, and the user has no chance to specify it explicitly.
  • It can be combined with a NULL attribute or given a comment. The nullability is decided by the expression and cannot be specified.
Restrictions:
  • Literals, deterministic built-in functions, and operators are permitted. A function can only be non-deterministic when the column strategy is VIRTUAL. User-defined functions and built-in functions are allowed.
  • Sub-queries are not supported.
  • A stored column can only be inserted into explicitly with the DEFAULT value from a ROW values expression; see example#1 below.
  • It can only refer to columns defined in the same table.
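
Two of the restrictions above (references limited to the same table, and non-determinism allowed only for VIRTUAL columns) could be checked roughly as in the sketch below. This is a plain-Java illustration, not Flink or Calcite code: the class ComputedColumnCheck, its Strategy enum, and the validate method are all hypothetical names, and the expression is reduced to a list of referenced column names plus a determinism flag.

```java
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified check of computed column restrictions; not a Flink class. */
final class ComputedColumnCheck {

    /** Simplified strategies used only by this sketch. */
    enum Strategy { VIRTUAL, STORED }

    /**
     * Validates two restrictions: every referenced column must be defined in the
     * same table, and a non-deterministic expression requires the VIRTUAL strategy.
     */
    static void validate(
            Set<String> tableColumns,
            List<String> referencedColumns,
            boolean deterministic,
            Strategy strategy) {
        for (String ref : referencedColumns) {
            if (!tableColumns.contains(ref)) {
                throw new IllegalArgumentException(
                        "Column '" + ref + "' is not defined in the same table.");
            }
        }
        if (!deterministic && strategy != Strategy.VIRTUAL) {
            throw new IllegalArgumentException(
                    "Non-deterministic expressions require the VIRTUAL strategy.");
        }
    }

    public static void main(String[] args) {
        Set<String> columns = Set.of("a", "c");
        // An expression that refers only to column a of the same table.
        validate(columns, List.of("a"), true, Strategy.VIRTUAL);
        // A non-deterministic expression on a VIRTUAL column is accepted.
        validate(columns, List.of(), false, Strategy.VIRTUAL);
        System.out.println("both definitions accepted");
    }
}
```

A real implementation would walk the parsed expression tree to collect column references and function determinism instead of receiving them as arguments.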

ColumnStrategy

...


...

languagejava

...

InitializerExpressionFactory

InitializerExpressionFactory defines the value generation strategy for computed columns. It would generate a RexNode for each VIRTUAL column which can be used in the logical plan to derive the value we want.

...

languagejava

...

The following graph illustrates how it works for a select statement:

Image Added

TableColumn

TableColumn describes a column definition of a table, including the column name, the column data type and the column strategy.

...

...

/**
 * A table column represents a table's column structure with field name, data type
 * and column strategies.
 */
public class TableColumn {
  private String fieldName;
  private DataType fieldDataType;
  private Expression expression; // the computed column expression.
  private ColumnStrategy columnStrategy;

  private TableColumn(String fieldName,
      DataType fieldDataType,
      Expression expression,
      ColumnStrategy columnStrategy) {
    this.fieldName = Objects.requireNonNull(fieldName, "Column name can not be null.");
    this.fieldDataType = Objects.requireNonNull(fieldDataType,
      "Column data type can not be null.");
    // Not null-checked here, unlike the other fields.
    this.expression = expression;
    this.columnStrategy = Objects.requireNonNull(columnStrategy);
  }

  public enum ColumnStrategy {
        ...
  }
  ...
}

TableSchema

TableSchema describes a table schema for internal and current connector use.

Code Block
languagejava
/**
 * A table schema that represents a table's structure with field names and data types.
 */
public class TableSchema {
  private final TableColumn[] tableColumns;
    ...
}

Proposed Changes

Plan Rewrite

...

If T2 is used as a table scan (table source), the T2 column b would be computed in a projection right above the scan, while the column d comes directly from the table T2 because it is stored. That is, we would do an equivalence conversion from the scan node to a project above the scan.

Image Added

We would add the projection expressions for all the VIRTUAL computed columns, and the TableSource row type should exclude these VIRTUAL columns. Note that a STORED column is treated as a normal column for the table source.
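
The read-side rewrite can be pictured with a small plain-Java sketch. It is not planner code: ReadProjectionSketch and its project method are hypothetical, rows are modelled as maps, and the definition `b AS a + 1` is an assumed example, since only the column names b and d are given here. The source row carries no VIRTUAL column, and the projection above the scan appends it.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Sketch of the read-side rewrite: a projection placed above the scan. */
final class ReadProjectionSketch {

    /**
     * Copies the physical row produced by the source and appends each VIRTUAL
     * column by evaluating its expression against that physical row.
     */
    static Map<String, Object> project(
            Map<String, Object> physicalRow,
            Map<String, Function<Map<String, Object>, Object>> virtualColumns) {
        Map<String, Object> result = new LinkedHashMap<>(physicalRow);
        virtualColumns.forEach((name, expr) -> result.put(name, expr.apply(physicalRow)));
        return result;
    }

    public static void main(String[] args) {
        // Row as produced by the table source: the VIRTUAL column b is absent,
        // while the STORED column d is present like any normal column.
        Map<String, Object> scanned = new LinkedHashMap<>();
        scanned.put("a", 1);
        scanned.put("c", 10);
        scanned.put("d", 9);

        // Assumed definition for illustration only: b AS a + 1 (VIRTUAL).
        Map<String, Function<Map<String, Object>, Object>> virtualColumns = new LinkedHashMap<>();
        virtualColumns.put("b", row -> (Integer) row.get("a") + 1);

        System.out.println(project(scanned, virtualColumns));
    }
}
```

In the actual plan the expressions would be RexNodes in a project node rather than Java lambdas; the sketch only shows which side of the scan each column value comes from.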

The Write

If T2 is used as an insert target table (table sink), the T2 column b is totally ignored because it is virtual (not stored). The column d can only be inserted explicitly with the DEFAULT value. Whether or not it is specified explicitly, the value would be derived from the expression that InitializerExpressionFactory pre-defines. For example, it may be derived with the expression `c-1` within another projection right before we make the insert.

Image Added

We would compute the non-VIRTUAL column and insert it into the target table.

That means we would add the projection expressions for the computed columns with the STORED column strategy, and the TableSink row type should exclude the VIRTUAL columns.

These plan rewrites happen during the sql-to-rel conversion phase.
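
The write-side rewrite can be sketched in the same plain-Java style. WriteProjectionSketch and its project method are hypothetical names and rows are modelled as maps. The projection before the insert drops the VIRTUAL column b and derives the STORED column d from the expression `c-1` mentioned above, regardless of what the incoming row carries.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Sketch of the write-side rewrite: a projection placed right before the insert. */
final class WriteProjectionSketch {

    /**
     * Builds the row handed to the sink: VIRTUAL columns are removed and each
     * STORED column is derived from its expression over the inserted row.
     */
    static Map<String, Object> project(
            Map<String, Object> insertedRow,
            Set<String> virtualColumns,
            Map<String, Function<Map<String, Object>, Object>> storedColumns) {
        Map<String, Object> result = new LinkedHashMap<>(insertedRow);
        // VIRTUAL columns are never written to the sink.
        result.keySet().removeAll(virtualColumns);
        // STORED columns always take the value of their expression.
        storedColumns.forEach((name, expr) -> result.put(name, expr.apply(insertedRow)));
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> inserted = new LinkedHashMap<>();
        inserted.put("a", 1);
        inserted.put("c", 10);

        // d is derived with the expression c - 1.
        Map<String, Function<Map<String, Object>, Object>> storedColumns = new LinkedHashMap<>();
        storedColumns.put("d", row -> (Integer) row.get("c") - 1);

        System.out.println(project(inserted, Set.of("b"), storedColumns));
    }
}
```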

Column Computation and Storage

...

We would put the TableColumn info of the InternalTableSchema into the CatalogTable properties, which would be used for persistence.

We use SqlParser to parse the expression strings to SqlNode.
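
As a rough illustration of persisting column info as string properties, the sketch below flattens each column into key/value pairs and keeps the computed column expression as a plain string that would later be parsed back. The class ColumnPropertiesSketch, its methods, and the key layout (`schema.<index>.name`, `schema.<index>.data-type`, `schema.<index>.expr`) are assumptions made for this example, not a confirmed property format.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch only: the key layout below is hypothetical, not a confirmed format. */
final class ColumnPropertiesSketch {

    /** Writes one column into the properties; expression is null for a regular column. */
    static void putColumn(
            Map<String, String> properties,
            int index,
            String name,
            String dataType,
            String expression) {
        String prefix = "schema." + index + ".";
        properties.put(prefix + "name", name);
        properties.put(prefix + "data-type", dataType);
        if (expression != null) {
            // Kept as a string; it would be parsed back into an expression on load.
            properties.put(prefix + "expr", expression);
        }
    }

    /** Returns true if the column at the given index has a persisted expression. */
    static boolean isComputed(Map<String, String> properties, int index) {
        return properties.containsKey("schema." + index + ".expr");
    }

    public static void main(String[] args) {
        Map<String, String> properties = new LinkedHashMap<>();
        putColumn(properties, 0, "c", "INT", null);
        putColumn(properties, 1, "d", "INT", "c - 1");
        properties.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```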

Compatibility, Deprecation, and Migration Plan

This is a new feature and is compatible with older versions of Flink. We may extend the TableSourceTable to support computed column interfaces, but this is transparent to users.

Implementation Plan

...

  1. Introduce the InitializerExpressionFactory to handle the initialization of the default value and generation of the computation expressions for generated columns.
  2. Make the FlinkRelOptTable extend the interface InitializerExpressionFactory because it is the abstraction of our Flink table for Calcite schema lookup.
  3. Introduce the TableColumn structure in TableSchema to describe the name/type/expression of the declared columns from the DDL. TableColumn should be kept serializable in order to be persisted within the BaseCatalogTable into the catalog.
  4. Extend the InternalTableSchema to describe our internal table schema (not visible to the table source/sink). The InternalTableSchema contains the TableSchema, which holds the definitions of all the TableColumn.

Test Plan

The implementation can be tested with unit tests for every new feature, and we can add integration tests for connectors to verify that it cooperates with existing source implementations.

...