...
The KTable source is a kind of changelog source. The primary key semantics on a changelog source mean that the materialized changelogs (INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE) are unique on the primary key constraint. Flink assumes all messages are ordered on the primary key and will use the primary key for further materialization and optimization.
Creating a KTable table in Flink requires declaring the primary key on the table. The primary key definition also controls which fields end up in Kafka's key; therefore, we don't need the ‘key.fields’ option in the KTable connector. By default, primary key fields are also stored in Kafka's value, but users can use the ‘value.fields-include’ option to control this behavior.
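As a minimal sketch of such a declaration (table name, schema, and topic below are illustrative, not part of this proposal), the primary key drives both the Kafka key and the value layout:

```sql
-- Hypothetical example: table name, columns, and topic are illustrative.
CREATE TABLE users (
  user_id BIGINT,
  user_name STRING,
  region STRING,
  -- user_id becomes Kafka's message key; no 'key.fields' option is needed
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'ktable',
  'topic' = 'users',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json',
  -- keep the key fields out of the value instead of the default 'ALL'
  'value.fields-include' = 'EXCEPT_KEY'
);
```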
...
Generally speaking, the underlying topic of the KTable source MUST be compacted. However, users are still able to create a KTable source on a non-compacted topic. Besides, the underlying topic must have all data with the same key in the same partition; otherwise, the result will be wrong.
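For reference, a compacted topic can be created with Kafka's standard CLI by setting `cleanup.policy=compact` (the topic name, partition count, and broker address below are placeholders):

```shell
# Create a log-compacted topic so that only the latest record per key is retained.
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic users \
  --partitions 1 \
  --replication-factor 1 \
  --config cleanup.policy=compact
```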
...
In order to guarantee the message ordering, the KTable sink will always work in HASH partitioner mode on the primary key fields. Therefore, we don’t need the ‘sink.partitioner’ option in the KTable connector.
...
Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | required | (none) | String | Specify what connector to use, here should be 'ktable'. |
topic | required | (none) | String | Topic name to read data from when the table is used as source or to write data to when the table is used as sink. |
properties.bootstrap.servers | required | (none) | String | Comma-separated list of Kafka brokers. |
key.format | required | (none) | String | The format used to deserialize and serialize the key of Kafka records. Only insert-only formats are supported. |
value.format | required | (none) | String | The format used to deserialize and serialize the value of Kafka records. |
value.fields-include | optional | 'ALL' | String | Controls which fields should end up in the value as well. Possible values: 'ALL' (all fields of the schema, even if they are part of e.g. the key), 'EXCEPT_KEY' (all fields of the schema except the fields of the key). |
sink.semantic | optional | at-least-once | String | Defines the delivery semantic for the Kafka sink. Valid enumerations are 'at-least-once', 'exactly-once' and 'none'. |
Note: only insert-only formats are supported for 'key.format' and 'value.format'. We will use the ChangelogMode of the format to distinguish whether the format is insert-only.
Examples
Convert a Kafka topic with Debezium format into a KTable table
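A sketch of this conversion, assuming a regular Kafka source table in `debezium-json` format feeding a KTable sink (all table names, columns, and topics below are illustrative):

```sql
-- Hypothetical source table reading Debezium changelog records from Kafka.
CREATE TABLE dbz_users (
  user_id BIGINT,
  user_name STRING,
  region STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbz_users',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-json'
);

-- KTable table that materializes the changelog by primary key.
CREATE TABLE users (
  user_id BIGINT,
  user_name STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'ktable',
  'topic' = 'users',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Convert the Debezium changelog into the KTable table.
INSERT INTO users SELECT * FROM dbz_users;
```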
...
- It's hard to explain what the behavior is when users specify a start offset from a middle position (e.g., how to process delete events for keys that don't exist yet). It's dangerous if users do that, so we don't provide the offset option in the new connector at the moment.
- It's a different perspective/abstraction on the same Kafka topic (append vs. upsert). It would be easier to understand if we separate them instead of mixing them in one connector. The new connector requires a HASH sink partitioner, a declared primary key, and a regular (insert-only) format. If we mixed them in one connector, it might be confusing how to use the options correctly.
- The semantics of the KTable connector are just the same as KTable in Kafka Streams, so it's very handy for Kafka Streams and KSQL users. We have seen several questions [1][2] on the mailing list asking how to model a KTable and how to join a KTable in Flink SQL.
Future Work
Support bounded KTable source
...
It’s also possible that users want to use KTable in batch mode, but we need more discussion about this feature.
...