You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 16 Next »

Status

Current state: "Under discussion"

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-107-Reading-table-columns-from-different-parts-of-source-records-td38277.html

JIRA: Unable to render Jira issues macro, execution error.

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

Besides the main payload, the majority of connectors (and also many formats) expose additional information that should be readable and (depending on the use case) also writable as metadata.

It can simply be read-only metadata such as a Kafka read-offset or ingestion time. But can also add or remove header information (e.g. a message hash, or record version) to every Kafka ProducerRecord. Additionally, users might want to read and write only parts of the record that contain data but additionally serve different purposes (e.g. compaction by key).

We should make it possible to read and write data from all of those locations.

Kafka is the source with the most intricacies as it allows storing data in multiple different places of the records. Each of those places is/can be serialized differently. Moreover some of them might serve different purposes:

  • all of them can be just a data container,
  • key for partitioning (hash on the key), 
  • key for compacting (if topic is compacted records with same key within a partition are merged), 
  • timestamp for log retention
  • header for metadata

Because of the reasons above Kafka will be used for a majority of the examples.

Also formats should be able to expose metadata, FLIP-132 is just one example where the Debezium format might expose a "db_operation_time" that is not part of the schema itself.

Other use cases could be exposing Avro version or Avro schema as meta information per record.

Examples

Kafka: Read, transform, and write back with key and value. All fields of the key are present in the value as well.

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  col1 STRING,
  col2 STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',

  'key.fields' = 'id, name',
  'key.format' = 'csv',

  'value.format' = 'avro'
)

An insert statement

INSERT INTO kafka_table VALUES (
  (1, "ABC", "col1", "col2")
)

will result in a ProducerRecord as follows:

new ProducerRecord<>(
  "test-topic",
  null,
  CSV.serialize(id, name),
  AVRO.serialize(id, name, col1, col2)
)


A slightly different use case is to manually control where to store the columns data:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  col1 STRING,
  col2 STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic', 

  'key.format' = 'csv',
  'key.fields' = 'id, name',

  'value.format' = 'avro',
  'value.fields-include' = 'EXCEPT_KEY'
)


An insert statement

INSERT INTO kafka_table VALUES (
  (1, "ABC", "col1", "col2")
)

will result in a ProducerRecord as follows:

new ProducerRecord<>(
  "test-topic",
  null,
  CSV.serialize(id, name),
  AVRO.serialize(col1, col2)
)

Kafka: Read metadata from Kafka's ConsumerRecord

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  offset AS CAST(SYSTEM_METADATA("offset") AS INT),
  headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>)
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic', 

  'format' = 'avro'
)

Kinesis: Read other metadata from Kinesis

Kinesis represents partition keys as strings:

CREATE TABLE kinesis_table (
  id BIGINT,
  name STRING,
  partitionId AS CAST(SYSTEM_METADATA("partition") AS STRING)
) WITH (
  'connector' = 'kinesis',
  'value.format' = 'avro'
)

SELECT * FROM kinesis_table;

-- Partition is a computed column, therefore it cannot be written to. Statements like below will fail:
INSERT INTO kinesis_table VALUES (1, "ABC", "shard-0000")

Kafka + Canal JSON Format: Both connector and format expose metadata

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  offset AS CAST(SYSTEM_METADATA("offset") AS INT), -- from Kafka
  database AS CAST(SYSTEM_METADATA("database") AS STRING) -- from Canal
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic', 

  'format' = 'canal-json'
)

Using the following format:

{
    "data": [
        {
            "id": "102",
            "name": "car battery"
        }
    ],
    "database": "inventory”,
    "table": "products",
    "es": 1589374013000,
    "ts": 1589374013680,
    "type": "DELETE"
}

Kafka: Read metadata from Kafka's ConsumerRecord and use it for computation

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  offset AS CAST(SYSTEM_METADATA("offset") AS INT) + 1000
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic', 

  'format' = 'avro'
)

Kafka: Write metadata into Kafka's ProducerRecord

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) PERSISTED,
  headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>) PERSISTED
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic', 

  'format' = 'avro'
)

An insert statement could look like:

INSERT INTO kafka_table VALUES (
  (1, "ABC", 1599133672, MAP('checksum', computeChecksum(...)))
)

Or with no persisted metadata:

INSERT INTO kafka_table (id, name) VALUES (
  (1, "ABC")
)

