
Status

Current state: "Under discussion"

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-107-Reading-table-columns-from-different-parts-of-source-records-td38277.html

...

JIRA: FLINK-15869

...

Release: 1.12


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

CREATE TABLE kinesis_table (
  id BIGINT,
  name STRING,
  partitionId STRING METADATA FROM 'partition' -- use different column name for metadata 'partition'
) WITH (
  'connector' = 'kinesis',
  'value.format' = 'avro'
)

SELECT * FROM kinesis_table;

-- Partition is a persisted column, therefore it can be written to:
INSERT INTO kinesis_table VALUES (1, 'ABC', 'shard-0000')

...

INSERT INTO kafka_table VALUES (
  (1, 'ABC', 1599133672, MAP('checksum', computeChecksum(...)))
)

Or with no persisted metadata:

...

Let's assume the following example:

CREATE TABLE t (i INT, s STRING, timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA, d DOUBLE)

The `timestamp` column is declared as a `METADATA` column. By default, the column name is used to map to a corresponding metadata key ("timestamp" in this case).

The data type of the column is used to perform an explicit cast of the original metadata data type. For example, a Kafka metadata timestamp is defined as `BIGINT`. A user can declare the column as `TIMESTAMP(3)` though and thus force an explicit cast from sources (BIGINT to TIMESTAMP) and to sinks (TIMESTAMP to BIGINT).
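For illustration only (the table names `t1` and `t2` are hypothetical, and the WITH options are elided), both of the following declarations would be valid for Kafka's `timestamp` metadata:

-- keep the original metadata type
CREATE TABLE t1 (ts BIGINT METADATA FROM 'timestamp') WITH (...)

-- force a cast: BIGINT to TIMESTAMP(3) when reading, TIMESTAMP(3) to BIGINT when writing
CREATE TABLE t2 (ts TIMESTAMP(3) METADATA FROM 'timestamp') WITH (...)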

CREATE TABLE t (i INT, s STRING, myOffset INT METADATA FROM 'offset', d DOUBLE)

In order to use a different column name, it is possible to use `FROM` to reference the metadata key ("offset" in this case).

CREATE TABLE t (i INT, s STRING, offset INT METADATA VIRTUAL, d DOUBLE)

`offset INT METADATA` would be a valid read-only column for Kafka and can be extracted by the planner. However, since declared tables can be used for sources and sinks, we need to be able to exclude read-only metadata from writing.

The `VIRTUAL` keyword excludes the column from the query-to-sink schema, which means that a query cannot write to this metadata column.
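As a quick sketch of the effect (assuming the table `t` above is backed by a connector that provides the 'offset' key): the query-to-sink schema is (i, s, d), so writes omit the virtual column while reads still expose it:

-- writing: `offset` is excluded from the query-to-sink schema
INSERT INTO t VALUES (1, 'ABC', 42.0)

-- reading: all four columns, including `offset`, are available
SELECT i, s, offset, d FROM t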

Implications to other components

Because we are extending the DDL, this has implications for other components.

Schema

We propose to extend the `Schema` class from FLIP-129 by:

// for `offset INT METADATA`
SchemaBuilder.columnByMetadata(String, DataType)
// for `offset INT METADATA VIRTUAL`
SchemaBuilder.columnByMetadata(String, DataType, boolean isVirtual)
// for `myOffset INT METADATA FROM 'offset' VIRTUAL`
SchemaBuilder.columnByMetadata(String, DataType, String)
SchemaBuilder.columnByMetadata(String, DataType, String, boolean isVirtual)

An example would look like:

.schema(Schema.newBuilder()
    .column("user_id", DataTypes.BIGINT())
    .column("user_name", DataTypes.STRING())
    .columnByMetadata("offset", DataTypes.INT())
    .columnByMetadata("myOffset", DataTypes.INT(), "offset")
    .watermark("ts", $("ts").minus(lit(3).seconds()))
    .primaryKey("user_id")
    .build()
)

LIKE clause

Currently, the LIKE clause offers the following table features:

  • CONSTRAINTS - constraints such as primary and unique keys
  • GENERATED - computed columns
  • OPTIONS - connector options that describe connector and format properties
  • PARTITIONS - partitioning of the table
  • WATERMARKS - watermark declarations

We propose to extend the LIKE clause to add `METADATA` as another table feature.

Metadata columns are not generated; they represent data that is present in the external system. Thus, they should not be categorized under the GENERATED feature.

Furthermore, METADATA is connector dependent, so it is safer to have fine-grained control over this table feature. The user should control whether metadata can be inherited or not.

For example, this is important when switching from filesystem to Kafka or vice versa.
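Following the INCLUDING/EXCLUDING/OVERWRITING conventions of the existing LIKE features, such a switch could look like the following sketch (the connector options shown are hypothetical):

CREATE TABLE fs_table
WITH (
  'connector' = 'filesystem',
  'path' = '/tmp/example'
)
LIKE kafka_table (
  EXCLUDING METADATA
  OVERWRITING OPTIONS
)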

METADATA supports OVERWRITING by column name.

The following example shows how to overwrite a metadata column with another metadata column:

CREATE TABLE t (i INT, s STRING, timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp', d DOUBLE);

CREATE TABLE t (timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'other-system.timestamp')
WITH (...)
LIKE t (
  INCLUDING ALL
  OVERWRITING OPTIONS
  OVERWRITING METADATA
)


Reading metadata via DynamicTableSource

Let's assume the following example:

CREATE TABLE t (i INT, s STRING, offset INT METADATA VIRTUAL, d DOUBLE)

We suggest the following interfaces for integrating reading metadata into FLIP-95 interfaces:
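The full interface listing is elided here. As a rough orientation, a minimal sketch of the source ability (mirroring the `SupportsReadingMetadata` interface that ultimately shipped with this FLIP in Flink 1.12, in `org.apache.flink.table.connector.source.abilities`) could look like this:

import java.util.List;
import java.util.Map;
import org.apache.flink.table.types.DataType;

// Sketch: ability interface for sources that can produce metadata columns.
public interface SupportsReadingMetadata {

  // All metadata keys (e.g. "offset", "timestamp") and their data types
  // that this source can provide.
  Map<String, DataType> listReadableMetadata();

  // Called by the planner with the metadata keys that the declared
  // METADATA columns actually reference, plus the final produced type.
  void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType);
}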

...

Key            | Data type                         | Read/Write | Notes
topic          | STRING                            | r          | We don't allow writing to different topics for now. Maybe we will allow that in the future via a property.
partition      | INT                               | r          | We don't allow writing to different partitions for now. Maybe we will allow that in the future via a property.
headers        | MAP<STRING, BYTES>                | r/w        |
leader-epoch   | INT                               | r          |
offset         | BIGINT                            | r          |
timestamp      | TIMESTAMP(3) WITH LOCAL TIME ZONE | r/w        | Directly forward the underlying type.
timestamp-type | STRING                            | r          | ['NoTimestampType', 'CreateTime', 'LogAppendTime']
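Put together, a table could expose several of these keys as follows (a sketch based on the table above; read-only keys must be declared VIRTUAL, and the connector options shown are hypothetical):

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  `offset` BIGINT METADATA VIRTUAL,                        -- read-only (r)
  `timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA,  -- readable and writable (r/w)
  headers MAP<STRING, BYTES> METADATA                      -- readable and writable (r/w)
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'value.format' = 'avro'
)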

...

Key                               | Data type                         | Read/Write | Notes
debezium-json.schema              | STRING                            | r          | Pure JSON string; can be handled with Flink's built-in JSON SQL functions.
debezium-json.ingestion-timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE | r          | Directly forward the underlying type.
debezium-json.source.timestamp    | TIMESTAMP(3) WITH LOCAL TIME ZONE | r          | Shortcut for debezium-json.source[ts_ms].
debezium-json.source.database     | STRING                            | r          | Unified shortcut for `debezium-json.source[db]` across SQL vendors.
debezium-json.source.schema       | STRING                            | r          | Unified shortcut for `debezium-json.source[schema]` across SQL vendors.
debezium-json.source.table        | STRING                            | r          | Unified shortcut for `debezium-json.source[table/collection]` across SQL vendors.
debezium-json.source.properties   | MAP<STRING, STRING>               | r          | All properties that Debezium provides under `source`. Frequently used properties have a dedicated key above. It seems that this is always a property list, so we can make it available as a map for easier access.
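Since all of these keys are read-only, they would be declared VIRTUAL. A sketch (table and column names are hypothetical):

CREATE TABLE products (
  id BIGINT,
  name STRING,
  origin_ts TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'debezium-json.source.timestamp' VIRTUAL,
  origin_table STRING METADATA FROM 'debezium-json.source.table' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'value.format' = 'debezium-json'
)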

...

Declare everything via properties:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  col1 STRING,
  col2 STRING,
  ts TIMESTAMP(3) WITH LOCAL TIME ZONE,  -- ts is a normal field, so it can be read and written
  offset AS SYSTEM_METADATA("offset")
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'key.fields' = 'id, name',
  'key.format' = 'csv',
  'value.format' = 'avro',
  'timestamp.field' = 'ts'  -- define the mapping of the Kafka timestamp
);

Pros:

  • "timestamp", "headers" are something like "key" and "value" that are stored with the real data. So why not define the "timestamp" in the same way with "key" by using a "timestamp.field" connector option?

...

Option 2

Use computed columns:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) PERSISTED,
  headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>) PERSISTED
) WITH (
...

Pros:

  • Allows full flexibility to compute the final column and avoids helper columns:
    timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3)))

...