Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Primary-Key Constraints on the KTable

FLIP-87 proposed the definition of primary key that primary key constraints tell a column or a set of columns of a table or a view are unique and they do not contain null. Neither of the columns in a primary can be nullable. Primary key therefore uniquely identifies a row in a table. However, Flink does not own the data therefore we only support NOT ENFORCED mode. It means it up to the user to ensure the key integrity, Flink will simply trust correctness of the primary key.

KTable source is a kind of changelog source, FLIP-132 also gives a clear definition for primary keys on changelog source which is also aligned with FLIP-87. The primary key semantics on changelog source means the materialized changelogs (INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE) are unique on the primary key constraints. Flink assumes all messages are in order on the primary key and will use the primary key for further materialization (e.g. generate UPDATE_BEFORE for KTable source) and optimization.

When the KTable connector is used as a sink, it works similar to the existing HBase sink. KTable sink doesn’t require planner to send UPDATE_BEFORE messages (planner may still send UPDATE_BEFORE messages in some cases), and will write INSERT/UPDATE_AFTER messages as normal Kafka records with key parts, and will write DELETE messages as Kafka records with null values (indicate tombstone for the key).  Flink will guarantee the message ordering on the primary key by partition data on the values of the primary key columns.

KTable source is a kind of changelog source. The primary key semantics on changelog source means the materialized changelogs (INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE) are unique on the primary key constraints. Flink assumes all messages are in order on the primary key and will use the primary key for further materialization and optimizationWhen the KTable connector is used as a sink, it works similar to the existing HBase sink. KTable sink doesn’t require planner to send UPDATE_BEFORE messages (planner may still send UPDATE_BEFORE messages in some cases), and will write INSERT/UPDATE_AFTER messages as normal Kafka records with key parts, and will write DELETE messages as Kafka records with null values (indicate tombstone for the key).  Flink will guarantee the message ordering on the primary key by partition data on the values of the primary key columns.

Creating a KTable table in Flink requires declaring the primary key on the table. The primary key definition also controls which fields should end up in Kafka’s key. Therefore, we don’t need the ‘key.fields’ option in KTable connector. By default, primary key fields will also be stored in Kafka’s value as well. But users can still use option ‘value.fields-include’ to control this behavior.

KTable Source

Generally speaking, the underlying topic of KTable source should MUST be compacted. But users can still be able to create a KTable source on a non-compacted topic. If users enable “delete” cleanup policy, that means Kafka will drop data, then Flink can’t guarantee correctness as expected. Besides, the underlying topic must have all the data with the same key in the same partition, otherwise, the result will be wrong.

...

Option

Required

Default

Type

Description

connector

required

(none)

String

Specify what connector to use, here should be 'ktable'.

topic

required

(none)

String

Topic name to read data from when the table is used as source or to write data to when the table is used as sink.

properties.bootstrap.servers

required

(none)

String

Comma separated list of Kafka brokers.

key.format

required

(none)

String

The format used to deserialize and serialize the key of Kafka record.

value.format

required

(none)

String

The format used to deserialize and serialize the value of Kafka records.

value.fields-include

optional

'ALL'

String

Controls which fields should end up in the value as well, possible values 

-   ALL (all fields of the schema, even if they are part of e.g. the key)

-   EXCEPT_KEY (all fields of the schema - fields of the key)

 

fields.verify-integrityinclude

optionalfalse

'ALL'

BooleanString

Controls if we should perform an equality check if a field is contained in different parts of consumer record (either in key or value). If true and a field originates from more than a single location (e.g. both in key and value), a check will be performed to validate both values are equal.which fields should end up in the value as well, possible values 

-   ALL (all fields of the schema, even if they are part of e.g. the key)

-   EXCEPT_KEY (all fields of the schema - fields of the key)

 

sink.semantic

optional

at-least-once

String

Defines the delivery semantic for the Kafka sink. Valid enumerations are 'at-least-once', 'exactly-once' and 'none'. 

...