Implementation Details

Reading metadata via DynamicTableSource

Let's assume the following example:

CREATE TABLE t (i INT, s STRING, o AS CAST(SYSTEM_METADATA("offset") AS INT), d DOUBLE)

`CAST(SYSTEM_METADATA("offset") AS INT)` would be a valid read-only computed column for Kafka and can be extracted by the planner.

`SYSTEM_METADATA("offset")` returns the NULL type by default and thus is invalid without a CAST similar to `CREATE TABLE t (a AS NULL)`.

Note: Alternatively, we could allow the syntax `SYSTEM_METADATA("offset", INT)` for avoiding the CAST, however, this would require changes in the parser (and maybe validator). Because types cannot be used as an argument easily.

We suggest the following interfaces for integrating reading metadata into FLIP-95 interfaces:

interface SupportsReadingMetadata {

Map<String, DataType> listReadableMetadata();

void applyReadableMetadata(List<String> metadataKeys, DataType outputDataType);
}

`listReadableMetadata()` allows for validation and provides information about expected data types for the output row.

`applyReadableMetadata(List<String> metadataKeys, DataType outputDataType)` provides the list of required metadata by the DDL and query. It incorporates information from projection push down and only requests metadata that is necessary for executing the given query.

A DynamicTableSource must append the requested metadata columns (such as `offset INT` in the example) to the source's original output row in the order of the given list. This also works for a row that is passed into SupportsComputedColumnPushDown.

In general, the method `applyReadableMetadata(...)` is very similar to `SupportsComputedColumnPushDown#applyComputedColumn`. The method provides an updated output data type to create TypeInformation similar to SupportsComputedColumnPushDown (i.e. projection pushdown is considered and applied before). This is the output type that also the planner expects.

The order of execution in a source is:

projection push down → metadata appending → computed column execution (in source or follow-up operator)

All other row data types do not change. This means:

TableSchema.toPhysicalRowDataType => pure physical data, excluding computed columns and metadata columns
TableSchema.toRowDataType => full schema with all kinds of computed columns

Handling of Data Types

For making the use of SYSTEM_METADATA easier and avoid nested casting such as:

rowtime AS CAST(CAST(SYSTEM_METADATA("timestamp") AS BIGINT) AS
TIMESTAMP(3) WITH LOCAL TIME ZONE)

We allow explicit casting to a target data type:

rowtime AS CAST(SYSTEM_METADATA("timestamp") AS TIMESTAMP(3) WITH LOCAL TIME ZONE)

A connector still produces and consumes the data type returned by `listMetadata()`. The planner will insert necessary explicit casts.

In any case, the user must provide a CAST such that the computed column receives a valid data type when constructing the table schema.

Reading metadata via DecodingFormat

We allow formats to expose metadata as well. By default, a format does not provide any metadata. The information is not used unless a source supports it.

interface DecodingFormat {

default Map<String, DataType> listReadableMetadata() {
return Collections.emptyMap();
}

default void applyReadableMetadata(List<String> metadataKeys) {
throw new UnsupportedOperationException();
}
}

Similar to the source, `listReadableMetadata()`allows for validation and provides information about expected data types for the format's output row.

`applyReadableMetadata(...)` enables the modification of the format's produced row type to: PHYSICAL COLUMNS + FORMAT METADATA COLUMNS

The source must call the methods above and forward them if it implements `SupportsReadingMetadata`.

Both source and format can offer metadata columns. We define the following behavior for a consistent interaction:

  • The source must add metadata columns in the order as passed by the planner.
  • The passed `List<String> metadataKeys` will be ordered according to the iteration order of `listReadableMetadata()` (so it might be beneficial that implementations return a LinkedHashMap, but don't have to).
  • Format metadata keys have higher precedence than source metadata keys in case of duplicate key names (this case should happen very rarely).

In the end an output row that leaves the source looks like:

PHYSICAL COLUMNS + FORMAT METADATA COLUMNS + SOURCE METADATA COLUMNS

Writing metadata via DynamicTableSink

For being able to write out metadata from a query into a sink, we introduce the concept of persisted computed columns. The syntax and some (not all) semantics are borrowed from SQL Server:

"Specifies that the Database Engine will physically store the computed values in the table"

Let's assume the following example:

CREATE TABLE t (i INT, s STRING, t AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) PERSISTED, d DOUBLE)

