...

The KTable source is a kind of changelog source. Primary key semantics on a changelog source mean that the materialized changelog (INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE) is unique on the primary key constraint. Flink assumes all messages are ordered by the primary key and will use the primary key for further materialization and optimization.

Creating a KTable table in Flink requires declaring the primary key on the table. The primary key definition also controls which fields end up in Kafka’s key; therefore, we don’t need the ‘key.fields’ option in the KTable connector. By default, primary key fields are also stored in Kafka’s value, but users can use the ‘value.fields-include’ option to control this behavior.
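As a minimal sketch of such a declaration (the table, field, topic, and broker names are hypothetical, and 'json' is just one possible insert-only format):

    -- The declared primary key determines Kafka's message key:
    -- user_id goes into the key; by default all fields go into the value.
    CREATE TABLE users (
      user_id BIGINT,
      user_name STRING,
      region STRING,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'ktable',
      'topic' = 'users',
      'properties.bootstrap.servers' = 'localhost:9092',
      'key.format' = 'json',
      'value.format' = 'json'
    );

Note that no separate ‘key.fields’ option appears here: the PRIMARY KEY clause alone determines which fields are serialized into Kafka’s key.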

...

Generally speaking, the underlying topic of the KTable source must be compacted. Besides, the underlying topic must have all the data with the same key in the same partition; otherwise, the result will be wrong.

...

In order to guarantee message ordering, the KTable sink always works in HASH partitioner mode on the primary key fields. Therefore, we don’t need the ‘sink.partitioner’ option in the KTable connector.
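For instance, a sketch of writing an updating query into a KTable sink (the pageviews tables are hypothetical; 'pageviews_per_region' is assumed to be a KTable table with PRIMARY KEY (region) NOT ENFORCED):

    -- 'pageviews' is an append-only source table; the grouped aggregation
    -- below produces an updating changelog, which the KTable sink accepts.
    -- The sink hash-partitions records by the primary key field 'region',
    -- so all updates for one key land in the same Kafka partition, in order.
    INSERT INTO pageviews_per_region
    SELECT region, COUNT(*) AS view_cnt
    FROM pageviews
    GROUP BY region;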

...

| Option | Required | Default | Type | Description |
| ------ | -------- | ------- | ---- | ----------- |
| connector | required | (none) | String | Specify which connector to use; here it should be 'ktable'. |
| topic | required | (none) | String | Topic name to read data from when the table is used as a source, or to write data to when the table is used as a sink. |
| properties.bootstrap.servers | required | (none) | String | Comma-separated list of Kafka brokers. |
| key.format | required | (none) | String | The format used to deserialize and serialize the key of a Kafka record. Only insert-only formats are supported. |
| value.format | required | (none) | String | The format used to deserialize and serialize the value of a Kafka record. |
| value.fields-include | optional | 'ALL' | String | Controls which fields should end up in the value as well. Possible values: ALL (all fields of the schema, even if they are part of the key), EXCEPT_KEY (all fields of the schema minus the fields of the key). |
| sink.semantic | optional | 'at-least-once' | String | Defines the delivery semantic for the Kafka sink. Valid enumerations are 'at-least-once', 'exactly-once' and 'none'. |
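As a hedged sketch of the ‘value.fields-include’ behavior (all names hypothetical):

    -- With EXCEPT_KEY, Kafka's key carries user_id and Kafka's value
    -- carries only user_name; with the default ALL, the value would
    -- carry user_id as well.
    CREATE TABLE users (
      user_id BIGINT,
      user_name STRING,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'ktable',
      'topic' = 'users',
      'properties.bootstrap.servers' = 'localhost:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'value.fields-include' = 'EXCEPT_KEY'
    );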

Note: only insert-only formats are supported for 'key.format' and 'value.format'. We will use the ChangelogMode of the format to distinguish whether the format is insert-only.

Examples

Convert a Kafka topic in Debezium format into a KTable table
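A sketch of one way such a conversion could look (the topic, field, and broker names are hypothetical):

    -- A changelog source reading the Debezium topic.
    CREATE TABLE products_binlog (
      product_id BIGINT,
      name STRING,
      price DECIMAL(10, 2)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'products_binlog',
      'properties.bootstrap.servers' = 'localhost:9092',
      'format' = 'debezium-json'
    );

    -- A KTable table keyed by product_id, materializing the changelog.
    CREATE TABLE products (
      product_id BIGINT,
      name STRING,
      price DECIMAL(10, 2),
      PRIMARY KEY (product_id) NOT ENFORCED
    ) WITH (
      'connector' = 'ktable',
      'topic' = 'products',
      'properties.bootstrap.servers' = 'localhost:9092',
      'key.format' = 'json',
      'value.format' = 'json'
    );

    -- Continuously upsert the changelog into the KTable topic.
    INSERT INTO products SELECT * FROM products_binlog;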

...

  1. It's hard to explain the behavior when users specify a start offset in the middle of the changelog (e.g. how to process DELETE events for keys that were never seen). It's dangerous if users do that, so we don't provide the offset option in the new connector at the moment.
  2. It's a different perspective/abstraction on the same Kafka topic (append vs. upsert). It is easier to understand if we separate them instead of mixing them in one connector. The new connector requires a hash sink partitioner, a declared primary key, and a regular format. If we mixed them in one connector, it might be confusing how to use the options correctly.
  3. The semantics of the KTable connector are the same as those of a KTable in Kafka Streams, so it's very handy for Kafka Streams and KSQL users. We have seen several questions [1][2] on the mailing list asking how to model a KTable and how to join a KTable in Flink SQL (a join sketch follows this list).
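A sketch of the kind of KTable join users ask about (table and field names are hypothetical; 'users' is assumed to be a KTable table keyed by user_id, and 'clicks' an append-only stream):

    -- A regular join against a changelog-backed table keeps the latest
    -- row per key in state, mirroring a stream-table join on a KTable
    -- in Kafka Streams.
    SELECT c.click_id, c.user_id, u.region
    FROM clicks AS c
    JOIN users AS u ON c.user_id = u.user_id;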

Future Work

Support bounded KTable source

It’s also possible that users want to use a KTable in batch mode, but this feature needs more discussion.

...