Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. Kafka: ETL: read, transform and write back with key, value. All fields of the key are present in the value as well.


CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  col1 STRING,
  col2 STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.topic' = 'test-topic',


  'key.fields' = 'id, name',
  'key.format.type' = 'csv',

  'value.format.type' = 'avro'
)



An insert statement:

INSERT INTO kafka_table VALUES(
  (1, "ABC", "col1", "col2")
)

will result in a ProducerRecord as follows:

new ProducerRecord<>(
  "test-topic",
  null,
  CSV.serialize(id, name),
  AVRO.serialize(id, name, col1, col2)
)


A slightly different use case is to manually control where to store the columns data:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  col1 STRING,
  col2 STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.topic' = 'test-topic'

  'key.format.type' = 'csv',
  'key.fields' = 'id, name',

  'value.format.type' = 'avro',
  'value.fields.include' = 'EXCEPT_KEY'
)


An insert statement:

INSERT INTO kafka_table VALUES(
  (1, "ABC", "col1", "col2")
)

will result in a ProducerRecord as follows:

new ProducerRecord<>(
  "test-topic",
  null,
  CSV.serialize(id, name),
  AVRO.serialize(col1, col2)
)



  1. Kafka: Generate Kafka’s timestamp (it does not have to be a time attribute)


CREATE TABLE csv_table (
  id BIGINT,
  eventType STRING,
  timestamp STRING
) WITH (
  'connector.type' = 'filesystem',
  'format.type' = 'csv'
)

CREATE TABLE kafka_table (
  id BIGINT,
  eventType STRING,
  timestamp TIMESTAMP(3)
) WITH (
  'connector.type' = 'kafka',
  'value.format.type' = 'avro',

  'timestamp' = 'timestamp'
)

INSERT INTO kafka_table(id, eventType, timestamp)
  SELECT id, eventType, toTimestamp(timestamp) FROM csv_table;


  1. Access read-only metadata e.g. partition

...

  • Kafka:  represents partitions ids as longs


CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  partitionId BIGINT AS SYSTEM_METADATA("partition")
) WITH (
  'connector.type' = 'kafka',
  'value.format.type' = 'avro'
)

SELECT * FROM kafka_table;

-- Partition is a computed column, therefore it cannot be written to. Statement like below will fail:
INSERT INTO kafka_table VALUES (1, "ABC", 12)


  • Kinesis: kinesis represents partitions keys as strings


CREATE TABLE kinesis_table (
  id BIGINT,
  name STRING,
  partitionId STRING AS SYSTEM_METADATA("partition")
) WITH (
  'connector.type' = 'kinesis',
  'value.format.type' = 'avro'
)

SELECT * FROM kinesis_table;

-- Partition is a computed column, therefore it cannot be written to. Statements like below will fail:
INSERT INTO kinesis_table VALUES (1, "ABC", "shard-0000")



  1. Kafka: Push down offset filter


CREATE TABLE kafka_table (
  id BIGINT,
  offset BIGINT AS SYSTEM_METADATA("offset")
) WITH (
  'connector.type' = 'kafka',
  'value.format.type' = 'avro'
)

SELECT * FROM kafka_table WHERE offset > 123456;


  1. Kinesis: Specify custom partitioning


CREATE TABLE kinesis_table (
  id BIGINT,
  name STRING)
PARTITIONED BY HASH(id)
WITH (
  'connector.type' = 'kinesis',
  'value.format.type' = 'avro'
)

INSERT INTO kinesis_table VALUES(
    (1, 'ABC', 1),
    (1, 'ABC', 3)
)


Both records will end up in the same partition.

  1. Kafka: Manually specify target partition, overwrite partitioning by key.


CREATE TABLE kinses_table (
  id BIGINT,
  name STRING,
  partitionId BIGINT)
PARTITIONED BY HASH(partitionId)
WITH (
  'connector.type' = 'kafka',
  'key.fields' = 'id, name',
  'key.format.type' = 'csv',

  'value.format.type' = 'avro'
)

INSERT INTO kafka_table VALUES(
    (1, 'ABC', 1),
    (1, 'ABC', 3)
)



The two records end up in different partitions even though the resulting Kafka’s keys will be equal for both records.

...

If a user has an old DDL statement that deserialized fields only from the value, but wants to also read data from the key. 

CREATE TABLE kafka_table (...) WITH (
  'connector.type' = 'kafka',


  'format.type' = 'avro',
  'format.(...)' = '...',
)

In that case a user needs to add only the description of the key:

CREATE TABLE kafka_table (...) WITH (
  'connector.type' = 'kafka',
  'format.type' = 'avro',
  'format.(...)' = '...',

  'key.fields' = '...',
  'key.format.type' = 'csv'
)

   

  • timestamp.field - property which tells which field to store as Kafka’s timestamp
  • (optional support) fields.verify-integrity - controls if we should perform an equality check if a field is contained in different parts of consumer record


Summary:

CREATE TABLE kafka_table (
    id STRING,
    timestamp TIMESTAMP,
    col1 ...,
    col2 ...
    ....
) WITH (
  'connector.type' = 'kafka',
  ...
  'key.fields' = 'id, col1'
  'key.format.type' = 'csv'

  -- optional: controls which fields of the schema should end up in the value (ALL by default) (options: ALL, EXCEPT_KEY, EXCEPT_KEY_TIMESTAMP)
  'value.fields-include' = 'ALL'  
  'value.format.type' = 'avro'

  --optional: (false by default) if true and a field originates from more than a single location (e.g. both in key and value), a check will be performed both values are equal
  'fields.verify-integrity' = 'false'      

  'timestamp.field' = 'timestamp'
)


Rejected alternatives:

Marking columns in the schema section:

CREATE TABLE kafka_table (
  id STRING KEY,
  timestamp TIMESTAMP HEADER("timestamp"),
  col1 ... KEY/HEADER("key"),
  col2 ...
  ...
) WITH (
  'connector.type' = 'kafka',
  ...
  'format.type' = 'kafka-format'
  'format.key.type' = 'csv'
  'format.value.type' = 'avro'
)


There is a couple of issues with this approach:

...

Generic SYSTEM_METADATA(“property-key”)

CREATE TABLE kafka_table (
  id STRING,
  offset BIGINT AS SYSTEM_METADATA("offset"),
  partitionId BIGINT AS SYSTEM_METADATA("partition")
  ...
) WITH (
  'connector.type' = 'kafka'
)

How to derive type of a column

...

Specific per property functions

CREATE TABLE kafka_table (
  id STRING,
  offset AS SOURCE_OFFSET(),
  partitionId AS SOURCE_PARTITION()
  ...
) WITH (
  'connector.type' = 'kafka'
)


I think option 1 is more appealing as it is more future proof. It makes it easier to add new properties in the future.

...

Use PARTITIONED BY clause with different partitioning strategies (similar to ORACLE):

Partition by hash:

CREATE TABLE kafka_table (
  id STRING,
  name STRING,
  date: DATE,
  key: BINARY
  ...
) WITH (
  'connector.type' = 'kafka'
  'key.fields' = 'key' --the key will not be used for partitioning
)
PARTITIONED BY HASH(id, name)




If necessary we can introduce more partitioning strategies E.g. explicit partitioning:

-- partitioning based on an existing column in data
CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  date: DATE,
  key: BINARY
  ...
) WITH (
  'connector.type' = 'kafka'
  'key.fields' = 'key' -- the key will not be used for partitioning
)
PARTITIONED BY EXPLICIT(id)

-- or on a computed column
CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  targetPartition BIGINT AS CAST(name AS BIGINT),
  date: DATE,
  key: BINARY
  ...
) WITH (
  'connector.type' = 'kafka'
  'key.fields' = 'key' -- the key will not be used for partitioning
)
PARTITIONED BY EXPLICIT(targetPartition)

-- Or with user defined functions in the partitioned by clause
CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  date: DATE,
  key: BINARY
  ...
) WITH (
  'connector.type' = 'kafka'
  'key.fields' = 'key' -- the key will not be used for partitioning
)
PARTITIONED BY EXPLICIT(USER_SCALAR_FUNCTION(name, date, SYSTEM_METADATA("numberOfPartitions"))

Relation to FLIP-63

Flip-63 introduced initial support for PARTITIONED BY clause to an extent that let us support Hive's partitioning. In majority of situations it is in line with the proposal above.

The only significant difference is that this proposal suggests adding partitioning strategies e.g. PARTITIONED BY HASH/EXPLICIT.  Currently implemented strategy works as an EXPLICIT strategy that requires a list of any type as a partition identifier. We could make the EXPLICIT strategy default, which could let us support current PARTITION BY syntax.

The current implementation requires the column in PARTITIONED BY clause to be also defined in the schema part of the table. It is the same as in this proposal.

FLIP-63 discusses syntax as INSERT INTO ... PARTITION(...) → It does not interfere with this proposal. It just implies additional check that the row matches the partition requirements(columns in rows match the partition specification)

FLIP-63 discusses more in-depth filesystem specific partition pruning. This proposal leaves out that part to specific source implementations. Therefore it does not contradict with FLIP-63.