Code Block
languagesql
-- register a kafka source which interpret debezium topic as a changelog stream
CREATE TABLE dbz_users (
  user_id BIGINT,
  user_name STRING,
  region STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbz_users',
  'properties.bootstrap.servers' = '...',
  'format' = 'debezium-json'
);
-- changelog mode will not materilizematerialize results and display records with op flag.
> SET execution.result-mode=changelog;
> SELECT * FROM dbz_users;
+----+---------+-----------+----------+
| op | user_id | user_name |   region |
+----+---------+-----------+----------+
| +I | 100     |  Bob      | Beijing  |
| +I | 101     |  Alice    | Shanghai |
| +I | 102     |  Greg     | Berlin   |
| +I | 103     |  Richard  | Berlin   |
| -U | 101     |  Alice    | Shanghai |
| +U | 101     |  Alice    | Hangzhou |
| -D | 103     |  Richard  | Berlin   |
+----+---------+-----------+----------+

-- register a ktable sink which will be used for storing latest users information
CREATE TABLE users (
  user_id BIGINT,
  user_name STRING,
  user_level STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'ktable',
  'topic' = 'users',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);

-- convert the debezium topic into kafka compacted topic
INSERT INTO users SELECT * FROM dbz_users;

-- table mode will materialize resultschangelog and onlyrefresh displaythe final resultresults
> SET execution.result-mode=table;
> SELECT * FROM users;

+---------+-----------+----------+
| user_id | user_name |   region |
+---------+-----------+----------+
| 100     |  Bob      | Beijing  |
| 101     |  Alice    | Hangzhou |
| 102     |  Greg     | Berlin   |
+---------+-----------+----------+

...

Code Block
languagesql
CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = '...',
  'format' = 'avro'
);

> SET execution.result-mode=changelog;
> SELECT * FROM pageviews;

+----+---------+-----------+----------------------+----------+
| op | user_id |   page_id |             viewtime | proctime |
+----+---------+-----------+----------------------+----------+
| +I | 100     |  10001    | 2020-10-01 08:01:00  | ........ |
| +I | 102     |  10002    | 2020-10-01 08:02:00  | ........ |
| +I | 101     |  10002    | 2020-10-01 08:04:00  | ........ |
| +I | 102     |  10004    | 2020-10-01 08:06:00  | ........ |
| +I | 102     |  10003    | 2020-10-01 08:07:00  | ........ |
+----+---------+-----------+----------------------+----------+


CREATE TABLE pageviews_enriched (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews_enriched',
  ...
);

-- insert-only stream temporal join a changelog stream which will be -- supported by FLIP-132
INSERT INTO pageviews_enriched
SELECT * 
FROM pageviews AS p
LEFT JOIN users FOR SYSTEM_TIME AS OF p.proctime AS u
ON p.user_id = u.user_id;

> SET execution.result-mode=changelog;
> SELECT * pageviews_enriched;

+----+---------+-----------+----------------------+-------------+
| op | user_id |   page_id |             viewtime | user_region |
+----+---------+-----------+----------------------+-------------+
| +I | 100     |  10001    | 2020-10-01 08:01:00  |    Beijing  |     
| +I | 102     |  10002    | 2020-10-01 08:02:00  |    Berlin   |    
| +I | 101     |  10002    | 2020-10-01 08:04:00  |    Hangzhou |      
| +I | 102     |  10004    | 2020-10-01 08:06:00  |    Berlin   |    
| +I | 102     |  10003    | 2020-10-01 08:07:00  |    Berlin   |  
+----+---------+-----------+----------------------+-------------+

...

  1. It's hard to explain what's the behavior when users specify the start offset from a middle position (e.g. how to process non exist delete events). It's dangerous if users do that. So we don't provide the offset option in the new connector at the moment.
  2. It's a different perspective/abstraction on the same kafka topic (append vs. upsert). It would be easier to understand if we can separate them instead of mixing them in one connector. The new connector requires hash sink partitioner, primary key declared, regular format. If we mix them in one connector, it might be confusing how to use the options correctly.
  3. The semantic of the KTable connector is just the same as KTable in Kafka Stream. So it's very handy for Kafka Stream and KSQL users. We have seen several questions [1][2] in the mailing list asking how to model a KTable and how to join a KTable in Flink SQL.

Future Work

Support bounded KTable connector

It’s also possible that users want to use KTable in batch mode.

References

[1]http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-materialized-upsert-tables-td18482.html#a18503

...