...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

...

Besides the main payload, the majority of sources (if not all of them) expose additional information. It can be simply read-only metadata, such as a Kafka read offset or ingestion time. Additionally, users might want to read and write only parts of the record that contain data but additionally serve different purposes (partitioning, compaction, etc.).

...

Because of the reasons above, Kafka will be used for the majority of the examples.

Examples

...

Kafka: ETL: read, transform and write back with key, value. All fields of the key are present in the value as well.

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  col1 STRING,
  col2 STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.topic' = 'test-topic',


  'key.fields' = 'id, name',
  'key.format.type' = 'csv',

  'value.format.type' = 'avro'
)

...

new ProducerRecord<>(
  "test-topic",              // topic
  null,                      // partition: left to the producer's partitioner
  CSV.serialize(id, name),   // key: the 'key.fields' serialized with the key format
  AVRO.serialize(col1, col2) // value: the remaining fields serialized with the value format
)
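
To illustrate the "transform and write back" step, a sketch of such a pipeline is shown below. The sink table kafka_sink_table and the UPPER transformation are hypothetical; the sketch only assumes a sink table declared with the same key/value properties as kafka_table above:

INSERT INTO kafka_sink_table
  SELECT id, name, UPPER(col1) AS col1, col2 FROM kafka_table;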

Kafka: Generate Kafka’s timestamp (it does not have to be a time attribute)

CREATE TABLE csv_table (
  id BIGINT,
  eventType STRING,
  timestamp STRING
) WITH (
  'connector.type' = 'filesystem',
  'format.type' = 'csv'
)

CREATE TABLE kafka_table (
  id BIGINT,
  eventType STRING,
  timestamp TIMESTAMP(3)
) WITH (
  'connector.type' = 'kafka',
  'value.format.type' = 'avro',

  'timestamp.field' = 'timestamp'
)

INSERT INTO kafka_table(id, eventType, timestamp)
  SELECT id, eventType, toTimestamp(timestamp) FROM csv_table;

Access read-only metadata e.g. partition

Kafka: represents partition ids as longs:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  partitionId BIGINT AS SYSTEM_METADATA("partition")
) WITH (
  'connector.type' = 'kafka',
  'value.format.type' = 'avro'
)

SELECT * FROM kafka_table;

-- partitionId is a computed column, therefore it cannot be written to. A statement like the one below will fail:
INSERT INTO kafka_table VALUES (1, 'ABC', 12)


Kinesis: represents partition keys as strings:

CREATE TABLE kinesis_table (
  id BIGINT,
  name STRING,
  partitionId STRING AS SYSTEM_METADATA("partition")
) WITH (
  'connector.type' = 'kinesis',
  'value.format.type' = 'avro'
)

SELECT * FROM kinesis_table;

-- partitionId is a computed column, therefore it cannot be written to. A statement like the one below will fail:
INSERT INTO kinesis_table VALUES (1, 'ABC', 'shard-0000')

Kafka: Push down offset filter

CREATE TABLE kafka_table (
  id BIGINT,
  offset BIGINT AS SYSTEM_METADATA("offset")
) WITH (
  'connector.type' = 'kafka',
  'value.format.type' = 'avro'
)

SELECT * FROM kafka_table WHERE offset > 123456;

Kinesis: Specify custom partitioning

CREATE TABLE kinesis_table (
  id BIGINT,
  name STRING)
PARTITIONED BY HASH(id)
WITH (
  'connector.type' = 'kinesis',
  'value.format.type' = 'avro'
)

INSERT INTO kinesis_table VALUES
    (1, 'ABC'),
    (1, 'DEF')

Both records will end up in the same partition because they share the same id.

Kafka: Manually specify the target partition, overriding partitioning by key.

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  partitionId BIGINT)
PARTITIONED BY HASH(partitionId)
WITH (
  'connector.type' = 'kafka',
  'key.fields' = 'id, name',
  'key.format.type' = 'csv',

  'value.format.type' = 'avro'
)

INSERT INTO kafka_table VALUES
    (1, 'ABC', 1),
    (1, 'ABC', 3)

The two records end up in different partitions even though the resulting Kafka keys will be equal for both records.

Details

...

Reading and writing from key, value, timestamp

...

I suggest introducing connector-specific properties that allow specifying which fields should end up in which parts of the record. The reason why I am not suggesting a generic key-value-metadata format is that the records of different systems most often differ, e.g.

  • Kinesis and Pravega do not have a key where users can store data
  • HBase does not have a concept of a timestamp

...

  • key.fields, key.format.type, key.format.(<properties-required-by-format>) - this controls which fields should end up in Kafka’s key and what should be the serialization format
  • value.fields-include - this controls which fields should end up in the value as well (see the sketch after this list); possible values:
    • ALL (all fields of the schema, even if they are part of e.g. the key)
    • EXCEPT_KEY (all fields of the schema - fields of the key)
    • EXCEPT_KEY_TIMESTAMP (all fields of the schema - fields of the key - field of timestamp)
  • value.format.type, value.format.(<properties-required-by-format>) - I suggest making the “value” prefix optional; this way we stay backwards compatible with previous declarations
  • timestamp.field - property which tells which field to store as Kafka’s timestamp
  • (optional support) fields.verify-integrity - controls if we should perform an equality check if a field is contained in different parts of consumer record
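
A minimal sketch of how value.fields-include would affect the produced record, reusing the kafka_table layout from the ETL example above (the table and topic names are purely illustrative):

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  col1 STRING,
  col2 STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.topic' = 'test-topic',

  'key.fields' = 'id, name',
  'key.format.type' = 'csv',

  -- with EXCEPT_KEY only col1 and col2 are serialized into the value;
  -- with ALL (the default) id and name would appear in both the key and the value
  'value.fields-include' = 'EXCEPT_KEY',
  'value.format.type' = 'avro'
)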

If a user has an old DDL statement that deserializes fields only from the value, but wants to also read data from the key, it is enough to add the key-specific properties to the existing declaration.

...

CREATE TABLE kafka_table (...) WITH (
  'connector.type' = 'kafka',
  'format.type' = 'avro',
  'format.(...)' = '...',

  'key.fields' = '...',
  'key.format.type' = 'csv'
)

...


Summary

...

CREATE TABLE kafka_table (
    id STRING,
    timestamp TIMESTAMP,
    col1 ...,
    col2 ...
    ....
) WITH (
  'connector.type' = 'kafka',
  ...
  'key.fields' = 'id, col1',
  'key.format.type' = 'csv',

  -- optional: controls which fields of the schema should end up in the value (ALL by default) (options: ALL, EXCEPT_KEY, EXCEPT_KEY_TIMESTAMP)
  'value.fields-include' = 'ALL',
  'value.format.type' = 'avro',

  -- optional: (false by default) if true and a field originates from more than a single location (e.g. both in key and value), a check will be performed that both values are equal
  'fields.verify-integrity' = 'false',

  'timestamp.field' = 'timestamp'
)


Rejected alternatives

...

Marking columns in the schema section:

...

  • mixes logical schema definition with the physical representation of format and/or source (Kafka is the only source that has a meaningful key; all other sources like Pulsar/Kinesis/Pravega use the key only as a partitioning hash and support string-based keys only). Moreover, it makes it harder to reason about when implementing CREATE TABLE … AS SELECT …

    For CREATE TABLE … AS SELECT … there is no place to put the modifiers, as the schema is derived from the query (see the sketch below).
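
For illustration only, a sketch of the problem: the KEY modifier below is a hypothetical schema-section marking (not part of this proposal), and the following CTAS statement has no schema section in which such a modifier could appear:

-- hypothetical schema-section marking (rejected): the physical role of a column
-- is declared next to its logical type
CREATE TABLE kafka_table (
  id BIGINT KEY,     -- hypothetical modifier: store this field in Kafka's key
  name STRING KEY,   -- hypothetical modifier
  col1 STRING,
  col2 STRING
) WITH (
  'connector.type' = 'kafka',
  'value.format.type' = 'avro'
)

-- the schema of kafka_table_copy is derived from the query,
-- so there is no place to attach such modifiers
CREATE TABLE kafka_table_copy WITH (
  'connector.type' = 'kafka',
  'value.format.type' = 'avro'
) AS SELECT id, name, col1, col2 FROM kafka_table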

...