
Current state: "Under discussion"

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-107-Reading-table-columns-from-different-parts-of-source-records-td38277.html

...

JIRA: FLINK-15869

...

Release: 1.12


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  offset INT METADATA, -- access Kafka 'offset' metadata
  headers MAP<STRING, BYTES> METADATA -- access Kafka 'headers' metadata
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic', 

  'format' = 'avro'
)

...

CREATE TABLE kinesis_table (
  id BIGINT,
  name STRING,
  partitionId STRING METADATA FROM 'partition' -- use different column name for metadata 'partition'
) WITH (
  'connector' = 'kinesis',
  'value.format' = 'avro'
)

SELECT * FROM kinesis_table;

-- partitionId is a persisted (non-virtual) metadata column, therefore it can be written to:
INSERT INTO kinesis_table VALUES (1, 'ABC', 'shard-0000');

...

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  offset INT METADATA, -- from Kafka
  database STRING METADATA FROM 'value.canal-json.database' -- from Canal
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic', 

  'format' = 'canal-json'
)

...

{
    "data": [
        {
            "id": "102",
            "name": "car battery"
        }
    ],
    "database": "inventory",
    "table": "products",
    "es": 1589374013000,
    "ts": 1589374013680,
    "type": "DELETE"
}

Kafka: Write metadata into Kafka's ProducerRecord

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  timestamp BIGINT METADATA,
  offset INT METADATA VIRTUAL -- offset cannot be written out, it is ignored in the query-to-sink schema
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic', 

  'format' = 'avro'
)

An insert statement could look like:

...

INSERT INTO kafka_table VALUES (1, 'ABC', 1599133672);

Or with no persisted metadata:

INSERT INTO kafka_table (id, name) VALUES (1, 'ABC');

Implementation Details

Syntax and Semantics

Let's assume the following example:

CREATE TABLE t (i INT, s STRING, timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA, d DOUBLE)

The `timestamp` column is declared as a `METADATA` column. By default, the column name is used to map to a corresponding metadata key ("timestamp" in this case).

The data type of the column is used to perform an explicit cast of the original metadata data type. For example, a Kafka metadata timestamp is defined as `BIGINT`. A user can nevertheless declare the column as `TIMESTAMP(3) WITH LOCAL TIME ZONE` and thereby force an explicit cast from sources (BIGINT to TIMESTAMP) and to sinks (TIMESTAMP to BIGINT).
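To make the cast semantics concrete, the following plain-Java sketch mimics the two conversions the planner would insert for a `TIMESTAMP(3) WITH LOCAL TIME ZONE` column that is backed by a `BIGINT` metadata timestamp. It assumes the BIGINT holds epoch milliseconds and uses `java.time.Instant` as a stand-in for the SQL type. The class `MetadataTimestampCast` is hypothetical and not part of Flink.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical helper (not a Flink class): mimics the explicit casts the planner
// would insert for `timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA`
// when the connector's metadata type is BIGINT (assumed: epoch milliseconds).
final class MetadataTimestampCast {

    private MetadataTimestampCast() {}

    // Source direction: BIGINT -> TIMESTAMP(3) WITH LOCAL TIME ZONE
    static Instant fromMetadata(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    // Sink direction: TIMESTAMP(3) WITH LOCAL TIME ZONE -> BIGINT.
    // Precision 3 means that anything below milliseconds is dropped.
    static long toMetadata(Instant timestamp) {
        return timestamp.truncatedTo(ChronoUnit.MILLIS).toEpochMilli();
    }
}
```

Because the connector keeps producing and consuming its own type, only these two conversion points depend on the declared column type.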

In order to use a different column name, it is possible to use `FROM` to reference the metadata key ("offset" in this case):

CREATE TABLE t (i INT, s STRING, myOffset INT METADATA FROM 'offset', d DOUBLE)

CREATE TABLE t (i INT, s STRING, offset INT METADATA VIRTUAL, d DOUBLE)

`offset INT METADATA` would be a valid read-only column for Kafka and can be extracted by the planner. However, since declared tables can be used for sources and sinks, we need to be able to exclude read-only metadata from writing.

The `VIRTUAL` keyword excludes the column from the query-to-sink schema, which means that a query cannot write to this metadata column.
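Here is a minimal sketch of how the two schema views could be derived from the declared columns. `DeclaredColumn` and `SchemaViews` are hypothetical stand-ins that model types as plain strings. They are not Flink's catalog classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a declared column; not Flink's actual Column class.
record DeclaredColumn(String name, String type, boolean isMetadata, boolean isVirtual) {

    static DeclaredColumn physical(String name, String type) {
        return new DeclaredColumn(name, type, false, false);
    }

    static DeclaredColumn metadata(String name, String type, boolean isVirtual) {
        return new DeclaredColumn(name, type, true, isVirtual);
    }
}

final class SchemaViews {

    private SchemaViews() {}

    // Columns a query can read from the table: every declared column.
    static List<String> querySourceColumns(List<DeclaredColumn> columns) {
        List<String> names = new ArrayList<>();
        for (DeclaredColumn c : columns) {
            names.add(c.name());
        }
        return names;
    }

    // Columns an INSERT INTO has to provide: VIRTUAL metadata columns are excluded.
    static List<String> queryToSinkColumns(List<DeclaredColumn> columns) {
        List<String> names = new ArrayList<>();
        for (DeclaredColumn c : columns) {
            if (!c.isVirtual()) {
                names.add(c.name());
            }
        }
        return names;
    }
}
```

For `t (i INT, s STRING, offset INT METADATA VIRTUAL, d DOUBLE)`, reading yields `i, s, offset, d`, while a query writing into `t` only provides `i, s, d`.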

Implications to other components

Because we are extending the DDL, this has implications for other components.

Schema

We propose to extend the `Schema` class from FLIP-129 by:

// for `offset INT METADATA`
SchemaBuilder.columnByMetadata(String, DataType)
// for `offset INT METADATA VIRTUAL`
SchemaBuilder.columnByMetadata(String, DataType, boolean isVirtual)
// for `myOffset INT METADATA FROM 'offset'`
SchemaBuilder.columnByMetadata(String, DataType, String)
// for `myOffset INT METADATA FROM 'offset' VIRTUAL`
SchemaBuilder.columnByMetadata(String, DataType, String, boolean isVirtual)

An example would look like:

.schema(Schema.newBuilder()
    .column("user_id", DataTypes.BIGINT())
    .column("user_name", DataTypes.STRING())
    // the watermark below references `ts`, so the column must be declared
    .column("ts", DataTypes.TIMESTAMP(3))
    .columnByMetadata("offset", DataTypes.INT())
    .columnByMetadata("myOffset", DataTypes.INT(), "offset")
    .watermark("ts", $("ts").minus(lit(3).seconds()))
    .primaryKey("user_id")
    .build()
)

LIKE clause

Currently, the LIKE clause offers the following table features:

  • CONSTRAINTS - constraints such as primary and unique keys
  • GENERATED - computed columns
  • OPTIONS - connector options that describe connector and format properties
  • PARTITIONS - partition of the tables
  • WATERMARKS - watermark declarations

We propose to extend the LIKE clause to add `METADATA` as another table feature.

Metadata columns are not generated. They represent data that is present in the external system. Thus, they should not be categorized into the GENERATED feature.

Furthermore, METADATA is connector dependent. It is safer to have fine-grained control over the table feature. The user should control whether metadata can be inherited or not.

For example, this is important when switching from filesystem to Kafka or vice versa.

METADATA supports OVERWRITING by column name.

The following example shows how to overwrite a metadata column with another metadata column:

CREATE TABLE t (i INT, s STRING, timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp', d DOUBLE);

CREATE TABLE t2 (timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'other-system.timestamp')
WITH (...)
LIKE t (
INCLUDING ALL
OVERWRITING OPTIONS
OVERWRITING METADATA
)
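The overwriting rule could be sketched as follows. The strategy names mirror the LIKE clause keywords. However, `MetadataLikeMerger` and its error handling are assumptions made for illustration, and Flink's actual merging logic may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of merging metadata columns for
// CREATE TABLE ... LIKE base (... <strategy> METADATA).
// Keys are column names, values are the column definitions as text.
final class MetadataLikeMerger {

    enum Strategy { INCLUDING, EXCLUDING, OVERWRITING }

    private MetadataLikeMerger() {}

    static Map<String, String> merge(
            Map<String, String> baseMetadata,
            Map<String, String> derivedMetadata,
            Strategy strategy) {
        // LinkedHashMap keeps the base table's columns first, in insertion order.
        Map<String, String> result = new LinkedHashMap<>();
        if (strategy != Strategy.EXCLUDING) {
            result.putAll(baseMetadata);
        }
        for (Map.Entry<String, String> e : derivedMetadata.entrySet()) {
            if (strategy == Strategy.INCLUDING && result.containsKey(e.getKey())) {
                throw new IllegalArgumentException(
                        "Metadata column '" + e.getKey() + "' already exists in the base table.");
            }
            // OVERWRITING replaces the definition of a column with the same name.
            result.put(e.getKey(), e.getValue());
        }
        return result;
    }
}
```

With OVERWRITING, `timestamp` in `t2` ends up reading from 'other-system.timestamp'. With INCLUDING, the same name clash would be rejected.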


Reading metadata via DynamicTableSource

Let's assume the following example:

CREATE TABLE t (i INT, s STRING, offset INT METADATA VIRTUAL, d DOUBLE)

We suggest the following interfaces for integrating reading metadata into FLIP-95 interfaces:

import java.util.List;
import java.util.Map;
import org.apache.flink.table.types.DataType;

public interface SupportsReadingMetadata {

    Map<String, DataType> listReadableMetadata();

    void applyReadableMetadata(List<String> metadataKeys, DataType outputDataType);
}

`listReadableMetadata()` allows for validation and provides information about expected data types for the output row.

`applyReadableMetadata(List<String> metadataKeys, DataType outputDataType)` provides the list of metadata keys required by the DDL and the query. It incorporates information from projection push down and only requests metadata that is necessary for executing the given query.

A DynamicTableSource must append the requested metadata columns (such as `offset INT` in the example) to the source's original output row in the order of the given list. This also works for a row that is passed into SupportsComputedColumnPushDown.

In general, the method `applyReadableMetadata(...)` is very similar to `SupportsComputedColumnPushDown#applyComputedColumn`. Like SupportsComputedColumnPushDown, it provides an updated output data type for creating the TypeInformation (i.e. projection push down is considered and applied before). This is also the output type that the planner expects.

The order of execution in a source is:

projection push down → metadata appending → computed column execution (in source or follow-up operator)
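The following sketch illustrates this order for a toy Kafka-like source. The physical fields arrive already projected, and the metadata requested via `applyReadableMetadata` is appended in the given order. `ReadableMetadataSource` and `ToyKafkaSource` are hypothetical simplifications of the proposed `SupportsReadingMetadata` interface. They use strings instead of `DataType` so that the code runs without Flink.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the proposed interface:
// data types are plain strings so the sketch runs without Flink.
interface ReadableMetadataSource {
    Map<String, String> listReadableMetadata();
    void applyReadableMetadata(List<String> metadataKeys);
}

// Hypothetical Kafka-like source that appends requested metadata to each row.
final class ToyKafkaSource implements ReadableMetadataSource {

    private List<String> requestedKeys = List.of();

    @Override
    public Map<String, String> listReadableMetadata() {
        Map<String, String> metadata = new LinkedHashMap<>();
        metadata.put("topic", "STRING");
        metadata.put("partition", "INT");
        metadata.put("offset", "BIGINT");
        return metadata;
    }

    @Override
    public void applyReadableMetadata(List<String> metadataKeys) {
        // Reject keys that were not listed as readable.
        for (String key : metadataKeys) {
            if (!listReadableMetadata().containsKey(key)) {
                throw new IllegalArgumentException("Unknown metadata key: " + key);
            }
        }
        requestedKeys = List.copyOf(metadataKeys);
    }

    // Step 1 (projection) already happened: projectedPhysical only holds needed fields.
    // Step 2: append metadata in exactly the order passed to applyReadableMetadata.
    // Step 3 (computed columns) would run on the returned row afterwards.
    List<Object> emit(List<Object> projectedPhysical, Map<String, Object> recordMetadata) {
        List<Object> row = new ArrayList<>(projectedPhysical);
        for (String key : requestedKeys) {
            row.add(recordMetadata.get(key));
        }
        return row;
    }
}
```

Requesting `offset` before `topic` produces a row such as `[1, ABC, 42, test-topic]`. The physical part is followed by the metadata in request order.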

All other row data types do not change. This means:

TableSchema.toPhysicalRowDataType => pure physical data, excluding computed columns and metadata columns
TableSchema.toRowDataType => full schema including computed columns and metadata columns

Handling of Data Types

For making the use of metadata easier and avoiding nested casting such as:

rowtime BIGINT METADATA FROM 'timestamp'
rowtimeCasted AS CAST(rowtime AS TIMESTAMP(3) WITH LOCAL TIME ZONE)

We allow explicit casting to a target data type:

rowtime TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp'

A connector still produces and consumes the data type returned by `listReadableMetadata()` (for reading) or `listWritableMetadata()` (for writing). The planner will insert the necessary explicit casts.

...

`applyReadableMetadata(...)` enables the modification of the format's produced row type to:

PHYSICAL COLUMNS + FORMAT METADATA COLUMNS

The source must call the methods above and forward them if it implements `SupportsReadingMetadata`.

...

In the end an output row that leaves the source looks like:

PHYSICAL COLUMNS + FORMAT METADATA COLUMNS + SOURCE METADATA COLUMNS

For Kafka with key and value formats, the row would look like:

PHYSICAL COLUMNS + KEY FORMAT METADATA COLUMNS + VALUE FORMAT METADATA COLUMNS + SOURCE METADATA COLUMNS
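The concatenation order above can be illustrated with a tiny sketch that only handles column names. `SourceRowLayout` is hypothetical and not part of the proposal's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: builds the output column order of a Kafka source as
// PHYSICAL + KEY FORMAT METADATA + VALUE FORMAT METADATA + SOURCE METADATA.
final class SourceRowLayout {

    private SourceRowLayout() {}

    static List<String> outputColumns(
            List<String> physical,
            List<String> keyFormatMetadata,
            List<String> valueFormatMetadata,
            List<String> sourceMetadata) {
        List<String> columns = new ArrayList<>(physical);
        columns.addAll(keyFormatMetadata);   // appended by the key format
        columns.addAll(valueFormatMetadata); // appended by the value format
        columns.addAll(sourceMetadata);      // appended by the source itself
        return columns;
    }
}
```

For the Canal example, this gives `id, name, value.canal-json.database, offset`. The format metadata comes before the Kafka metadata.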

Writing metadata via DynamicTableSink

To be able to write metadata from a query into a sink, we use the concept of virtual columns to exclude non-writable columns.

Let's assume the following example:

CREATE TABLE t (i INT, s STRING, timestamp BIGINT METADATA, offset INT METADATA VIRTUAL, d DOUBLE)

The `timestamp` column can be used for both reading and writing metadata symmetrically.

Non-virtual (i.e. persisted) metadata columns are part of the query-to-sink schema, which means that they need to be specified in an INSERT INTO statement. In order to make those columns optional, we suggest introducing the syntax:

INSERT INTO t (i, s, d) SELECT ...

This sets the omitted persisted metadata columns to NULL.

Persisted metadata columns are part of the row that enters a sink (either passed by the query or NULL); they are never recomputed.
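The NULL-filling behavior of a partial column list could look like the sketch below. `PartialInsert` is hypothetical. It assumes the query-to-sink schema has already been computed (VIRTUAL columns removed) and, for simplicity, treats every omitted column the same way.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of `INSERT INTO t (i, s, d) SELECT ...` against
// t (i INT, s STRING, timestamp BIGINT METADATA, offset INT METADATA VIRTUAL, d DOUBLE).
final class PartialInsert {

    private PartialInsert() {}

    // sinkColumns: the query-to-sink schema (VIRTUAL columns already excluded).
    // targetColumns/values: the column list and the values of one inserted row.
    static List<Object> toSinkRow(List<String> sinkColumns, List<String> targetColumns, List<Object> values) {
        if (targetColumns.size() != values.size()) {
            throw new IllegalArgumentException("Column list and values differ in length.");
        }
        Map<String, Object> byName = new HashMap<>();
        for (int i = 0; i < targetColumns.size(); i++) {
            String column = targetColumns.get(i);
            if (!sinkColumns.contains(column)) {
                // e.g. the VIRTUAL column `offset` is not part of the sink schema
                throw new IllegalArgumentException("Column not writable: " + column);
            }
            byName.put(column, values.get(i));
        }
        List<Object> row = new ArrayList<>();
        for (String column : sinkColumns) {
            // Omitted columns (such as persisted metadata) become NULL.
            row.add(byName.get(column));
        }
        return row;
    }
}
```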

We suggest the following interfaces for integrating writing metadata into FLIP-95 interfaces:

...

ROW<i INT, s STRING, d DOUBLE, timestamp BIGINT>  -- 'timestamp' is moved to the end and has the expected metadata data type, 'offset' is not present
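Here is a sketch of how the consumed row type above could be derived. `SinkConsumedType` is hypothetical and models types as plain strings. The writable-metadata map stands in for whatever the sink lists as writable, and it is assumed here to report `timestamp` as `BIGINT` to match the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical sketch: physical columns keep their order, persisted metadata
// columns move to the end with the sink's metadata type, VIRTUAL columns are dropped.
final class SinkConsumedType {

    // metadataKey == null marks a physical column.
    record Col(String name, String declaredType, String metadataKey, boolean isVirtual) {
        boolean isMetadata() {
            return metadataKey != null;
        }
    }

    private SinkConsumedType() {}

    static String consumedRowType(List<Col> columns, Map<String, String> writableMetadata) {
        List<String> physical = new ArrayList<>();
        List<String> metadata = new ArrayList<>();
        for (Col c : columns) {
            if (!c.isMetadata()) {
                physical.add(c.name() + " " + c.declaredType());
            } else if (!c.isVirtual()) {
                String type = writableMetadata.get(c.metadataKey());
                if (type == null) {
                    throw new IllegalArgumentException("Metadata key is not writable: " + c.metadataKey());
                }
                metadata.add(c.name() + " " + type);
            }
        }
        StringJoiner joiner = new StringJoiner(", ", "ROW<", ">");
        physical.forEach(joiner::add);
        metadata.forEach(joiner::add);
        return joiner.toString();
    }
}
```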

Writing metadata via EncodingFormat

...

Calling `applyWritableMetadata(...)` on the format enables the modification of the format's consumed row type. The row that goes into the format during runtime will be:

PHYSICAL COLUMNS + FORMAT METADATA COLUMNS

The same rules as mentioned above for handling both sink metadata and format metadata apply.

In the end an input row into the sink looks like:

PHYSICAL COLUMNS + FORMAT METADATA COLUMNS + SINK METADATA COLUMNS

Metadata for existing connectors and formats

...

  • connector metadata has no prefix
  • formats will have a prefix using the factory identifier
  • key/value formats are always prefixed with `key.` and `value.` to avoid collisions
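These prefix rules suggest a simple routing step. The sketch below is an assumption about how a connector might dispatch a key. `MetadataKeyRouter` is hypothetical, and whether a format receives the key with or without the `key.`/`value.` prefix is an illustrative choice.

```java
// Hypothetical sketch of the naming convention: no prefix = connector metadata,
// `key.` / `value.` = metadata of the key / value format (followed by its identifier).
final class MetadataKeyRouter {

    enum Owner { CONNECTOR, KEY_FORMAT, VALUE_FORMAT }

    record Route(Owner owner, String keyForOwner) {}

    private MetadataKeyRouter() {}

    static Route route(String metadataKey) {
        if (metadataKey.startsWith("key.")) {
            return new Route(Owner.KEY_FORMAT, metadataKey.substring("key.".length()));
        }
        if (metadataKey.startsWith("value.")) {
            return new Route(Owner.VALUE_FORMAT, metadataKey.substring("value.".length()));
        }
        return new Route(Owner.CONNECTOR, metadataKey);
    }
}
```

For example, `value.canal-json.database` is routed to the value format as `canal-json.database`, while `offset` stays with the Kafka connector.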

Kafka

Key | Data type | Read/Write | Notes
topic | STRING | r | We don't allow writing to different topics for now. Maybe we will allow that in the future via a property.
partition | INT | r | We don't allow writing to different partitions for now. Maybe we will allow that in the future via a property.
headers | MAP<STRING, BYTES> | r/w |
leader-epoch | INT | r |
offset | BIGINT | r |
timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE | r/w | Directly forward the underlying type.
timestamp-type | STRING | r | ['NoTimestampType', 'CreateTime', 'LogAppendTime']

Debezium

Key | Data type | Read/Write | Notes
debezium-json.schema | STRING | r | Pure JSON string, can be handled with Flink's built-in JSON SQL functions.
debezium-json.ingestion-timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE | r | Directly forward the underlying type. Do not try to abstract it into a TIMESTAMP.
debezium-json.source.timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE | r | Shortcut for `debezium-json.source[ts_ms]`.
debezium-json.source.database | STRING | r | Unified shortcut for `debezium-json.source[db]` across SQL vendors.
debezium-json.source.schema | STRING | r | Unified shortcut for `debezium-json.source[schema]` across SQL vendors.
debezium-json.source.table | STRING | r | Unified shortcut for `debezium-json.source[table/collection]` across SQL vendors.
debezium-json.source.properties | MAP<STRING, STRING> | r | All properties that Debezium provides under `source`. Frequently used properties have a dedicated key above. It seems that this is always a property list, so we can make it available as a map for easier access.
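The shortcut keys could be derived from the raw `source` block roughly as shown below. `DebeziumSourceShortcuts` is hypothetical and models the `source` block as a string map. The `collection` fallback assumes a document store such as MongoDB.

```java
import java.time.Instant;
import java.util.Map;

// Hypothetical sketch: derives the unified shortcut keys from Debezium's raw
// `source` block, modeled here as a string map.
final class DebeziumSourceShortcuts {

    private DebeziumSourceShortcuts() {}

    // debezium-json.source.database <- source[db]
    static String database(Map<String, String> source) {
        return source.get("db");
    }

    // debezium-json.source.table <- source[table], falling back to source[collection]
    static String table(Map<String, String> source) {
        return source.containsKey("table") ? source.get("table") : source.get("collection");
    }

    // debezium-json.source.timestamp <- source[ts_ms] (epoch milliseconds)
    static Instant timestamp(Map<String, String> source) {
        String tsMs = source.get("ts_ms");
        return tsMs == null ? null : Instant.ofEpochMilli(Long.parseLong(tsMs));
    }
}
```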

Reading and writing from key and value

We suggest defining where the particular columns come from in the options (WITH) section of the table.

We suggest introducing connector-specific properties that allow specifying which fields should end up in which parts of the record. We are not suggesting a generic key-value-metadata format because record structures often differ considerably across systems, e.g.:

  • Kinesis, Pravega do not have a key where users can store data

As described previously, Kafka has the most complex record structure. It is also the most important source for streaming use cases. Therefore, we discuss below how to support its record structure.

We suggest introducing additional properties for controlling different parts of ConsumerRecord:

  • key.fields, key.format, key.(<format-identifier>).(<properties-required-by-format>) - this controls which fields should end up in Kafka’s key and what should be the serialization format
  • value.fields-include - this controls which fields should end up in the value as well; possible values are:
    • ALL (all fields of the schema, even if they are part of e.g. the key)
    • EXCEPT_KEY (all fields of the schema except the fields of the key)
  • value.format, value.(format-identifier).(<properties-required-by-format>) - The “value” prefix is optional
  • (optional support) fields.verify-integrity - controls if we should perform an equality check if a field is contained in different parts of consumer record (either in key or value)

Note: key.format, value.format, key.fields etc. are only available in Kafka. They are Kafka connector specific properties.
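The effect of 'key.fields' and 'value.fields-include' on a schema can be sketched as follows. `KeyValueFieldSplit` is hypothetical. Keeping the key fields in the order given by the option is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: which schema fields end up in the Kafka key and value
// given 'key.fields' and 'value.fields-include'.
final class KeyValueFieldSplit {

    enum ValueFieldsInclude { ALL, EXCEPT_KEY }

    private KeyValueFieldSplit() {}

    // Parses a comma-separated option such as 'id, col1'.
    static List<String> parseKeyFields(String keyFieldsOption) {
        List<String> fields = new ArrayList<>();
        for (String f : keyFieldsOption.split(",")) {
            String trimmed = f.trim();
            if (!trimmed.isEmpty()) {
                fields.add(trimmed);
            }
        }
        return fields;
    }

    static List<String> valueFields(List<String> schemaFields, List<String> keyFields, ValueFieldsInclude include) {
        for (String k : keyFields) {
            if (!schemaFields.contains(k)) {
                throw new IllegalArgumentException("Unknown key field: " + k);
            }
        }
        List<String> value = new ArrayList<>(schemaFields);
        if (include == ValueFieldsInclude.EXCEPT_KEY) {
            value.removeAll(keyFields);
        }
        return value;
    }
}
```

With 'key.fields' = 'id, col1', the value contains all four fields under ALL and only `timestamp, col2` under EXCEPT_KEY.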


A full example looks like:

CREATE TABLE kafka_table (
    id STRING,
    timestamp TIMESTAMP,
    col1 ...,
    col2 ...
    ....
) WITH (
  'connector' = 'kafka',
  ...
  'key.fields' = 'id, col1',
  'key.format' = 'csv',

  -- optional: controls which fields of the schema should end up in the value (ALL by default) (options: ALL, EXCEPT_KEY)
  'value.fields-include' = 'ALL',
  'value.format' = 'avro',

  -- optional: (false by default) if true and a field originates from more than a single location (e.g. both in key and value), a check will be performed that both values are equal
  'fields.verify-integrity' = 'false'
)
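The check that 'fields.verify-integrity' = 'true' would perform can be sketched like this. `FieldIntegrity` is hypothetical and compares the decoded key and value fields by name with `Objects.equals`.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of 'fields.verify-integrity' = 'true': a field that is
// decoded from both the key and the value must carry equal values.
final class FieldIntegrity {

    private FieldIntegrity() {}

    static void verify(Map<String, Object> decodedKey, Map<String, Object> decodedValue) {
        for (Map.Entry<String, Object> e : decodedKey.entrySet()) {
            String field = e.getKey();
            if (decodedValue.containsKey(field) && !Objects.equals(e.getValue(), decodedValue.get(field))) {
                throw new IllegalStateException("Field '" + field + "' differs between key ("
                        + e.getValue() + ") and value (" + decodedValue.get(field) + ").");
            }
        }
    }
}
```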

Rejected alternatives

Copied from the long ML discussion.

Option 1

Declare everything via properties:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  col1 STRING,
  col2 STRING,
  ts TIMESTAMP(3) WITH LOCAL TIME ZONE, -- ts is a normal field, so it can be read and written.
  offset AS SYSTEM_METADATA("offset")
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'key.fields' = 'id, name',
  'key.format' = 'csv',
  'value.format' = 'avro',
  'timestamp.field' = 'ts' -- define the mapping of Kafka timestamp
);

Pros:

  • "timestamp", "headers" are something like "key" and "value" that are stored with the real data. So why not define the "timestamp" in the same way with "key" by using a "timestamp.field" connector option?

Cons:

  • "key" and "value" in the properties are a special case because they need to configure a format. 
  • We have way more metadata fields like headers, leader-epoch, etc. Having a property for all of this metadata would clutter the WITH section. Furthermore, we also want to deal with metadata from the formats. Solving this through properties as well would further complicate the property design.

Option 2

Use computed columns:

CREATE TABLE kafka_table (
id BIGINT,
name STRING,
timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) PERSISTED,
headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>) PERSISTED
) WITH (
...

Pros:

  • Allows full flexibility to compute the final column and to avoid helper columns:
    timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3)))

Cons:

  • Mixes the concepts of metadata and computed columns.
  • Would need the concept of PERSISTED computed columns but not with 100% the same semantics as other vendors
  • Complicated syntax:

    a) CREATE TABLE t (a AS CAST(SYSTEM_METADATA("offset") AS INT))
    pro: readable, complex arithmetic possible, more SQL compliant, SQL Server compliant
    con: long

    b) CREATE TABLE t (a INT AS SYSTEM_METADATA("offset"))
    pro: shorter
    con: neither SQL nor SQL Server compliant, requires parser changes, no complex arithmetic like `computeSomeThing(SYSTEM_METADATA("offset"))` possible

    c) CREATE TABLE t (a AS SYSTEM_METADATA("offset", INT))
    pro: shorter, very readable, complex arithmetic possible
    con: non-SQL expression, requires parser changes

Option 3

Marking key columns in the schema section:

CREATE TABLE kafka_table (
  id STRING KEY,
  timestamp TIMESTAMP HEADER("timestamp"),
  col1 ... KEY/HEADER("key"),
  col2 ...
  ...
) WITH (
  'connector.type' = 'kafka',
  ...
  'format.type' = 'kafka-format',
  'format.key.type' = 'csv',
  'format.value.type' = 'avro'
)

There are a couple of issues with this approach:

  • mixes logical schema definition with physical representation of format and/or source (Kafka is the only source that has a meaningful key; all other sources like Pulsar/Kinesis/Pravega use the key only as a partitioning hash and support string-based keys only). Moreover, it makes it harder to reason about when implementing CREATE TABLE … AS SELECT …

For CREATE TABLE … AS SELECT … there is no place to put the modifiers, as the schema is derived from the query.

  • cumbersome or impossible to handle columns that are stored in multiple places (both in key and value)



Option 4

Specific per property functions

CREATE TABLE kafka_table (
  id STRING,
  offset AS SOURCE_OFFSET(),
  partitionId AS SOURCE_PARTITION()
  ...
) WITH (
  'connector.type' = 'kafka'
)

Con: Would add too many built-in functions.

Future Work

Some topics that are not part of this FLIP anymore.

...