Persisted computed columns must only consist of the pattern CAST + SYSTEM_METADATA. Such columns can be used for both symmetrically reading and writing metadata. More complex computation are not supported since we cannot perfectly map the expression to the metadata column otherwise.

Persisted computed columns are part of the query-to-sink schema which means that they need to be specified in an INSERT INTO statement. In order to make those columns optional, we suggest to introduce the syntax:

INSERT INTO t (i, s, d) SELECT ...

This marks persisted computed columns as NULL.

Persisted computed columns are part of a physical row that enters a sink (either passed by the query or NULL) it is never recomputed.

Note: The semantics are not 100% the same with SQL Server. We just borrow syntax and rough meaning. In order to not confuse users, we could also use a completely new keyword. E.g. `STORED` instead of `PERSISTED`.

We suggest the following interfaces for integrating writing metadata into FLIP-95 interfaces:

interface SupportsWritingMetadata {

Map<String, DataType> listWritableMetadata();

void applyWritableMetadata(List<String> metadataKeys);
}

The interface is similar to `SupportsReadingMetadata`. `listWritableMetadata()` is used for validation and metadata information.

The `applyWritableMetadata(...)` provides the final list of keys that is used in the DDL. The method also enables writing metadata in formats. 

By definition, the planner will put persisted columns to the end of the physical row before passing it to a sink this makes it easier to split metadata and actual data. The sink can use `TableSchema.toPhysicalRowDataType` to extract the pure data as before.

For the example above, a row that enters a sink looks like:

ROW<i INT, s STRING, d DOUBLE, t BIGINT> 

Writing metadata via EncodingFormat

Similar to DecodingFormat, we add similar default methods to EncodingFormat. That can be used by the DynamicTableSink.

interface EncodingFormat {

default Map<String, DataType> listWritableMetadata() {
return Collections.emptyMap();
}

default void applyWritableMetadata(List<String> metadataKeys) {
throw new UnsupportedOperationException();
}
}

By calling `applyWritableMetadata(...)`, the format enables the modification of the format's consumed row type. The row that goes into the format during runtime will be:  PHYSICAL COLUMNS + FORMAT METADATA COLUMNS

Same rules as mentioned above for handling both sink metadata and format metadata apply.

In the end an input row into the sink looks like:

PHYSICAL COLUMNS + FORMAT METADATA COLUMNS + SOURCE METADATA COLUMNS

Metadata for existing connectors and formats

For completeness, we list the first metadata for existing Flink connectors and formats. The naming follows FLIP-122 which means:

  • connector metadata has no prefix
  • formats will have a prefix using the factory identifier
  • key/value formats are prefixed with `key.` and `value.`

Kafka

KeyData typeRead/WriteNotes
topic
STRINGrWe don't allow writing to different topics for now. Maybe we will allow that in the future via a property.
partition
INTrWe don't allow writing to different partitions for now. Maybe we will allow that in the future via a property.
headers
MAP<STRING, BYTES>r/w
leader-epoch
INTr
offset
BIGINTr
timestamp
BIGINTr/wDirectly forward the underlying type.
timestamp-type
STRINGr['NoTimestampType', 'CreateTime', 'LogAppendTime']

Debezium

KeyData typeRead/WriteNotes
debezium-json.schema
STRINGrPure JSON string, can be handled with Flink's built-in JSON SQL functions
debezium-json.ingestion-timestamp
BIGINTrDirectly forward the underlying type. Do not try to abstract it into a TIMESTAMP.
debezium-json.source.timestamp
BIGINTrShortcut for debezium-json.source[ts_ms].
debezium-json.source.database
STRINGrUnified shortcut for `debezium-json.source[db]` across SQL vendors.
debezium-json.source.schema
STRINGrUnified shortcut for `debezium-json.source[schema]` across SQL vendors.
debezium-json.source.table
STRINGrUnified shortcut for `debezium-json.source[table/collection]` across SQL vendors.
debezium-json.source.properties
MAP<STRING, STRING>rAll properties that Debezium provides under `source`. Frequently used properties have a a dedicated key above.
It seems that this is always a property list. So we can make it available as a map for easier access.

Reading and writing from key and value

We suggest defining the places where the particular columns come from in the source options sections. 

