...

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  col1 STRING,
  col2 STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.topic' = 'test-topic',

  'key.format.type' = 'csv',
  'key.fields' = 'id, name',

  'value.format.type' = 'avro',
  'value.fields.-include' = 'EXCEPT_KEY'
)


An insert statement:

...

  • key.fields, key.format.type, key.format.(<properties-required-by-format>) - control which fields end up in Kafka’s key and which serialization format is used for it
  • value.fields.-include, value.format.type, value.format.(<properties-required-by-format>)
    • value.fields.-include - controls which fields end up in the value. Possible values:
      • ALL (all fields of the schema, even those that are also part of e.g. the key)
      • EXCEPT_KEY (all fields of the schema except the key fields)
      • EXCEPT_KEY_TIMESTAMP (all fields of the schema except the key fields and the timestamp field)
    • value.format.type, value.format.(<properties-required-by-format>) - the serialization format of the value. I suggest making the “value” prefix optional, so that we stay backwards compatible with previous declarations
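To make the difference between the include modes concrete, here is a sketch of the earlier table declared with ALL instead of EXCEPT_KEY. The table name kafka_table_all is hypothetical and used only for illustration; the property keys are the ones proposed above, and the field lists in the comments follow directly from the definitions of ALL and EXCEPT_KEY.

```sql
-- Sketch only: kafka_table_all is a hypothetical name, not part of the proposal.
CREATE TABLE kafka_table_all (
  id BIGINT,
  name STRING,
  col1 STRING,
  col2 STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.topic' = 'test-topic',

  -- Key: id, name (serialized as CSV)
  'key.format.type' = 'csv',
  'key.fields' = 'id, name',

  -- Value with ALL:        id, name, col1, col2 (serialized as Avro)
  -- Value with EXCEPT_KEY: col1, col2
  'value.format.type' = 'avro',
  'value.fields.-include' = 'ALL'
)
```

With ALL, the key fields are written twice, once in the key and once in the value; with EXCEPT_KEY, each field is written in exactly one of the two.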

...