
...

TableSchema.toPhysicalRowDataType => pure physical data, excluding computed columns and metadata columns
TableSchema.toRowDataType => full schema with all kinds of computed columns
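
For illustration, a minimal sketch of the difference (hypothetical schema; a code fragment, not a complete program):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

// Two physical columns plus one computed column.
TableSchema schema = TableSchema.builder()
    .field("i", DataTypes.INT())
    .field("s", DataTypes.STRING())
    .field("t", DataTypes.BIGINT(), "CAST(SYSTEM_METADATA(\"timestamp\") AS BIGINT)")
    .build();

schema.toPhysicalRowDataType(); // ROW<i INT, s STRING>
schema.toRowDataType();         // ROW<i INT, s STRING, t BIGINT>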

Handling of Data Types

To make the use of SYSTEM_METADATA easier and to avoid nested casting such as:

rowtime AS CAST(CAST(SYSTEM_METADATA("timestamp") AS BIGINT) AS TIMESTAMP(3) WITH LOCAL TIME ZONE)

we allow explicit casting to a target data type:

rowtime AS CAST(SYSTEM_METADATA("timestamp") AS TIMESTAMP(3) WITH LOCAL TIME ZONE)

A connector still produces and consumes the data type returned by `listReadableMetadata()`/`listWritableMetadata()`. The planner will insert the necessary explicit casts. For example, if a connector declares the "timestamp" metadata as BIGINT, it keeps producing BIGINT and the planner casts the value to the declared TIMESTAMP(3) WITH LOCAL TIME ZONE.

In any case, the user must provide a CAST such that the computed column receives a valid data type when constructing the table schema.

Reading metadata via DecodingFormat

We allow formats to expose metadata as well. By default, a format does not provide any metadata. The information is not used unless a source supports it.

interface DecodingFormat {

    default Map<String, DataType> listReadableMetadata() {
        return Collections.emptyMap();
    }

    default void applyReadableMetadata(List<String> metadataKeys) {
        throw new UnsupportedOperationException();
    }
}
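
A sketch of how a format might implement these methods (the class name and stored field are hypothetical; the real DecodingFormat interface contains further methods omitted here):

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

// Hypothetical format that exposes a single "schema" metadata column.
class MetadataAwareDecodingFormat implements DecodingFormat {

    // Keys requested by the planner; empty until applyReadableMetadata is called.
    private List<String> metadataKeys = Collections.emptyList();

    @Override
    public Map<String, DataType> listReadableMetadata() {
        // A LinkedHashMap gives the planner a stable iteration order.
        final Map<String, DataType> metadata = new LinkedHashMap<>();
        metadata.put("schema", DataTypes.STRING());
        return metadata;
    }

    @Override
    public void applyReadableMetadata(List<String> metadataKeys) {
        // The produced row type becomes: PHYSICAL COLUMNS + FORMAT METADATA COLUMNS.
        this.metadataKeys = metadataKeys;
    }
}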

Similar to the source, `listReadableMetadata()` allows for validation and provides information about the expected data types of the format's output row.

`applyReadableMetadata(...)` enables the modification of the format's produced row type to: PHYSICAL COLUMNS + FORMAT METADATA COLUMNS

If a source implements `SupportsReadingMetadata`, it must call the methods above and forward the requested metadata keys to the format.

Both source and format can offer metadata columns. We define the following behavior for a consistent interaction:

  • The source must add metadata columns in the order in which they are passed by the planner.
  • The passed `List<String> metadataKeys` will be ordered according to the iteration order of `listReadableMetadata()` (so it may be beneficial for implementations to return a LinkedHashMap, but they don't have to).
  • Format metadata keys have higher precedence than source metadata keys in case of duplicate key names (this should happen very rarely).

In the end, an output row that leaves the source looks like:

PHYSICAL COLUMNS + FORMAT METADATA COLUMNS + SOURCE METADATA COLUMNS
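
A sketch of how a source could honor these rules when forwarding keys (hypothetical method body; `format` and `sourceMetadataKeys` are assumed fields of the source):

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical forwarding logic inside the source's applyReadableMetadata(...).
public void applyReadableMetadata(List<String> metadataKeys) {
    final Set<String> formatKeys = format.listReadableMetadata().keySet();

    final List<String> forFormat = new ArrayList<>();
    final List<String> forSource = new ArrayList<>();
    // Keep the planner's order; format keys take precedence on duplicate names.
    for (String key : metadataKeys) {
        if (formatKeys.contains(key)) {
            forFormat.add(key);
        } else {
            forSource.add(key);
        }
    }
    format.applyReadableMetadata(forFormat);
    this.sourceMetadataKeys = forSource;
}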

Writing metadata via DynamicTableSink

To be able to write metadata from a query into a sink, we introduce the concept of persisted computed columns. The syntax and some (but not all) semantics are borrowed from SQL Server:

"Specifies that the Database Engine will physically store the computed values in the table"

Let's assume the following example:

CREATE TABLE t (i INT, s STRING, t AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) PERSISTED, d DOUBLE)

Persisted computed columns must consist solely of the pattern CAST + SYSTEM_METADATA. Such columns can be used for symmetrically reading and writing metadata. More complex computations are not supported, since we could not perfectly map the expression back to the metadata column otherwise.

Persisted computed columns are part of the query-to-sink schema, which means that they need to be specified in an INSERT INTO statement. In order to make those columns optional, we suggest introducing the syntax:

INSERT INTO t (i, s, d) SELECT ...

This marks the omitted persisted computed columns as NULL.

Persisted computed columns are part of the physical row that enters a sink (either provided by the query or NULL); they are never recomputed.

Note: The semantics are not 100% the same as in SQL Server; we only borrow the syntax and the rough meaning. In order not to confuse users, we could also use a completely new keyword, e.g. `STORED` instead of `PERSISTED`.

We suggest the following interfaces for integrating writing metadata into FLIP-95 interfaces:

interface SupportsWritingMetadata {

    Map<String, DataType> listWritableMetadata();

    void applyWritableMetadata(List<String> metadataKeys);
}
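
A sketch of a sink implementing this interface (the class name and metadata key are hypothetical):

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

// Hypothetical sink that accepts a single writable "timestamp" metadata column.
class MetadataAwareSink implements SupportsWritingMetadata {

    // Keys accepted from the planner, in the order used in the DDL.
    private List<String> metadataKeys = Collections.emptyList();

    @Override
    public Map<String, DataType> listWritableMetadata() {
        final Map<String, DataType> metadata = new LinkedHashMap<>();
        metadata.put("timestamp", DataTypes.BIGINT());
        return metadata;
    }

    @Override
    public void applyWritableMetadata(List<String> metadataKeys) {
        this.metadataKeys = metadataKeys;
    }
}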

The interface is similar to `SupportsReadingMetadata`. `listWritableMetadata()` is used for validation and provides the data type information for each metadata key.

`applyWritableMetadata(...)` provides the final list of metadata keys as used in the DDL. The method also enables writing metadata in formats.

By definition, the planner will put persisted columns at the end of the physical row before passing it to the sink; this makes it easier to split metadata from actual data. The sink can use `TableSchema.toPhysicalRowDataType` to extract the pure data as before.

For the example above, a row that enters a sink looks like:

ROW<i INT, s STRING, d DOUBLE, t BIGINT> 
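
A sketch of the runtime split this layout enables (hypothetical method; RowData is the FLIP-95 runtime record type):

import org.apache.flink.table.data.RowData;

// For the example above: physical = (i, s, d), metadata = (t) appended at the end.
void writeRecord(RowData consumedRow) {
    // Arity of TableSchema.toPhysicalRowDataType(), determined at planning time.
    final int physicalArity = 3;

    final int i = consumedRow.getInt(0);
    final String s = consumedRow.getString(1).toString();
    final double d = consumedRow.getDouble(2);

    // Metadata columns follow the physical columns; here: t BIGINT, possibly NULL.
    final Long t = consumedRow.isNullAt(physicalArity)
        ? null
        : consumedRow.getLong(physicalArity);
    // ... hand the values to the actual producer
}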

Writing metadata via EncodingFormat

Similar to DecodingFormat, we add default methods to EncodingFormat that can be used by the DynamicTableSink.

interface EncodingFormat {

    default Map<String, DataType> listWritableMetadata() {
        return Collections.emptyMap();
    }

    default void applyWritableMetadata(List<String> metadataKeys) {
        throw new UnsupportedOperationException();
    }
}

By calling `applyWritableMetadata(...)`, the sink enables the modification of the format's consumed row type. The row that goes into the format at runtime will be: PHYSICAL COLUMNS + FORMAT METADATA COLUMNS

The same rules as mentioned above for handling both sink metadata and format metadata apply.

In the end, an input row into the sink looks like:

PHYSICAL COLUMNS + FORMAT METADATA COLUMNS + SINK METADATA COLUMNS

Metadata for existing connectors and formats

For completeness, we list the initial metadata for existing Flink connectors and formats. The naming follows FLIP-122, which means:

  • connector metadata has no prefix
  • formats will have a prefix using the factory identifier (e.g. `debezium-json.` below)
  • key/value formats are prefixed with `key.` and `value.`

Kafka

Key            | Data type          | Read/Write | Notes
topic          | STRING             | r          | We don't allow writing to different topics for now. Maybe we will allow that in the future via a property.
partition      | INT                | r          | We don't allow writing to different partitions for now. Maybe we will allow that in the future via a property.
headers        | MAP<STRING, BYTES> | r/w        |
leader-epoch   | INT                | r          |
offset         | BIGINT             | r          |
timestamp      | BIGINT             | r/w        | Directly forward the underlying type. Do not try to abstract it into a TIMESTAMP.
timestamp-type | STRING             | r          | ['NoTimestampType', 'CreateTime', 'LogAppendTime']

Debezium

Key                               | Data type           | Read/Write | Notes
debezium-json.schema              | STRING              | r          | Pure JSON string, can be handled with Flink's built-in JSON SQL functions.
debezium-json.ingestion-timestamp | BIGINT              | r          | Directly forward the underlying type. Do not try to abstract it into a TIMESTAMP.
debezium-json.source.timestamp    | BIGINT              | r          | Shortcut for debezium-json.source[ts_ms].
debezium-json.source.database     | STRING              | r          | Unified shortcut for `debezium-json.source[db]` across SQL vendors.
debezium-json.source.schema       | STRING              | r          | Unified shortcut for `debezium-json.source[schema]` across SQL vendors.
debezium-json.source.table        | STRING              | r          | Unified shortcut for `debezium-json.source[table/collection]` across SQL vendors.
debezium-json.source.properties   | MAP<STRING, STRING> | r          | All properties that Debezium provides under `source`. Frequently used properties have a dedicated key above. It seems that this is always a property list, so we can make it available as a map for easier access.

...