Status

Current state: "Under discussion"

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-107-Reading-table-columns-from-different-parts-of-source-records-td38277.html

...

JIRA: FLINK-15869

...

Release: 1.12


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

The `VIRTUAL` keyword excludes the column from the query-to-sink schema, which means that a query cannot write to this metadata column.
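
To illustrate the effect of `VIRTUAL`, the following minimal Java sketch models a table's columns and derives both the schema a query can read and the query-to-sink schema. The `VirtualColumnSketch` class and its `Column` type are hypothetical and exist only for this illustration; they are not part of Flink's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative sketch only; none of these types exist in Flink. */
class VirtualColumnSketch {

    /** Hypothetical column model: metadataKey is null for physical columns. */
    static final class Column {
        final String name;
        final String metadataKey;
        final boolean isVirtual;

        Column(String name, String metadataKey, boolean isVirtual) {
            this.name = name;
            this.metadataKey = metadataKey;
            this.isVirtual = isVirtual;
        }
    }

    /** Column names a query can read: every column, including virtual ones. */
    static List<String> sourceSchema(List<Column> columns) {
        List<String> names = new ArrayList<>();
        for (Column c : columns) {
            names.add(c.name);
        }
        return names;
    }

    /** Column names a query writes to: virtual columns are excluded. */
    static List<String> sinkSchema(List<Column> columns) {
        List<String> names = new ArrayList<>();
        for (Column c : columns) {
            if (!c.isVirtual) {
                names.add(c.name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Models: user_id (physical), headers METADATA, offset METADATA VIRTUAL
        List<Column> columns = Arrays.asList(
                new Column("user_id", null, false),
                new Column("headers", "headers", false),
                new Column("offset", "offset", true));
        System.out.println("read:  " + sourceSchema(columns));
        System.out.println("write: " + sinkSchema(columns));
    }
}
```

In this model, an `INSERT INTO` query would have to provide values for `user_id` and `headers` only, while `offset` stays readable in `SELECT` queries.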

Implications for other components

Because we are extending the DDL, this has implications for other components.

Schema

We propose to extend the `Schema` class from FLIP-129 by:

// for `offset INT METADATA`
SchemaBuilder.columnByMetadata(String, DataType)
// for `offset INT METADATA VIRTUAL`
SchemaBuilder.columnByMetadata(String, DataType, boolean isVirtual)
// for `myOffset INT METADATA FROM 'offset'`
SchemaBuilder.columnByMetadata(String, DataType, String)
// for `myOffset INT METADATA FROM 'offset' VIRTUAL`
SchemaBuilder.columnByMetadata(String, DataType, String, boolean isVirtual)

An example would look like:

.schema(Schema.newBuilder()
    .column("user_id", DataTypes.BIGINT())
    .column("user_name", DataTypes.STRING())
    .columnByMetadata("offset", DataTypes.INT())
    .columnByMetadata("myOffset", DataTypes.INT(), "offset")
    .watermark("ts", $("ts").minus(lit(3).seconds()))
    .primaryKey("user_id")
    .build()
)

LIKE clause

Currently, the LIKE clause offers the following table features:

  • CONSTRAINTS - constraints such as primary and unique keys
  • GENERATED - computed columns
  • OPTIONS - connector options that describe connector and format properties
  • PARTITIONS - partition of the tables
  • WATERMARKS - watermark declarations

We propose to extend the LIKE clause to add `METADATA` as another table feature.

Metadata columns are not generated. They represent data that is present in the external system. Thus, they should not be categorized under the GENERATED feature.

Furthermore, METADATA is connector dependent. It is safer to have fine-grained control over this table feature, so the user should decide whether metadata columns are inherited or not.

For example, this is important when switching from filesystem to Kafka or vice versa.

METADATA supports OVERWRITING by column name.

The following example shows how to overwrite a metadata column with another metadata column:

CREATE TABLE t (i INT, s STRING, timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp', d DOUBLE);

CREATE TABLE t (timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'other-system.timestamp')
WITH (...)
LIKE t (
  INCLUDING ALL
  OVERWRITING OPTIONS
  OVERWRITING METADATA
)
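
The overwrite-by-column-name behavior can be sketched in plain Java as a merge of two column maps. This is only an illustration under stated assumptions: the `LikeMetadataMergeSketch` class and `mergeMetadata` method are hypothetical, the strategy names mirror the LIKE clause keywords, and rejecting a name clash under INCLUDING as well as keeping the overwritten column at its original position are assumptions of this sketch, not statements about Flink's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative sketch only; these types are hypothetical and not part of Flink. */
class LikeMetadataMergeSketch {

    /** Merging strategies, named after the LIKE clause keywords. */
    enum Strategy { INCLUDING, EXCLUDING, OVERWRITING }

    /**
     * Merges metadata columns, each given as column name -> metadata key.
     * Assumption: a name clash under INCLUDING is rejected with an exception.
     */
    static Map<String, String> mergeMetadata(
            Map<String, String> base, Map<String, String> derived, Strategy strategy) {
        Map<String, String> merged = new LinkedHashMap<>();
        if (strategy != Strategy.EXCLUDING) {
            merged.putAll(base);
        }
        for (Map.Entry<String, String> column : derived.entrySet()) {
            boolean clash = merged.containsKey(column.getKey());
            if (clash && strategy != Strategy.OVERWRITING) {
                throw new IllegalArgumentException(
                        "Metadata column '" + column.getKey()
                                + "' already exists in the base table");
            }
            // put() replaces the metadata key and keeps the column's original position.
            merged.put(column.getKey(), column.getValue());
        }
        return merged;
    }

    public static void main(String[] args) {
        // Mirrors the SQL example: the derived table redefines `timestamp`.
        Map<String, String> base = new LinkedHashMap<>();
        base.put("timestamp", "timestamp");
        Map<String, String> derived = new LinkedHashMap<>();
        derived.put("timestamp", "other-system.timestamp");
        System.out.println(mergeMetadata(base, derived, Strategy.OVERWRITING));
    }
}
```

With `OVERWRITING`, the derived definition wins for a column of the same name; with `EXCLUDING`, only the derived table's metadata columns remain.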


Reading metadata via DynamicTableSource

Let's assume the following example:

...

| Key | Data type | Read/Write | Notes |
|---|---|---|---|
| topic | STRING | r | We don't allow writing to different topics for now. Maybe we will allow that in the future via a property. |
| partition | INT | r | We don't allow writing to different partitions for now. Maybe we will allow that in the future via a property. |
| headers | MAP<STRING, BYTES> | r/w | |
| leader-epoch | INT | r | |
| offset | BIGINT | r | |
| timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE | r/w | Directly forward the underlying type. |
| timestamp-type | STRING | r | ['NoTimestampType', 'CreateTime', 'LogAppendTime'] |
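
As a rough illustration of how the read/write column of this table could be used, the Java sketch below records the access mode of each Kafka metadata key and checks whether a metadata column declaration is acceptable for a table that is written to. The `KafkaMetadataSketch` class and `isValidForSink` method are hypothetical. The rule that a column on a read-only key must be declared `VIRTUAL` before the table can act as a sink is an assumption of this sketch, derived from the meaning of `VIRTUAL`.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative sketch only; these types are hypothetical and not part of Flink. */
class KafkaMetadataSketch {

    enum Access { READ_ONLY, READ_WRITE }

    /** Access modes copied from the Kafka metadata table. */
    static final Map<String, Access> KEYS;

    static {
        Map<String, Access> keys = new LinkedHashMap<>();
        keys.put("topic", Access.READ_ONLY);
        keys.put("partition", Access.READ_ONLY);
        keys.put("headers", Access.READ_WRITE);
        keys.put("leader-epoch", Access.READ_ONLY);
        keys.put("offset", Access.READ_ONLY);
        keys.put("timestamp", Access.READ_WRITE);
        keys.put("timestamp-type", Access.READ_ONLY);
        KEYS = Collections.unmodifiableMap(keys);
    }

    /**
     * Returns true if a metadata column on the given key may be part of a table
     * used as a sink. Assumption: read-only keys are acceptable only when the
     * column is VIRTUAL, because it is then excluded from the sink schema.
     */
    static boolean isValidForSink(String metadataKey, boolean isVirtual) {
        Access access = KEYS.get(metadataKey);
        if (access == null) {
            throw new IllegalArgumentException("Unknown metadata key: " + metadataKey);
        }
        return isVirtual || access == Access.READ_WRITE;
    }

    public static void main(String[] args) {
        System.out.println("headers, persisted: " + isValidForSink("headers", false));
        System.out.println("offset, persisted:  " + isValidForSink("offset", false));
        System.out.println("offset, virtual:    " + isValidForSink("offset", true));
    }
}
```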

...

| Key | Data type | Read/Write | Notes |
|---|---|---|---|
| debezium-json.schema | STRING | r | Pure JSON string, can be handled with Flink's built-in JSON SQL functions. |
| debezium-json.ingestion-timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE | r | Directly forward the underlying type. Do not try to abstract it into a TIMESTAMP. |
| debezium-json.source.timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE | r | Shortcut for `debezium-json.source[ts_ms]`. |
| debezium-json.source.database | STRING | r | Unified shortcut for `debezium-json.source[db]` across SQL vendors. |
| debezium-json.source.schema | STRING | r | Unified shortcut for `debezium-json.source[schema]` across SQL vendors. |
| debezium-json.source.table | STRING | r | Unified shortcut for `debezium-json.source[table/collection]` across SQL vendors. |
| debezium-json.source.properties | MAP<STRING, STRING> | r | All properties that Debezium provides under `source`. Frequently used properties have a dedicated key above. It seems that this is always a property list, so we can make it available as a map for easier access. |

...