Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

-- Kinesis-backed table that surfaces the connector's 'partition' metadata.
-- partitionId is a computed column derived from that metadata: it can be
-- selected like any other column but is not part of the writable schema.
CREATE TABLE kinesis_table (
  id BIGINT,
  name STRING,
  partitionId AS CAST(SYSTEM_METADATA('partition') AS STRING)
) WITH (
  'connector' = 'kinesis',
  'value.format' = 'avro'
);

-- Reading: the computed metadata column is selected next to the physical columns.
SELECT id, name, partitionId FROM kinesis_table;

-- partitionId is a computed column, therefore it cannot be written to. Statements like the one below will fail:
INSERT INTO kinesis_table VALUES (1, 'ABC', 'shard-0000');

Kafka + Canal JSON Format: Both connector and format expose metadata

-- Kafka-backed table read with the Canal JSON format. Both layers contribute
-- metadata: 'offset' is supplied by the Kafka connector, 'database' by the
-- Canal format.
CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  -- OFFSET and DATABASE are reserved keywords, so the column names must be backtick-quoted.
  -- Kafka offsets are 64-bit values; casting to INT would overflow on long-lived topics.
  `offset` AS CAST(SYSTEM_METADATA('offset') AS BIGINT), -- from Kafka
  `database` AS CAST(SYSTEM_METADATA('database') AS STRING) -- from Canal
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'format' = 'canal-json'
);

Kafka: Read metadata from Kafka's ConsumerRecord and use it for computation

...