Discussion thread: https://lists.apache.org/thread/wh818t7o10wr0yvqlvb50y5tvxcqmd4b
Vote thread: https://lists.apache.org/thread/bntvprj5cyg7ozb9chjy3o55w3wnsrp9
JIRA


Release: 1.13

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Schema information is necessary at different locations in the Table API for defining tables and/or views. In particular, it is necessary to define the schema in a programmatic DDL (FLIP-129) and when converting a DataStream to a Table (FLIP-136).

We need similar APIs in the Catalog interfaces such that catalog implementations can define tables/views in a unified way.

Furthermore, a catalog implementation needs a way to encode/decode a schema into properties.

Similar to classes such as Expression/ResolvedExpression or UnresolvedDataType/DataType, a schema should be provided in an unresolved and resolved variant.

Currently, TableSchema is a hybrid of a resolved and an unresolved schema, which leads to methods such as

CatalogTableSchemaResolver.resolve(TableSchema): TableSchema

where the updated content of the schema is not directly visible.

This FLIP updates the class hierarchy to achieve the following goals:

  • make it visible whether a schema is resolved or unresolved and when the resolution happens
  • offer a unified API for FLIP-129, FLIP-136, and catalogs
  • allow arbitrary data types and expressions in the schema for watermark specs or columns
  • have access to other catalogs for declaring a data type or expression via CatalogManager
  • clean up TableSchema
  • remain backwards compatible in the persisted properties and API

Public Interfaces

  • org.apache.flink.table.api.Schema
  • org.apache.flink.table.catalog.ResolvedSchema
  • CatalogBaseTable#getUnresolvedSchema(): Schema
  • org.apache.flink.table.catalog.SchemaResolver
  • org.apache.flink.table.catalog.ResolvedCatalogTable
  • org.apache.flink.table.catalog.ResolvedCatalogTable#getResolvedSchema(): ResolvedSchema
  • org.apache.flink.table.catalog.ResolvedCatalogView
  • org.apache.flink.table.catalog.ResolvedCatalogView#getResolvedSchema(): ResolvedSchema
  • org.apache.flink.table.factories.DynamicTableFactory.Context#getCatalogTable(): ResolvedCatalogTable
  • org.apache.flink.table.api.Table#getResolvedSchema(): ResolvedSchema
  • org.apache.flink.table.operations.QueryOperation#getResolvedSchema(): ResolvedSchema
  • org.apache.flink.table.api.TableResult#getResolvedSchema(): ResolvedSchema

Proposed Changes

Schema

API class used by programmatic DDL, DataStream API conversion, and Catalog API.

Notes:

  • Stores Expression and AbstractDataType
  • Offers convenient methods to adopt columns and other attributes from other resolved or unresolved schema instances
  • It replaces TableSchema.Builder


Schema {
    
    static Schema.Builder newBuilder();

    List<UnresolvedColumn> getColumns();

    List<UnresolvedWatermarkSpec> getWatermarkSpecs();

    Optional<UnresolvedPrimaryKey> getPrimaryKey();

    ResolvedSchema resolve(SchemaResolver);

    Builder {
        Builder fromSchema(Schema);

        Builder fromResolvedSchema(ResolvedSchema);

        Builder column(String, AbstractDataType<?>);

        Builder columnByExpression(String, Expression);

        Builder columnByMetadata(String, AbstractDataType<?>);

        Builder columnByMetadata(String, AbstractDataType<?>, boolean);

        Builder columnByMetadata(String, AbstractDataType<?>, String);

        Builder columnByMetadata(String, AbstractDataType<?>, String, boolean);

        Builder watermark(String, Expression);

        Builder primaryKey(String... columnNames);

        Builder primaryKeyNamed(String, String...);

        Schema build();
    }
}
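
For illustration, a rough usage sketch of the proposed builder is shown below. The column names are made up; `$` and `lit` are the existing helpers from `org.apache.flink.table.api.Expressions`:

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

// builds an unresolved schema; resolution happens later via a SchemaResolver
Schema schema = Schema.newBuilder()
    .column("user_id", DataTypes.BIGINT().notNull())              // physical column
    .column("price", DataTypes.DOUBLE())                          // physical column
    .column("quantity", DataTypes.INT())                          // physical column
    .column("ts", DataTypes.TIMESTAMP(3))                         // physical column
    .columnByExpression("cost", $("price").times($("quantity")))  // computed column
    .columnByMetadata("offset", DataTypes.BIGINT(), true)         // virtual metadata column
    .watermark("ts", $("ts").minus(lit(5).seconds()))             // watermark strategy
    .primaryKeyNamed("pk_user", "user_id")
    .build();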

ResolvedSchema

Class that is exposed by the framework at different locations for a validated and complete schema.

Notes:

  • Stores ResolvedExpression and DataType
  • It replaces TableSchema
  • Expressions that originated from SQL will have an implementation of ResolvedExpression#asSerializableString, which makes it possible to serialize them into properties for the catalog
ResolvedSchema {

    int getColumnCount();

    List<Column> getColumns();

    Optional<Column> getColumn(int);

    Optional<Column> getColumn(String);

    List<WatermarkSpec> getWatermarkSpecs();

    Optional<UniqueConstraint> getPrimaryKey();

    DataType toRowDataType();

    DataType toPhysicalRowDataType();

    DataType toPersistedRowDataType();
}
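
For illustration, a small sketch of the intended consumption side (it assumes that `Column` and `UniqueConstraint` expose the usual accessors; the resolver is described in the next section):

// resolution turns the unresolved schema from above into a ResolvedSchema
ResolvedSchema resolvedSchema = schema.resolve(schemaResolver);

// row type of all columns vs. row type of only the physical columns
DataType rowType = resolvedSchema.toRowDataType();
DataType physicalRowType = resolvedSchema.toPhysicalRowDataType();

// the primary key might be absent
resolvedSchema.getPrimaryKey()
    .ifPresent(pk -> System.out.println("Primary key: " + pk.getColumns()));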

SchemaResolver

References the parser, data type factory, catalog manager, expression resolver, etc. for resolving SQL and Table API expressions and data types.

It works similarly to CatalogTableSchemaResolver and will perform validation.

Notes:

  • Depending on the context, the resolver is configured in streaming or batch mode
  • Depending on the context, the resolver supports metadata columns (not supported for the DataStream API conversion)
  • Instances are provided by the CatalogManager
SchemaResolver {

    ResolvedSchema resolve(Schema schema);

    boolean supportsMetadata();

    boolean isStreamingMode();
}
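
A possible resolution flow could then look as follows (the `getSchemaResolver()` accessor on the catalog manager is only an assumption of this sketch):

// the CatalogManager provides a resolver configured for the current context
SchemaResolver resolver = catalogManager.getSchemaResolver();

// e.g. the DataStream API conversion path would receive a resolver
// for which supportsMetadata() returns false
ResolvedSchema resolvedSchema = resolver.resolve(schema);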

Updated Catalog Class Hierarchy

Due to the updated schema classes, we also need to give better semantics to the existing Catalog class hierarchy while maintaining backwards compatibility.

Some assumptions that came up during offline discussions:

→ A CatalogView must have a schema.

  • This is defined by the SQL standard (see 4.15.7 Table descriptors).
  • Hive also stores a view schema, but the schema is not validated when executing queries on the view, and the stored schema will not be updated even if the underlying table schema changes.
  • We also just consider the schema as metadata and don't validate it yet. However, platforms that use our catalog interfaces can already do that.

→ A CatalogTable and CatalogView are metadata objects. Thus, they are unresolved and return an unresolved schema.

→ Only validated and thus resolved CatalogTables/CatalogViews should be put into a catalog.

We suggest the following interface additions:

CatalogBaseTable {

    Schema getUnresolvedSchema();
}

ResolvedCatalogTable implements CatalogTable {

    ResolvedSchema getResolvedSchema();
}

ResolvedCatalogView implements CatalogView {

    ResolvedSchema getResolvedSchema();
}

DynamicTableFactory.Context#getCatalogTable(): ResolvedCatalogTable

Notes:

  • The property design of CatalogTable#toProperties and CatalogTable#fromProperties will not change.
  • However, once we drop CatalogTable#getSchema(): TableSchema, we will only support ResolvedCatalogTable#toProperties.
  • CatalogTable#fromProperties will always return an unresolved catalog table.
  • For backwards compatibility, we leave Catalog#createTable and Catalog#alterTable untouched. The catalog manager will call them with resolved tables/views only, but we leave it up to the catalog implementation whether to rely on this guarantee.
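
For example, a connector factory could then access the resolved schema directly. A sketch, assuming a hypothetical `DynamicTableSourceFactory` implementation:

public class MySourceFactory implements DynamicTableSourceFactory {

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // the context now exposes a resolved table instead of a plain CatalogTable
        final ResolvedCatalogTable table = context.getCatalogTable();
        final ResolvedSchema schema = table.getResolvedSchema();

        // derive the produced type from the physical columns only
        final DataType producedDataType = schema.toPhysicalRowDataType();
        return new MySource(producedDataType); // MySource is a made-up source
    }

    // factoryIdentifier(), requiredOptions(), optionalOptions() omitted for brevity
}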

Updated Other Parts of the API

Because we deprecate TableSchema, we need to update other locations as well and provide a consistent experience for accessing a ResolvedSchema.

org.apache.flink.table.api.Table#getResolvedSchema(): ResolvedSchema
org.apache.flink.table.operations.QueryOperation#getResolvedSchema(): ResolvedSchema
org.apache.flink.table.api.TableResult#getResolvedSchema(): ResolvedSchema

Notes:

  • There might be other locations where TableSchema is used; we will update those gradually.
  • TableSchema is mostly used in legacy interfaces.
  • In newer interfaces for connectors, DataType and UniqueConstraint should be passed around. The entire schema is usually not required.
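
For instance, code that previously used `Table#getSchema()` would migrate as follows (a sketch, assuming `Column` exposes `getName()` and `getDataType()`):

Table table = tableEnv.sqlQuery("SELECT user_id, cost FROM Orders");

// instead of the deprecated table.getSchema(): TableSchema
ResolvedSchema schema = table.getResolvedSchema();
schema.getColumns().forEach(column ->
    System.out.println(column.getName() + ": " + column.getDataType()));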

Compatibility, Deprecation, and Migration Plan

We aim for full backwards compatibility in the next release.

We deprecate TableSchema and related interfaces.

Implementation Plan

  1. Implement Schema, ResolvedSchema, SchemaResolver and property conversion utils
  2. Update the Catalog APIs
  3. Update other parts of the API gradually

Rejected Alternatives

CatalogTable is fully resolved when returned by the Catalog

Schema
--> stores Expression, AbstractDataType

ResolvedSchema
--> stores ResolvedExpression, DataType

CatalogBaseTable.getResolvedSchema(): ResolvedSchema
--> stores ResolvedSchema

Catalog.getTable(ObjectPath, SchemaResolver): CatalogBaseTable
--> creates Schema and resolves it with the help of SchemaResolver

SchemaResolver.resolve(Schema): ResolvedSchema
--> references parser, catalog manager, etc. for resolving SQL and Table API expressions

CatalogTableImpl.fromProperties(Map<String, String> properties, SchemaResolver): CatalogTableImpl
--> construct Schema -> create ResolvedSchema -> verify against remaining properties

CatalogTableImpl.toProperties(): Map<String, String>
--> no change in properties yet

TableSchema extends Schema or ResolvedSchema

Since TableSchema is a hybrid of both, it would not be a smooth experience in any case.

It is safer and more user-friendly to build up a separate stack with a better class hierarchy in the correct package.


1) Reuse `table.api.TableColumn`, `table.api.WatermarkSpec`, `table.api.constraints.UniqueConstraint` but move them to `table.catalog`.

2) Introduce two Schema classes `table.api.Schema` for FLIP-129/FLIP-136 and `table.catalog.ResolvedSchema`.

3) Let `table.catalog.ResolvedSchema` implement most methods of `TableSchema` and resolve it with the help of a schema resolver.

4) Let `TableSchema` extend from `ResolvedSchema`.

5) Update API to properly split into `Schema` and `ResolvedSchema`. `Schema` will be returned by `CatalogBaseTable` such that expressions and data types can reference objects from other catalogs as well. `ResolvedSchema` will be returned by `QueryOperation`.

6) `TableColumn` and `WatermarkSpec` will be gradually reworked to remove the hybrid resolved/unresolved properties in members.

7) `DynamicTableFactory` will provide the resolved physical row data type and constraints as dedicated methods to avoid confusion about whether to call `TableSchema.toRowDataType` or `TableSchema.toPhysicalRowDataType`.

We aim to leave `table.api.TableSchema.Builder` untouched for backwards compatibility. However, implementations that use the rather new `add(TableColumn)` method would need an update due to relocation and gradual refactorings.