We suggest introducing connector specific properties that allow specifying which fields should end up in which parts of the record. The reason why we am not suggesting having a key-value-metadata format is that there are most often differences across records of different systems, e.g.

  • Kinesis, Pravega do not have a key where users can store data

As described previously Kafka has the most complex record structure. It is also the most important source for streaming use cases therefore we will discuss how to support its record below.

We suggest introducing additional properties for controlling different parts of ConsumerRecord:

  • key.fields, key.format, key.(<format-identifier>).(<properties-required-by-format>) - this controls which fields should end up in Kafka’s key and what should be the serialization format
  • value.fields-include, this controls which fields should end up in the value as well, possible values 
    • ALL (all fields of the schema, even if they are part of e.g. the key)
    • EXCEPT_KEY (all fields of the schema - fields of the key)
  • value.format, value.(format-identifier).(<properties-required-by-format>) - The “value” prefix is optional
  • (optional support) fields.verify-integrity - controls if we should perform an equality check if a field is contained in different parts of consumer record (either in key or value)

Note: key.format, value.format, key.fields etc. are only available in Kafka. They are Kafka connector specific properties.

A full example looks like:

CREATE TABLE kafka_table (
    id STRING,
    timestamp TIMESTAMP,
    col1 ...,
    col2 ...
    ....
) WITH (
  'connector' = 'kafka',
  ...
  'key.fields' = 'id, col1'
  'key.format' = 'csv'

  -- optional: controls which fields of the schema should end up in the value (ALL by default) (options: ALL, EXCEPT_KEY)
  'value.fields-include' = 'ALL'  
  'value.format' = 'avro'

  --optional: (false by default) if true and a field originates from more than a single location (e.g. both in key and value), a check will be performed both values are equal
  'fields.verify-integrity' = 'false'
)

Rejected alternatives

Marking columns in the schema section:

CREATE TABLE kafka_table (
  id STRING KEY,
  timestamp TIMESTAMP HEADER("timestamp"),
  col1 ... KEY/HEADER("key"),
  col2 ...
  ...
) WITH (
  'connector.type' = 'kafka',
  ...
  'format.type' = 'kafka-format'
  'format.key.type' = 'csv'
  'format.value.type' = 'avro'
)


There is a couple of issues with this approach:

  • mixes logical schema definition with physical representation of format and/or source (kafka the only source that has a meaningful key, all other sources like pulsar/kinesis/pravega use key only as a partitioning hash and support string based keys only). Moreover it makes it harder to reason about when implementing CREATE TABLE … AS SELECT …

For CREATE TABLE … AS SELECT … there is no place to put the modifiers, as the schema is derived from the query.

  • cumbersome or impossible to handle columns that are stored in multiple places (both in key and value)

Accessing read only data

Sources often provide metadata information that can be accessed in a read only manner. Such metadata include:

  • partitionId
  • offset
  • shardId
  • ingestion time


Those kind of properties we could access using computed columns:

Option 1:

Generic SYSTEM_METADATA(“property-key”)

CREATE TABLE kafka_table (
  id STRING,
  offset BIGINT AS SYSTEM_METADATA("offset"),
  partitionId BIGINT AS SYSTEM_METADATA("partition")
  ...
) WITH (
  'connector.type' = 'kafka'
)

How to derive type of a column

Both options have the issue where to get the type of a computed column from. In the second option partitionId between different sources might be expressed with different types (e.g. with long or String). The returned type depends on the type of source.

In option 1 the problem is even more visible, as the type will differ depending on the requested property. This could be alleviated by having a return type strategy based on a value of constant, but it would require that the function lists all properties for all sources. Moreover the problem described for option 1 remains.

Solution:

Enable declaring type for computed column. In that case the return type strategy could use that type as return type. It was part of the initial proposal of FLIP-70, but was removed from it. Reintroducing declaring result type explicitly will also benefit regular computed columns. Currently it is not possible to use functions that infer result type based on expected type.

Option 2:

Specific per property functions

CREATE TABLE kafka_table (
  id STRING,
  offset AS SOURCE_OFFSET(),
  partitionId AS SOURCE_PARTITION()
  ...
) WITH (
  'connector.type' = 'kafka'
)


I think option 1 is more appealing as it is more future proof. It makes it easier to add new properties in the future.

Option 3:

Similar to option 1, but instead of using the computed columns we could introduce a special syntax. This would prohibit using 

Future Work

Some topics that are not part of this FLIP anymore.

Kafka: Push down offset filter

