...

Kafka + Canal JSON Format: Both the connector and the format expose metadata

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  offset AS CAST(SYSTEM_METADATA("offset") AS BIGINT), -- from Kafka (offsets are 64-bit)
  database AS CAST(SYSTEM_METADATA("database") AS STRING) -- from Canal
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'format' = 'canal-json'
)

A Canal JSON message consumed by this table looks like the following:

{
    "data": [
        {
            "id": "102",
            "name": "car battery"
        }
    ],
    "database": "inventory",
    "table": "products",
    "es": 1589374013000,
    "ts": 1589374013680,
    "type": "DELETE"
}
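
To make the split between the two metadata sources concrete, here is a minimal Python sketch of how one such message could map onto the columns of kafka_table. It is only an illustration, not how the connector or the format is implemented: the function name split_canal_record and the kafka_offset parameter are hypothetical, and the offset value 42 is made up, since the offset comes from Kafka's ConsumerRecord and not from the message body.

```python
import json

# Canal JSON envelope, same content as the sample message above.
CANAL_MESSAGE = """
{
    "data": [{"id": "102", "name": "car battery"}],
    "database": "inventory",
    "table": "products",
    "es": 1589374013000,
    "ts": 1589374013680,
    "type": "DELETE"
}
"""


def split_canal_record(raw, kafka_offset):
    """Return (rows, metadata) for one Canal JSON message.

    Hypothetical helper: `kafka_offset` stands in for the offset of the
    Kafka record, which the connector supplies separately from the payload.
    """
    envelope = json.loads(raw)
    # Physical columns come from the "data" array; the sample encodes id as
    # a string, so cast it to match the `id BIGINT` column.
    rows = [{"id": int(r["id"]), "name": r["name"]} for r in envelope["data"]]
    metadata = {
        "offset": kafka_offset,            # connector metadata (Kafka)
        "database": envelope["database"],  # format metadata (Canal)
    }
    return rows, metadata


rows, metadata = split_canal_record(CANAL_MESSAGE, kafka_offset=42)
```

The point of the sketch is that "database" is read from the Canal envelope, while "offset" never appears in the JSON at all and has to be provided by the connector.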

Kafka: Read metadata from Kafka's ConsumerRecord and use it for computation

...