...

INSERT INTO kafka_table VALUES (
  1, 'ABC', 1599133672, MAP['checksum', computeChecksum(...)]
)

Or with no persisted metadata:

...

Let's assume the following example:

CREATE TABLE t (i INT, s STRING, `timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA, d DOUBLE)

The `timestamp` column is declared as a `METADATA` column. By default, the column name is used to map to a corresponding metadata key ("timestamp" in this case).

The declared data type of the column is used to perform an explicit cast from the original metadata data type. For example, the Kafka metadata timestamp is defined as `BIGINT`. A user can nevertheless declare the column as `TIMESTAMP(3)` and thus force an explicit cast when reading from sources (BIGINT to TIMESTAMP) and when writing to sinks (TIMESTAMP to BIGINT).
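For illustration, the same metadata key could be declared with either data type; a minimal sketch with hypothetical table names (the WITH clause is omitted as in the examples above):

-- exposes the raw Kafka timestamp as epoch milliseconds, no cast needed
CREATE TABLE t_raw (i INT, s STRING, `timestamp` BIGINT METADATA, d DOUBLE)

-- forces the cast: BIGINT -> TIMESTAMP(3) when reading, TIMESTAMP(3) -> BIGINT when writing
CREATE TABLE t_cast (i INT, s STRING, `timestamp` TIMESTAMP(3) METADATA, d DOUBLE)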

CREATE TABLE t (i INT, s STRING, myOffset INT METADATA FROM 'offset', d DOUBLE)

In order to use a different column name, it is possible to use `FROM` to reference the metadata key ("offset" in this case).
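Queries then refer to the column by its declared name rather than by the metadata key; a small hypothetical example based on the table above:

SELECT i, s, myOffset FROM t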

CREATE TABLE t (i INT, s STRING, `offset` INT METADATA VIRTUAL, d DOUBLE)

`offset INT METADATA` would be a valid read-only column for Kafka and can be extracted by the planner. However, since declared tables can be used for both sources and sinks, we need to be able to exclude read-only metadata from writing. The `VIRTUAL` keyword marks the column as read-only and excludes it from the query-to-sink schema.
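As a consequence, a statement writing to the table only provides the remaining (writable) columns; a minimal sketch assuming the declaration above:

-- `offset` is declared VIRTUAL and is therefore excluded from writing;
-- only the physical columns i, s, and d are supplied
INSERT INTO t VALUES (1, 'ABC', 2.0)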

...

Let's assume the following example:

CREATE TABLE t (i INT, s STRING, `offset` INT METADATA VIRTUAL, d DOUBLE)

We suggest the following interfaces for integrating reading metadata into FLIP-95 interfaces:

...

Declare everything via properties:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  col1 STRING,
  col2 STRING,
  ts TIMESTAMP(3) WITH LOCAL TIME ZONE,  -- ts is a normal field, so it can be read and written
  offset AS SYSTEM_METADATA("offset")
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'key.fields' = 'id, name',
  'key.format' = 'csv',
  'value.format' = 'avro',
  'timestamp.field' = 'ts'  -- define the mapping of the Kafka timestamp
);

Pros:

  • "timestamp", "headers" are something like "key" and "value" that are stored with the real data. So why not define the "timestamp" in the same way with "key" by using a "timestamp.field" connector option?

...

Option 2

Use computed columns:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) PERSISTED,
  headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>) PERSISTED
) WITH (
...

Pros:

  • Allows full flexibility to compute the final column and avoids helper columns:
    timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3)))

...