CREATE TABLE kafka_table (
  id BIGINT,
  offset BIGINT AS SYSTEM_METADATA("offset")
) WITH (
  'connector.type' = 'kafka',
  'value.format.type' = 'avro'
)

SELECT * FROM kafka_table WHERE offset > 123456;

Kinesis: Specify custom partitioning

CREATE TABLE kinesis_table (
  id BIGINT,
  name STRING)
PARTITIONED BY HASH(id)
WITH (
  'connector.type' = 'kinesis',
  'value.format.type' = 'avro'
)

INSERT INTO kinesis_table VALUES(
    (1, 'ABC', 1),
    (1, 'ABC', 3)
)

Both records will end up in the same partition.

Kafka: Manually specify target partition, overwrite partitioning by key.

CREATE TABLE kinses_table (
  id BIGINT,
  name STRING,
  partitionId BIGINT)
PARTITIONED BY HASH(partitionId)
WITH (
  'connector.type' = 'kafka',
  'key.fields' = 'id, name',
  'key.format.type' = 'csv',

  'value.format.type' = 'avro'
)

INSERT INTO kafka_table VALUES(
    (1, 'ABC', 1),
    (1, 'ABC', 3)
)

The two records end up in different partitions even though the resulting Kafka’s keys will be equal for both records.

Rejected alternatives:

Writing to a specific partition/partitioning of source

A common requirement is to create a custom partitioning of the data. We should have a way to specify/compute target partition/shard for Kinesis/Pravega/Pulsar. In those cases it would be the only way to control partitioning.

In the case of Kafka we could use the key for data partitioning as described in the first section. However, it does not have to always correspond to the partition that a record originates from/is written to. The default partitioner uses the hash value of the key and the total number of partitions on a topic to determine the partition number. If you increase a partition number, then the default partitioner will return different numbers even if you provide the same key.  Users might want to implement a custom Kafka’s partitioner to be able to rescale the topic.

Use PARTITIONED BY clause with different partitioning strategies (similar to ORACLE):

Partition by hash:

CREATE TABLE kafka_table (
  id STRING,
  name STRING,
  date: DATE,
  key: BINARY
  ...
) WITH (
  'connector.type' = 'kafka'
  'key.fields' = 'key' --the key will not be used for partitioning
)
PARTITIONED BY HASH(id, name)


If necessary we can introduce more partitioning strategies E.g. explicit partitioning:

-- partitioning based on an existing column in data
CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  date: DATE,
  key: BINARY
  ...
) WITH (
  'connector.type' = 'kafka'
  'key.fields' = 'key' -- the key will not be used for partitioning
)
PARTITIONED BY EXPLICIT(id)

-- or on a computed column
CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  targetPartition BIGINT AS CAST(name AS BIGINT),
  date: DATE,
  key: BINARY
  ...
) WITH (
  'connector.type' = 'kafka'
  'key.fields' = 'key' -- the key will not be used for partitioning
)
PARTITIONED BY EXPLICIT(targetPartition)

-- Or with user defined functions in the partitioned by clause
CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  date: DATE,
  key: BINARY
  ...
) WITH (
  'connector.type' = 'kafka'
  'key.fields' = 'key' -- the key will not be used for partitioning
)
PARTITIONED BY EXPLICIT(USER_SCALAR_FUNCTION(name, date, SYSTEM_METADATA("numberOfPartitions"))

Relation to FLIP-63

Flip-63 introduced initial support for PARTITIONED BY clause to an extent that let us support Hive's partitioning. In majority of situations it is in line with the proposal above.

The only significant difference is that this proposal suggests adding partitioning strategies e.g. PARTITIONED BY HASH/EXPLICIT.  Currently implemented strategy works as an EXPLICIT strategy that requires a list of any type as a partition identifier. We could make the EXPLICIT strategy default, which could let us support current PARTITION BY syntax.

The current implementation requires the column in PARTITIONED BY clause to be also defined in the schema part of the table. It is the same as in this proposal.

FLIP-63 discusses syntax as INSERT INTO ... PARTITION(...) → It does not interfere with this proposal. It just implies additional check that the row matches the partition requirements(columns in rows match the partition specification)

FLIP-63 discusses more in-depth filesystem specific partition pruning. This proposal leaves out that part to specific source implementations. Therefore it does not contradict with FLIP-63.


  • No labels