...
CREATE TABLE kafka_table (
...
CREATE TABLE kinesis_table (
...
CREATE TABLE kafka_table (
...
{
  "data": [ { "id": "102", "name": "car battery" } ],
  "database": "inventory",
  "table": "products",
  "es": 1589374013000,
  "ts": 1589374013680,
  "type": "DELETE"
}
Kafka: Write metadata into Kafka's ProducerRecord
CREATE TABLE kafka_table (
...
An insert statement could look like:
...
INSERT INTO kafka_table VALUES (
  1, 'ABC', 1599133672, MAP['checksum', computeChecksum(...)]
)
Or with no persisted metadata:
...
INSERT INTO kafka_table (id, name) VALUES (
  1, 'ABC'
)
Implementation Details
Syntax and Semantics
Let's assume the following example:
CREATE TABLE t (i INT, s STRING, `timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA, d DOUBLE)
The `timestamp` column is declared as a `METADATA` column. By default, the column name is used to map to a corresponding metadata key ("timestamp" in this case).
The data type of the column is used to perform an explicit cast from the original metadata data type. For example, a Kafka metadata timestamp is defined as `BIGINT`. However, a user can declare the column as `TIMESTAMP(3)` and thereby force an explicit cast from sources (BIGINT to TIMESTAMP) and to sinks (TIMESTAMP to BIGINT).
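To make the symmetric cast concrete, here is a minimal sketch in plain Java. It assumes that the Kafka record timestamp is epoch milliseconds, and it uses `java.time.Instant` to stand in for a `TIMESTAMP(3) WITH LOCAL TIME ZONE` value. The class `TimestampMetadataCast` is hypothetical. It only mirrors the two directions of the cast that the planner would insert and is not the planner's cast implementation.

```java
import java.time.Instant;

// Hypothetical helper, not part of Flink. Instant stands in for a
// TIMESTAMP(3) WITH LOCAL TIME ZONE value, a long for the BIGINT metadata.
final class TimestampMetadataCast {

    private TimestampMetadataCast() {}

    // Source side: BIGINT metadata (epoch milliseconds) -> timestamp column value.
    static Instant fromMetadata(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    // Sink side: timestamp column value -> BIGINT metadata (epoch milliseconds).
    // toEpochMilli() drops any sub-millisecond part, matching precision 3.
    static long toMetadata(Instant value) {
        return value.toEpochMilli();
    }

    public static void main(String[] args) {
        Instant column = fromMetadata(1589374013000L);
        System.out.println(column);             // 2020-05-13T12:46:53Z
        System.out.println(toMetadata(column)); // 1589374013000
    }
}
```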
CREATE TABLE t (i INT, s STRING, myOffset INT METADATA FROM 'offset', d DOUBLE)
In order to use a different column name, it is possible to use `FROM` to reference the metadata key ("offset" in this case).
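The mapping from a column to its metadata key can be sketched as follows. `MetadataColumn` is a hypothetical record used only for illustration and is not a Flink class. The key is the `FROM` key if one is present, and otherwise the column name.

```java
// Hypothetical model of a METADATA column declaration; not a Flink class.
record MetadataColumn(String name, String dataType, String fromKey, boolean virtual) {

    MetadataColumn {
        if (name == null || dataType == null) {
            throw new IllegalArgumentException("name and dataType are required");
        }
    }

    // FROM 'key' wins; otherwise the column name doubles as the metadata key.
    String metadataKey() {
        return fromKey != null ? fromKey : name;
    }
}
```

For the two DDL examples, `new MetadataColumn("timestamp", "TIMESTAMP(3) WITH LOCAL TIME ZONE", null, false).metadataKey()` yields `timestamp`, and `new MetadataColumn("myOffset", "INT", "offset", false).metadataKey()` yields `offset`.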
CREATE TABLE t (i INT, s STRING, `offset` INT METADATA VIRTUAL, d DOUBLE)
`offset INT METADATA` would be a valid read-only column for Kafka and can be extracted by the planner.
...
However, since declared tables can be used for sources and sinks, we need to be able to exclude read-only metadata from writing.
The `VIRTUAL` keyword excludes the column from the query-to-sink schema, which means that a query cannot write to this metadata column.
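The effect of `VIRTUAL` on the query-to-sink schema can be sketched as a simple filter over the declared columns. The `QueryToSinkSchema` class and its `Column` record are hypothetical illustrations. The sketch assumes that the remaining columns keep their declaration order.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: which declared columns a query writes to.
final class QueryToSinkSchema {

    // A declared column; metadata == false means a regular physical column.
    record Column(String name, String type, boolean metadata, boolean virtual) {}

    private QueryToSinkSchema() {}

    // Keeps every column except VIRTUAL metadata columns, preserving declaration order.
    static List<String> writableColumns(List<Column> declared) {
        return declared.stream()
                .filter(c -> !(c.metadata() && c.virtual()))
                .map(Column::name)
                .collect(Collectors.toList());
    }
}
```

For the DDL above (`i`, `s`, `offset ... METADATA VIRTUAL`, `d`), `writableColumns(...)` returns `[i, s, d]`.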
Reading metadata via DynamicTableSource
Let's assume the following example:
CREATE TABLE t (i INT, s STRING, `offset` INT METADATA VIRTUAL, d DOUBLE)
We suggest the following interfaces for integrating reading metadata into FLIP-95 interfaces:
...
Handling of Data Types
To make metadata easier to use and to avoid additional casting of a column that is declared with the original metadata data type, such as:
rowtime BIGINT METADATA FROM 'timestamp'
we allow explicit casting to a target data type:
rowtime TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp'
A connector still produces and consumes the data type returned by `listMetadata()`. The planner will insert necessary explicit casts.
In any case, the user must declare a data type for each metadata column so that the column receives a valid data type when the table schema is constructed.
...
`applyReadableMetadata(...)` enables the modification of the format's produced row type to:
PHYSICAL COLUMNS + FORMAT METADATA COLUMNS
The source must call the methods above and forward them if it implements `SupportsReadingMetadata`.
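The forwarding step can be pictured as splitting the metadata keys that the planner requests into two groups: keys the format produces and keys the source produces itself. This is a minimal sketch that assumes the format announces its keys as a set. `MetadataForwarding`, `Split`, and the key `canal-json.database` are hypothetical and do not reflect the actual FLIP-95 signatures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper a source could use before forwarding keys to its format.
final class MetadataForwarding {

    // Keys the format must produce, and keys the source produces itself.
    record Split(List<String> formatKeys, List<String> sourceKeys) {}

    private MetadataForwarding() {}

    static Split splitRequestedKeys(List<String> requested, Set<String> formatProvidedKeys) {
        List<String> formatKeys = new ArrayList<>();
        List<String> sourceKeys = new ArrayList<>();
        for (String key : requested) {
            // Preserve the requested order within each group.
            if (formatProvidedKeys.contains(key)) {
                formatKeys.add(key);
            } else {
                sourceKeys.add(key);
            }
        }
        return new Split(List.copyOf(formatKeys), List.copyOf(sourceKeys));
    }
}
```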
...
In the end, an output row that leaves the source looks like:
PHYSICAL COLUMNS + FORMAT METADATA COLUMNS + SOURCE METADATA COLUMNS
For Kafka with key and value formats, the row would look like:
PHYSICAL COLUMNS + KEY FORMAT METADATA COLUMNS + VALUE FORMAT METADATA COLUMNS + SOURCE METADATA COLUMNS
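The following sketch spells out this field order. It also assumes that the planner restores the declared column order with a projection, and computes where each declared column is found in the produced row. `ProducedRowLayout` is hypothetical. For simplicity, it assumes that metadata fields are named after their columns (as with `offset INT METADATA VIRTUAL`).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: produced row layout of a source and the projection back to the DDL order.
final class ProducedRowLayout {

    private ProducedRowLayout() {}

    // PHYSICAL + KEY FORMAT METADATA + VALUE FORMAT METADATA + SOURCE METADATA.
    // A source with a single format passes an empty list for one of the format groups.
    static List<String> layout(
            List<String> physical,
            List<String> keyFormatMetadata,
            List<String> valueFormatMetadata,
            List<String> sourceMetadata) {
        List<String> fields = new ArrayList<>(physical);
        fields.addAll(keyFormatMetadata);
        fields.addAll(valueFormatMetadata);
        fields.addAll(sourceMetadata);
        return List.copyOf(fields);
    }

    // For each declared column, its position in the produced row (assumes unique names).
    static int[] projection(List<String> declared, List<String> produced) {
        int[] indices = new int[declared.size()];
        for (int i = 0; i < declared.size(); i++) {
            int pos = produced.indexOf(declared.get(i));
            if (pos < 0) {
                throw new IllegalArgumentException("Column not produced: " + declared.get(i));
            }
            indices[i] = pos;
        }
        return indices;
    }
}
```

For `CREATE TABLE t (i INT, s STRING, offset INT METADATA VIRTUAL, d DOUBLE)`, the produced row is `[i, s, d, offset]` and the projection back to the declared order is `[0, 1, 3, 2]`.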
Writing metadata via DynamicTableSink
To be able to write metadata from a query into a sink, we introduce the concept of virtual columns to exclude non-writable columns.
Let's assume the following example:
CREATE TABLE t (i INT, s STRING, `timestamp` BIGINT METADATA, `offset` INT METADATA VIRTUAL, d DOUBLE)
The `timestamp` column can be used symmetrically for both reading and writing metadata.
Non-virtual (i.e. persisted) metadata columns are part of the query-to-sink schema, which means that they need to be specified in an INSERT INTO statement. To make those columns optional, we suggest introducing the syntax:
INSERT INTO t (i, s, d) SELECT ...
This marks the omitted persisted metadata columns as NULL.
Persisted metadata columns are part of the physical row that enters a sink (either passed by the query or NULL). They are never recomputed.
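The sketch below shows how an explicit column list could be expanded to the full query-to-sink row. It assumes, although the text above does not state this, that only persisted metadata columns may be omitted and that omitting a physical column is an error. `InsertColumnList` is a hypothetical name.

```java
import java.util.List;
import java.util.Set;

// Hypothetical sketch: expands INSERT INTO t (col, ...) to the full query-to-sink row.
final class InsertColumnList {

    private InsertColumnList() {}

    // sinkColumns: query-to-sink schema in declaration order (VIRTUAL columns already removed)
    // metadataColumns: names of the persisted metadata columns among sinkColumns
    // targetColumns / values: the explicit column list of the INSERT and one value per entry
    static Object[] expand(List<String> sinkColumns, Set<String> metadataColumns,
                           List<String> targetColumns, List<?> values) {
        if (targetColumns.size() != values.size()) {
            throw new IllegalArgumentException("Column list and values differ in length");
        }
        Object[] row = new Object[sinkColumns.size()];
        for (int i = 0; i < sinkColumns.size(); i++) {
            String column = sinkColumns.get(i);
            int pos = targetColumns.indexOf(column);
            if (pos >= 0) {
                row[i] = values.get(pos);
            } else if (metadataColumns.contains(column)) {
                row[i] = null; // omitted persisted metadata column is written as NULL
            } else {
                throw new IllegalArgumentException("Physical column must be specified: " + column);
            }
        }
        return row;
    }
}
```

For `INSERT INTO t (i, s, d)` on the table above, the sink receives `[i, s, NULL, d]`, with NULL in the `timestamp` position.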
We suggest the following interfaces for integrating writing metadata into FLIP-95 interfaces:
...
ROW<i INT, s STRING, d DOUBLE, timestamp BIGINT> -- 'timestamp' is moved to the end and has the expected metadata data type, 'offset' is not present
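The derivation of the consumed row type in the `ROW<...>` example above can be sketched as follows. Physical columns keep their order. Persisted metadata columns are appended with the data type that the connector lists for their key, and `VIRTUAL` columns are dropped. `ConsumedRowType` is hypothetical, and naming the appended fields after their metadata keys is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical sketch of deriving the row type a sink consumes.
final class ConsumedRowType {

    // A declared column; metadataKey == null marks a physical column.
    record Column(String name, String type, String metadataKey, boolean virtual) {}

    private ConsumedRowType() {}

    // Physical columns keep their declared order and type; persisted (non-VIRTUAL)
    // metadata columns move to the end and use the connector's type for their key.
    static String derive(List<Column> declared, Map<String, String> connectorMetadataTypes) {
        StringJoiner fields = new StringJoiner(", ", "ROW<", ">");
        List<Column> metadata = new ArrayList<>();
        for (Column c : declared) {
            if (c.metadataKey() == null) {
                fields.add(c.name() + " " + c.type());
            } else if (!c.virtual()) {
                metadata.add(c);
            }
        }
        for (Column c : metadata) {
            String type = connectorMetadataTypes.get(c.metadataKey());
            if (type == null) {
                throw new IllegalArgumentException("Unknown metadata key: " + c.metadataKey());
            }
            fields.add(c.metadataKey() + " " + type);
        }
        return fields.toString();
    }
}
```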
Writing metadata via EncodingFormat
Similar to DecodingFormat, we add default methods to EncodingFormat that can be used by the DynamicTableSink.
interface EncodingFormat {
  ...
}
Calling `applyWritableMetadata(...)` enables the modification of the format's consumed row type. The row that goes into the format at runtime will be:
PHYSICAL COLUMNS + FORMAT METADATA COLUMNS
The same rules as above apply for handling both sink metadata and format metadata.
In the end, an input row into the sink looks like:
PHYSICAL COLUMNS + FORMAT METADATA COLUMNS + SINK METADATA COLUMNS
Metadata for existing connectors and formats
...
- connector metadata has no prefix
- formats will have a prefix using the factory identifier
- key/value formats are always prefixed with `key.` and `value.` to avoid collisions
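Here is a sketch of these prefix rules. It assumes that for key/value formats the `key.`/`value.` prefix is added in front of the factory-identifier prefix, because the rules above do not say explicitly whether the two compose. `MetadataKeyPrefixes` and the example format key `database` are illustrative only.

```java
// Hypothetical sketch of qualifying format metadata keys; connector keys stay unprefixed.
final class MetadataKeyPrefixes {

    // Where a format is used; NONE for a connector with a single format.
    enum FormatRole { NONE, KEY, VALUE }

    private MetadataKeyPrefixes() {}

    // Format metadata keys get the format's factory identifier as prefix; in a key/value
    // connector, "key." or "value." is prepended as well (assumption: the prefixes compose).
    static String formatKey(FormatRole role, String factoryIdentifier, String key) {
        String qualified = factoryIdentifier + "." + key;
        switch (role) {
            case KEY:
                return "key." + qualified;
            case VALUE:
                return "value." + qualified;
            default:
                return qualified;
        }
    }
}
```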
Kafka
Key | Data type | Read/Write | Notes |
---|---|---|---|
topic | STRING | r | We don't allow writing to different topics for now. Maybe we will allow that in the future via a property. |
partition | INT | r | We don't allow writing to different partitions for now. Maybe we will allow that in the future via a property. |
headers | MAP<STRING, BYTES> | r/w | |
leader-epoch | INT | r | |
offset | BIGINT | r | |
timestamp | BIGINT | r/w | Directly forward the underlying type. |
timestamp-type | STRING | r | ['NoTimestampType', 'CreateTime', 'LogAppendTime'] |
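One plausible use of this table is to validate metadata column declarations. A read-only key is allowed only on a `VIRTUAL` column, because it cannot be part of the query-to-sink schema. Both the rule and the `KafkaMetadataValidation` class are assumptions for illustration, not the connector's actual code. The types and read/write flags are copied from the table.

```java
import java.util.Map;

// Hypothetical sketch: checks a metadata column declaration against the Kafka table above.
final class KafkaMetadataValidation {

    // Data type and writability per metadata key, copied from the Kafka table.
    record KeySpec(String dataType, boolean writable) {}

    static final Map<String, KeySpec> KAFKA_METADATA = Map.of(
            "topic", new KeySpec("STRING", false),
            "partition", new KeySpec("INT", false),
            "headers", new KeySpec("MAP<STRING, BYTES>", true),
            "leader-epoch", new KeySpec("INT", false),
            "offset", new KeySpec("BIGINT", false),
            "timestamp", new KeySpec("BIGINT", true),
            "timestamp-type", new KeySpec("STRING", false));

    private KafkaMetadataValidation() {}

    // Returns the connector-side type, or throws if the declaration cannot be supported.
    static String validate(String metadataKey, boolean virtual) {
        KeySpec spec = KAFKA_METADATA.get(metadataKey);
        if (spec == null) {
            throw new IllegalArgumentException("Unknown Kafka metadata key: " + metadataKey);
        }
        if (!virtual && !spec.writable()) {
            // Read-only metadata must be declared VIRTUAL so that it is excluded from writing.
            throw new IllegalArgumentException(
                    "Metadata key '" + metadataKey + "' is read-only and must be declared VIRTUAL");
        }
        return spec.dataType();
    }
}
```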
...
CREATE TABLE kafka_table (
Rejected alternatives
Copied from the long mailing list discussion.
Option 1
Declare everything via properties:
...
Pros:
- "timestamp" and "headers" are similar to "key" and "value" in that they are stored with the real data. So why not define the "timestamp" in the same way as the "key" by using a "timestamp.field" connector option?
Cons:
- "key" and "value" in the properties are a special case because they need to configure a format.
- We have many more metadata fields like headers, leader-epoch, etc. Having a property for each of them would clutter the WITH section entirely. Furthermore, we also want to deal with metadata from the formats. Solving this through properties as well would further complicate the property design.
Option 2
Use computed columns:
CREATE TABLE kafka_table (
...
Pros:
- Allows full flexibility to compute the final column and avoids helper columns:
  timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3)))
Cons:
- Mixes the concepts of metadata and computed columns.
- Would need the concept of PERSISTED computed columns, but not with 100% the same semantics as other vendors.
- Complicated syntax:
  a) CREATE TABLE t (a AS CAST(SYSTEM_METADATA("offset") AS INT))
     pro: readable, complex arithmetic possible, more SQL compliant, SQL Server compliant
     con: long
  b) CREATE TABLE t (a INT AS SYSTEM_METADATA("offset"))
     pro: shorter
     con: neither SQL nor SQL Server compliant, requires parser changes, no complex arithmetic like `computeSomeThing(SYSTEM_METADATA("offset"))` possible
  c) CREATE TABLE t (a AS SYSTEM_METADATA("offset", INT))
     pro: shorter, very readable, complex arithmetic possible
     con: non-SQL expression, requires parser changes
Option 3
Marking key columns in the schema section:
CREATE TABLE kafka_table (
  id ...,
  timestamp TIMESTAMP HEADER("timestamp"),
  col1 ... KEY/HEADER("key"),
  col2 ...
  ...
) WITH (
  'connector.type' = 'kafka',
  ...
  'format.type' = 'kafka-format',
  'format.key.type' = 'csv',
  'format.value.type' = 'avro'
)
There are a couple of issues with this approach:
- It mixes the logical schema definition with the physical representation of the format and/or source. Kafka is the only source that has a meaningful key; all other sources like Pulsar/Kinesis/Pravega use the key only as a partitioning hash and support only string-based keys. Moreover, it makes CREATE TABLE … AS SELECT … harder to reason about: there is no place to put the modifiers, as the schema is derived from the query.
- It is cumbersome or impossible to handle columns that are stored in multiple places (both in key and value).
Option 4
Specific per property functions:
CREATE TABLE kafka_table (
...
Con: Would add too many built-in functions.
How to derive the type of a column
Both the generic SYSTEM_METADATA function (Option 2) and specific per property functions (Option 4) raise the question of where the type of a computed column comes from. With specific functions, partitionId might be expressed with different types by different sources (e.g. long or String), so the returned type depends on the type of source. With a generic function, the problem is even more visible, as the type differs depending on the requested property. This could be alleviated by a return type strategy based on the value of the constant argument, but it would require that the function lists all properties for all sources, and the per-source problem would remain.
Solution:
Enable declaring the type of a computed column. The return type strategy could then use that type as the return type. This was part of the initial proposal of FLIP-70 but was removed from it. Reintroducing an explicitly declared result type would also benefit regular computed columns, because it is currently not possible to use functions that infer their result type from the expected type.
Future Work
Some topics that are not part of this FLIP anymore.
...