Status

...

Discussion thread

...


JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Table of Contents

Motivation

Through mailing lists and community issues, many users have already expressed their need for an upsert Kafka connector. After looking through the mailing lists, we think there are 3 reasons behind this:

  • Interpret a compacted Kafka topic (or the KTable notion in Kafka) as a changelog stream that interprets records with keys as upsert events [1-3];
  • As a part of the real-time pipeline, join multiple streams for enrichment and store results into a Kafka topic for further calculation later. However, the result may contain update events.
  • As a part of the real-time pipeline, aggregate on data streams and store results into a Kafka topic for further calculation later. However, the result may contain update events.

Public Interface


Upsert-kafka Connector Definition

The upsert-kafka connector produces a changelog stream, where each data record represents an update or delete event. More precisely, the value in a data record is interpreted as an “UPDATE” of the last value for the same record key, if any (if a corresponding key doesn’t exist yet, the update will be considered an INSERT). Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE because any existing row with the same key is overwritten. Also, null values are interpreted in a special way: a record with a null value represents a “DELETE”.

Primary-Key Constraints on the upsert-kafka

When the upsert-kafka connector is used as a sink, it works similar to the existing HBase sink. The upsert-kafka sink doesn’t require the planner to send UPDATE_BEFORE messages (the planner may still send UPDATE_BEFORE messages in some cases); it writes INSERT/UPDATE_AFTER messages as normal Kafka records with key parts, and writes DELETE messages as Kafka records with null values (indicating a tombstone for the key). Flink will guarantee the message ordering on the primary key by partitioning data on the values of the primary key columns.

The upsert-kafka source is a kind of changelog source. The primary key semantics on a changelog source mean that the materialized changelogs (INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE) are unique on the primary key constraints. Flink assumes all messages are in order on the primary key and will use the primary key for further materialization and optimization.

Creating an upsert-kafka table in Flink requires declaring the primary key on the table. The primary key definition also controls which fields should end up in Kafka’s key. Therefore, we don’t need the ‘key.fields’ option in the upsert-kafka connector. By default, primary key fields will also be stored in Kafka’s value. But users can still use the option ‘value.fields-include’ to control this behavior.
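
For illustration, a minimal DDL sketch (the table name, topic and formats below are placeholders, not part of the proposal): the PRIMARY KEY clause determines which fields are written into Kafka’s key, and ‘value.fields-include’ = ‘EXCEPT_KEY’ keeps those fields out of Kafka’s value.

Code Block
languagesql
-- illustrative sketch: user_id is used as the Kafka record key;
-- with 'value.fields-include' = 'EXCEPT_KEY', user_id is not duplicated in the record value
CREATE TABLE users_example (
  user_id BIGINT,
  user_name STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'users_example',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro',
  'value.fields-include' = 'EXCEPT_KEY'
);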

Upsert-kafka Source

Generally speaking, the underlying topic of the upsert-kafka source must be compacted. Besides, the underlying topic must have all the data with the same key in the same partition, otherwise, the result will be wrong.


Currently, the upsert-kafka connector doesn’t provide start reading position options. The upsert-kafka source must be read from the earliest offset. This is a protection for data integrity, otherwise it’s hard to explain what the behavior is when users specify the start offset from a middle position and how to process delete events whose keys have never been seen. Therefore, the upsert-kafka connector doesn’t provide options like 'scan.startup.mode', 'scan.startup.specific-offsets', 'scan.startup.timestamp-millis' and 'properties.group.id' (only used for the 'group-offsets' startup mode).
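
As an illustration of this behavior (the table and topic names are placeholders), a source table on the upsert-kafka connector is declared without any startup-position options; the commented-out line shows the kind of option that is intentionally not available.

Code Block
languagesql
-- illustrative sketch: the upsert-kafka source always starts reading from the earliest offset
CREATE TABLE users_changelog (
  user_id BIGINT,
  user_name STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'users_changelog',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
  -- 'scan.startup.mode' = 'latest-offset'  -- not supported: no start position options
);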

Upsert-kafka Sink

In order to guarantee the message ordering, the upsert-kafka sink will always work in HASH partitioner mode on the primary key fields. Therefore, we don’t need the ‘sink.partitioner’ option in the upsert-kafka connector. 

Upsert-kafka Connector Options

The options of the upsert-kafka connector are much like those of the Kafka connector.

Option                       | Required | Default       | Type   | Description
-----------------------------|----------|---------------|--------|------------------------------------------------------------
connector                    | required | (none)        | String | Specify what connector to use, here should be 'upsert-kafka'.
topic                        | required | (none)        | String | Topic name to read data from when the table is used as source, or to write data to when the table is used as sink.
properties.bootstrap.servers | required | (none)        | String | Comma separated list of Kafka brokers.
key.format                   | required | (none)        | String | The format used to deserialize and serialize the key of Kafka records. Only insert-only format is supported.
value.format                 | required | (none)        | String | The format used to deserialize and serialize the value of Kafka records.
value.fields-include         | optional | 'ALL'         | String | Controls which fields should end up in the value as well. Possible values: ALL (all fields of the schema, even if they are part of e.g. the key), EXCEPT_KEY (all fields of the schema except the fields of the key).
sink.semantic                | optional | at-least-once | String | Defines the delivery semantic for the Kafka sink. Valid enumerations are 'at-least-once', 'exactly-once' and 'none'.

Note: only insert-only format is supported to be used as 'key.format' and 'value.format'. We will use the ChangelogMode of the format to distinguish whether the format is insert-only.
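
For illustration only (table and topic names are placeholders), a DDL like the following would be rejected under this proposal, because 'debezium-json' is a changelog format rather than an insert-only format:

Code Block
languagesql
-- illustrative counter-example: 'debezium-json' is not an insert-only format,
-- so it cannot be used as the 'value.format' of an upsert-kafka table
CREATE TABLE invalid_users (
  user_id BIGINT,
  user_name STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'users',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'debezium-json'
);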

Examples

Convert a Kafka topic with debezium format into the upsert-kafka Table

Code Block
languagesql
-- register a kafka source which interprets the debezium topic as a changelog stream
CREATE TABLE dbz_users (
  user_id BIGINT,
  user_name STRING,
  region STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbz_users',
  'properties.bootstrap.servers' = '...',
  'format' = 'debezium-json'
);
-- changelog mode will not materialize results and display records with op flag.
> SET execution.result-mode=changelog;
> SELECT * FROM dbz_users;
+----+---------+-----------+----------+
| op | user_id | user_name |   region |
+----+---------+-----------+----------+
| +I | 100     |  Bob      | Beijing  |
| +I | 101     |  Alice    | Shanghai |
| +I | 102     |  Greg     | Berlin   |
| +I | 103     |  Richard  | Berlin   |
| -U | 101     |  Alice    | Shanghai |
| +U | 101     |  Alice    | Hangzhou |
| -D | 103     |  Richard  | Berlin   |
+----+---------+-----------+----------+

-- register an upsert-kafka sink which will be used for storing the latest users information
CREATE TABLE users (
  user_id BIGINT,
  user_name STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'users',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);

-- convert the debezium topic into a compacted kafka topic
INSERT INTO users SELECT * FROM dbz_users;

-- table mode will materialize changelog and refresh the final results
> SET execution.result-mode=table;
> SELECT * FROM users;

+---------+-----------+----------+
| user_id | user_name |   region |
+---------+-----------+----------+
| 100     |  Bob      | Beijing  |
| 101     |  Alice    | Hangzhou |
| 102     |  Greg     | Berlin   |
+---------+-----------+----------+

Use the upsert-kafka as a reference/dimension table

Code Block
languagesql
CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = '...',
  'format' = 'avro'
);

> SET execution.result-mode=changelog;
> SELECT * FROM pageviews;

+----+---------+-----------+----------------------+----------+
| op | user_id |   page_id |             viewtime | proctime |
+----+---------+-----------+----------------------+----------+
| +I | 100     |  10001    | 2020-10-01 08:01:00  | ........ |
| +I | 102     |  10002    | 2020-10-01 08:02:00  | ........ |
| +I | 101     |  10002    | 2020-10-01 08:04:00  | ........ |
| +I | 102     |  10004    | 2020-10-01 08:06:00  | ........ |
| +I | 102     |  10003    | 2020-10-01 08:07:00  | ........ |
+----+---------+-----------+----------------------+----------+


CREATE TABLE pageviews_enriched (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews_enriched',
  ...
);

-- insert-only stream temporal joins a changelog stream, which will be supported by FLIP-132
INSERT INTO pageviews_enriched
SELECT p.user_id, p.page_id, p.viewtime, u.region AS user_region
FROM pageviews AS p
LEFT JOIN users FOR SYSTEM_TIME AS OF p.proctime AS u
ON p.user_id = u.user_id;

> SET execution.result-mode=changelog;
> SELECT * FROM pageviews_enriched;

+----+---------+-----------+----------------------+-------------+
| op | user_id |   page_id |             viewtime | user_region |
+----+---------+-----------+----------------------+-------------+
| +I | 100     |  10001    | 2020-10-01 08:01:00  |    Beijing  |     
| +I | 102     |  10002    | 2020-10-01 08:02:00  |    Berlin   |    
| +I | 101     |  10002    | 2020-10-01 08:04:00  |    Hangzhou |      
| +I | 102     |  10004    | 2020-10-01 08:06:00  |    Berlin   |    
| +I | 102     |  10003    | 2020-10-01 08:07:00  |    Berlin   |  
+----+---------+-----------+----------------------+-------------+

Write aggregate results into the upsert-kafka

Code Block
languagesql
CREATE TABLE pageviews_per_region_5min (
  window_start TIMESTAMP,
  region STRING,
  view_count BIGINT,
  PRIMARY KEY (window_start, region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region_5min',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);

INSERT INTO pageviews_per_region_5min
SELECT 
  TUMBLE_START(viewtime, INTERVAL '5' MINUTE), 
  region,
  COUNT(*)
FROM pageviews_enriched
GROUP BY region, TUMBLE(viewtime, INTERVAL '5' MINUTE);

> SET execution.result-mode=table;
> SELECT * FROM pageviews_per_region_5min;

+----------------------+-----------+-------------+
|         window_start |    region |  view_count |
+----------------------+-----------+-------------+
| 2020-10-01 08:00:00  |  Beijing  |          1  |
| 2020-10-01 08:00:00  |  Berlin   |          1  |
| 2020-10-01 08:00:00  |  Hangzhou |          1  |
| 2020-10-01 08:05:00  |  Berlin   |          2  |
+----------------------+-----------+-------------+

Aggregate on the upsert-kafka connector

Code Block
languagesql
CREATE TABLE pageviews_per_region (
  region STRING,
  view_count BIGINT,
  PRIMARY KEY (region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);

INSERT INTO pageviews_per_region
SELECT 
  region,
  SUM(view_count)
FROM pageviews_per_region_5min
GROUP BY region;
> SET execution.result-mode=table;
> SELECT * FROM pageviews_per_region;

+-----------+-------------+
|    region |  view_count |
+-----------+-------------+
|  Beijing  |          1  |
|  Berlin   |          3  |
|  Hangzhou |          1  |
+-----------+-------------+

Implementation Details

The upsert-kafka connector only produces an upsert stream, which doesn’t contain UPDATE_BEFORE messages. However, several operations require UPDATE_BEFORE messages for correct processing, e.g. aggregations. Therefore, we need a physical node to materialize the upsert stream and generate a changelog stream with full change messages. In the physical operator, we will use state to know whether the key is seen for the first time. The operator will produce INSERT rows, or additionally generate UPDATE_BEFORE rows for the previous image, or produce DELETE rows with all columns filled with values.
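
For illustration (reusing the 'users' upsert-kafka table from the examples above; this query is not part of the proposed interface), an aggregation over an upsert-kafka source is exactly the case that needs this materialization: when a user’s region changes, the aggregation needs an UPDATE_BEFORE row for the old region to retract its count.

Code Block
languagesql
-- counting users per region over the upsert-kafka 'users' table defined in the examples;
-- the materialization operator generates the UPDATE_BEFORE rows that the COUNT needs
SELECT region, COUNT(*) AS user_count
FROM users
GROUP BY region;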


Considering the similarity between the Kafka connector and the upsert-kafka connector, we should reuse most of the code under the hood and just introduce a different connector factory.

Compatibility, Deprecation, and Migration Plan

This change introduces a new feature that does not imply any compatibility concerns.

Rejected Alternatives

Introduce a new property in Kafka connector vs Introduce an upsert-kafka connector

  1. It's hard to explain what the behavior is when users specify the start offset from a middle position (e.g. how to process delete events for keys that have never been seen). It's dangerous if users do that. So we don't provide the offset options in the new connector at the moment.
  2. It's a different perspective/abstraction on the same Kafka topic (append vs. upsert). It would be easier to understand if we separate them instead of mixing them in one connector. The new connector requires a hash sink partitioner, a declared primary key, and a regular format. If we mix them in one connector, it might be confusing how to use the options correctly.
  3. The semantics of the upsert-kafka connector are just the same as KTable in Kafka Streams, so it's very handy for Kafka Streams and KSQL users. We have seen several questions [1][2] in the mailing list asking how to model a KTable and how to join a KTable in Flink SQL.

Use upsert-kafka as the new connector name vs Use kafka-compacted as the name vs Use ktable as the name

Considering that KTable has more implicit meaning than expected, and that the compacted meaning in kafka-compacted is related more to the topic than to the table itself, it's more suitable to use upsert-kafka as the name of the new connector, which is much more straightforward.

Future Work

Support bounded upsert-kafka source

It’s also possible that users want to use the upsert-kafka connector in batch mode. But we need more discussion about this feature.

References

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-materialized-upsert-tables-td18482.html#a18503

...