Status
Current state: "Accepted"
Discussion thread: https://lists.apache.org/thread.html/r79064a448d38d5b3d091dfb4703ae3eb94d5c6dd3970f120be7d5b13%40%3Cdev.flink.apache.org%3E
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Released: 1.13
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Schema information is necessary at different locations in the Table API for defining tables and/or views. In particular, it is necessary to define the schema in a programmatic DDL (FLIP-129) and when converting a DataStream to a Table (FLIP-136).
We need similar APIs in the Catalog interfaces such that catalog implementations can define table/views in a unified way.
Furthermore, a catalog implementation needs a way to encode/decode a schema into properties.
Similar to classes such as Expression/ResolvedExpression or UnresolvedDataType/DataType, a schema should be provided in an unresolved and resolved variant.
Currently, TableSchema is a hybrid between resolved and unresolved schema which leads to methods such as
CatalogTableSchemaResolver.resolve(TableSchema): TableSchema
where the updated content of the schema is not directly visible.
This FLIP updates the class hierarchy to achieve the following goals:
- make it visible whether a schema is resolved or unresolved and when the resolution happens
- offer a unified API for FLIP-129, FLIP-136, and catalogs
- allow arbitrary data types and expressions in the schema for watermark spec or columns
- have access to other catalogs for declaring a data type or expression via CatalogManager
- cleaned up TableSchema
- remain backwards compatible in the persisted properties and API
Public Interfaces
org.apache.flink.table.api.Schema
org.apache.flink.table.catalog.ResolvedSchema
CatalogBaseTable#getUnresolvedSchema(): Schema
org.apache.flink.table.catalog.SchemaResolver
org.apache.flink.table.catalog.ResolvedCatalogTable
org.apache.flink.table.catalog.ResolvedCatalogTable#getResolvedSchema(): ResolvedSchema
org.apache.flink.table.catalog.ResolvedCatalogView
org.apache.flink.table.catalog.ResolvedCatalogView#getResolvedSchema(): ResolvedSchema
org.apache.flink.table.factories.DynamicTableFactory.Context#getCatalogTable(): ResolvedCatalogTable
org.apache.flink.table.api.Table#getResolvedSchema(): ResolvedSchema
org.apache.flink.table.operations.QueryOperation#getResolvedSchema(): ResolvedSchema
org.apache.flink.table.api.TableResult#getResolvedSchema(): ResolvedSchema
Proposed Changes
Schema
API class used by programmatic DDL, DataStream API conversion, and Catalog API.
Notes:
- Stores
Expression
andAbstractDataType
- Offers convenient methods to adopt columns etc. from other resolved or unresolved schema instances
- It replaces
TableSchema.Builder
Schema {
static Schema.Builder newBuilder();
List<UnresolvedColumn> getColumns();
List<UnresolvedWatermarkSpec> getWatermarkSpecs();
Optional<UnresolvedPrimaryKey> getPrimaryKey();
ResolvedSchema resolve(SchemaResolver);
Builder {
Builder fromSchema(Schema);
Builder fromResolvedSchema(ResolvedSchema);
Builder column(String, AbstractDataType<?>);
Builder columnByExpression(String, Expression);
Builder columnByMetadata(String, AbstractDataType<?>);
Builder columnByMetadata(String, AbstractDataType<?>, boolean);
Builder columnByMetadata(String, AbstractDataType<?>, String);
Builder columnByMetadata(String, AbstractDataType<?>, String, boolean);
Builder watermark(String, Expression);
Builder primaryKey(String... columnNames);
Builder primaryKeyNamed(String, String...);
Schema build();
}
}
ResolvedSchema
Class that is exposed by the framework at different locations for validated and complete schema.
Notes:
- Stores
ResolvedExpression
andDataType
- It replaces
TableSchema
- Expressions that originated from SQL will have an implementation of
ResolvedExpression#asSerializableString
which makes it possible to serialize them into properties for the catalog
ResolvedSchema {
int getColumnCount();
List<Column> getColumns();
Optional<Column> getColumn(int);
Optional<Column> getColumn(String);
List<WatermarkSpec> getWatermarkSpecs();
Optional<UniqueConstraint> getPrimaryKey();
DataType toRowDataType();
DataType toPhysicalRowDataType();
DataType toPersistedRowDataType();
}
SchemaResolver
References parser, data type factory, catalog manager, expression resolver, etc. for resolving SQL and Table API expressions and data types.
It works similar to CatalogTableSchemaResolver
and will perform validation.
Notes:
- Depending on the context the resolver is configured in streaming/batch mode
- Depending on the context the resolver supports metadata columns (not supported for DataStream API conversion)
- Instances are provided by the
CatalogManager
SchemaResolver {
ResolvedSchema resolve(Schema schema);
boolean supportsMetadata();
boolean isStreamingMode();
}
Updated Catalog Class Hierarchy
Due to the updated schema classes, we also need to give better semantics to the existing Catalog class hierarchy while maintaining backwards compatibility.
Some assumptions that come up during the offline discussions:
→ A CatalogView must have a schema.
- This is defined by the SQL standard (see 4.15.7 Table descriptors).
- Also Hive stores a view schema but the schema is not validated when executing queries on the view. And the stored schema will not be updated even the underlying table schema changed.
- We also just consider the schema as metadata and don't validate it yet. However, platforms that use our catalog interfaces can do that already.
→ A CatalogTable and CatalogView are metadata objects. Thus, they are unresolved and return an unresolved schema.
→ Only validated and thus resolved CatalogTable's/CatalogView's should be put into a catalog.
We suggest the following interface additions:
CatalogBaseTable {
Schema getUnresolvedSchema();
}
ResolvedCatalogTable implements CatalogTable {
ResolvedSchema getResolvedSchema();
}
ResolvedCatalogView implements CatalogView {
ResolvedSchema getResolvedSchema();
}
DynamicTableFactoryContext#getCatalogTable(): ResolvedCatalogTable
Notes:
- The property design of
CatalogTable#toProperties
andCatalogTable#fromProperties
will not change. - However, once we drop
CatalogTable#getSchema():TableSchema
we will only supportResolvedCatalogTable#toProperties
anymore. CatalogTable#fromProperties
will always return an unresolved catalog table.- For backwards compatibility, we leave
Catalog#createTable
andCatalog#alterTable
untouched. The catalog manager will call them with resolved table/view only but we will leave it up to the catalog implementation.
Updated Other Parts of the API
Because we deprecate TableSchema, we need to update other locations as well and provide a consistent experience for accessing a ResolvedSchema.
org.apache.flink.table.api.Table#getResolvedSchema(): ResolvedSchema
org.apache.flink.table.operations.QueryOperation#getResolvedSchema(): ResolvedSchema
org.apache.flink.table.api.TableResult#getResolvedSchema(): ResolvedSchema
Notes:
- There might be other locations where TableSchema is used, we will gradually update those.
- TableSchema is mostly used in legacy interfaces.
- In newer interfaces for connectors,
DataType
andUniqueConstraint
should be passed around. The entire schema is usually not required.
Compatibility, Deprecation, and Migration Plan
We aim full backwards compatibility in the next release.
We deprecate TableSchema
and related interfaces.
Implementation Plan
- Implement Schema, ResolvedSchema, SchemaResolver and property conversion utils
- Update the Catalog APIs
- Update other parts of the API gradually
Rejected Alternatives
CatalogTable is fully resolved when returned by the Catalog
Schema
--> stores Expression, AbstractDataType
ResolvedSchema
--> stores ResolvedExpression, DataType
CatalogBaseTable.getResolvedSchema(): ResolvedSchema
--> stores ResolvedSchema
Catalog.getTable(ObjectPath, SchemaResolver): CatalogBaseTable
--> creates Schema and resolves it with the help of SchemaResolver
SchemaResolver.resolve(Schema): ResolvedSchema
--> references parser, catalog manager, etc. for resolving SQL and Table API expressions
CatalogTableImpl.fromProperties(Map<String, String> properties, SchemaResolver): CatalogTableImpl
--> construct Schema -> create ResolvedSchema -> verify against remaining properties
CatalogTableImpl.toProperties(): Map<String, String>
--> no change in properties yet
TableSchema extends Schema or ResolvedSchema
Since TableSchema is a hybrid of both, it would not be a smooth experience in any case.
It is saver and more user-friendly to build up a separate stack with better class hierarchy in correct package.
1) Reuse `table.api.TableColumn`, `table.api.WatermarkSpec`, `table.api.constraints.UniqueConstraint` but move them to `table.catalog`.
2) Introduce two Schema classes `table.api.Schema` for FLIP-129/FLIP-136 and `table.catalog.ResolvedSchema`.
3) Let `table.catalog.ResolvedSchema` implement most methods of `TableSchema` and resolve it with the help of a schema resolver.
4) Let `TableSchema` extend from `ResolvedSchema`.
5) Update API to properly split into `Schema` and `ResolvedSchema`. `Schema` will be returned by `CatalogBaseTable` such that expressions and data types can reference objects from other catalogs as well. `ResolvedSchema` will be returned by `QueryOperation`.
7) `TableColumn` and `WatermarkSpec` will be gradually reworked to remove the hybrid resolved/unresolved properties in members.
6) `DynamicTableFactory` will provide the resolved physical row data type and constraints as dedicated methods to avoid confusion which methods to call on `TableSchema.toRowDataType` or `TableSchema.toPhysicalRowDataType`.
We aim to let `table.api.TableSchema.Builder` untouched for backwards compatibility. However, implementations that use the rather new `add(TableColumn)` would need an update due to relocation and gradual refactorings.