Status
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Through mailing lists and community issues, many users have already expressed their need for upsert Kafka. After looking through the mailing lists, we think there are three reasons behind this:
- Interpret a compacted Kafka topic (i.e. the KTable notion in Kafka) as a changelog stream that interprets records with keys as upsert events [1-3].
- As a part of a real-time pipeline, join multiple streams for enrichment and store the results into a Kafka topic for further calculation later. However, the result may contain update events.
- As a part of a real-time pipeline, aggregate on data streams and store the results into a Kafka topic for further calculation later. However, the result may contain update events.
Public Interface
...
Upsert-kafka Connector Definition
The upsert-kafka connector produces a changelog stream, where each data record represents an update or delete event. More precisely, the value in a data record is interpreted as an “UPDATE” of the last value for the same record key, if any (if a corresponding key doesn’t exist yet, the update is considered an INSERT). Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT, aka INSERT/UPDATE, because any existing row with the same key is overwritten. Also, null values are interpreted in a special way: a record with a null value represents a “DELETE”.
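The upsert semantics above can be illustrated with a small standalone sketch (plain Java with strings standing in for serialized key/value bytes, not Flink or Kafka code): replaying a keyed record stream in order, where a null value is a tombstone, yields the latest snapshot per key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class UpsertReplay {

    /** Replays (key, value) records in order; a null value deletes the key. */
    static Map<String, String> materialize(String[][] records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] r : records) {
            String key = r[0];
            String value = r[1];
            if (value == null) {
                table.remove(key);      // tombstone -> DELETE
            } else {
                table.put(key, value);  // INSERT, or UPDATE of the last value for this key
            }
        }
        return table;
    }

    public static void main(String[] args) {
        String[][] changelog = {
            {"101", "Alice,Shanghai"},
            {"102", "Greg,Berlin"},
            {"101", "Alice,Hangzhou"},  // upsert: overwrites the previous row for key 101
            {"102", null},              // tombstone: deletes key 102
        };
        System.out.println(materialize(changelog));
        // {101=Alice,Hangzhou}
    }
}
```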
Primary-Key Constraints on the upsert-kafka
When the upsert-kafka connector is used as a sink, it works similarly to the existing HBase sink. The upsert-kafka sink doesn’t require the planner to send UPDATE_BEFORE messages (the planner may still send UPDATE_BEFORE messages in some cases). It will write INSERT/UPDATE_AFTER messages as normal Kafka records with key parts, and will write DELETE messages as Kafka records with null values (indicating a tombstone for the key). Flink will guarantee the message ordering on the primary key by partitioning data on the values of the primary key columns.
The upsert-kafka source is a kind of changelog source. The primary key semantics on a changelog source mean that the materialized changelogs (INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE) are unique on the primary key constraints. Flink assumes all messages are in order on the primary key and will use the primary key for further materialization and optimization.
Creating an upsert-kafka table in Flink requires declaring the primary key on the table. The primary key definition also controls which fields end up in Kafka’s key. Therefore, we don’t need the ‘key.fields’ option in the upsert-kafka connector. By default, primary key fields will also be stored in Kafka’s value. But users can still use the option ‘value.fields-include’ to control this behavior.
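The split between Kafka's key and value described above can be sketched as follows. This is a simplified, standalone illustration (plain Java maps standing in for rows and serialized records, not Flink's actual serialization code): the primary key columns select the key part, and 'value.fields-include' decides whether those columns are repeated in the value part.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeyValueSplit {

    /** Fields that go into the Kafka key: exactly the primary key columns. */
    static Map<String, Object> keyPart(Map<String, Object> row, List<String> primaryKey) {
        Map<String, Object> key = new LinkedHashMap<>();
        for (String f : primaryKey) {
            key.put(f, row.get(f));
        }
        return key;
    }

    /** Fields that go into the Kafka value, controlled by 'value.fields-include'. */
    static Map<String, Object> valuePart(
            Map<String, Object> row, List<String> primaryKey, String fieldsInclude) {
        Map<String, Object> value = new LinkedHashMap<>(row);
        if ("EXCEPT_KEY".equals(fieldsInclude)) {
            primaryKey.forEach(value::remove);  // drop key columns from the value
        }
        return value;                           // "ALL": keep every column
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("user_id", 101L);
        row.put("user_name", "Alice");
        row.put("region", "Hangzhou");
        List<String> pk = List.of("user_id");

        System.out.println(keyPart(row, pk));                  // {user_id=101}
        System.out.println(valuePart(row, pk, "ALL"));         // {user_id=101, user_name=Alice, region=Hangzhou}
        System.out.println(valuePart(row, pk, "EXCEPT_KEY"));  // {user_name=Alice, region=Hangzhou}
    }
}
```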
Upsert-kafka Source
Generally speaking, the underlying topic of the upsert-kafka source must be compacted. Besides, the underlying topic must have all the data with the same key in the same partition, otherwise, the result will be wrong.
Currently, the upsert-kafka connector doesn’t provide start reading position options. The upsert-kafka source must be read from the earliest offset. This is a protection for data integrity, because otherwise it’s hard to explain what the behavior is when users specify a start offset from a middle position, and how to process delete events whose keys have never been seen. Therefore, the upsert-kafka connector doesn’t provide options like 'scan.startup.mode', 'scan.startup.specific-offsets', 'scan.startup.timestamp-millis' and 'properties.group.id' (which is only used for the 'group-offsets' startup mode).
Upsert-kafka Sink
In order to guarantee the message ordering, the upsert-kafka sink will always work in HASH partitioner mode on the primary key fields. Therefore, we don’t need the ‘sink.partitioner’ option in the upsert-kafka connector.
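The ordering guarantee rests on one property: a deterministic hash of the primary key always picks the same partition, so all updates for one key are appended to one partition in order. The sketch below illustrates just that property in plain Java (Kafka's actual default partitioner hashes the serialized key bytes with murmur2; the hash function shown here is only an illustrative stand-in).

```java
public class PkPartitioner {

    /** Same key always maps to the same partition, so per-key order is kept. */
    static int partitionFor(String primaryKey, int numPartitions) {
        // Illustrative deterministic hash; Kafka's default partitioner uses
        // murmur2 on the serialized key bytes, but any deterministic hash
        // gives the same per-key ordering guarantee.
        return (primaryKey.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = partitionFor("user-101", 4);
        int p2 = partitionFor("user-101", 4);
        System.out.println(p1 == p2);  // true: all updates for one key go to one partition
    }
}
```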
Upsert-kafka Connector Options
The options of the upsert-kafka connector are much like those of the Kafka connector.
Option | Required | Default | Type | Description
---|---|---|---|---
connector | required | (none) | String | Specify what connector to use, here should be 'upsert-kafka'.
topic | optional | (none) | String | Topic name(s) to read data from when the table is used as source, or to write data to when the table is used as sink. Note, only one of "topic-pattern" and "topic" can be specified for sources.
topic-pattern | optional | (none) | String | The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. Note, only one of "topic-pattern" and "topic" can be specified for sources.
properties.bootstrap.servers | required | (none) | String | Comma separated list of Kafka brokers.
key.format | required | (none) | String | The format used to deserialize and serialize the key of Kafka records. Only insert-only formats are supported.
value.format | required | (none) | String | The format used to deserialize and serialize the value of Kafka records. Only insert-only formats are supported.
value.fields-include | optional | 'ALL' | String | Controls which fields should end up in the value as well. Possible values: ALL (all fields of the schema, even if they are part of e.g. the key), EXCEPT_KEY (all fields of the schema minus the fields of the key).
sink.semantic | optional | at-least-once | String | Defines the delivery semantic for the Kafka sink. Valid enumerations are 'at-least-once', 'exactly-once' and 'none'.

Note: only insert-only formats are supported to be used as 'key.format' and 'value.format'. We will use the ChangelogMode of the format to distinguish whether the format is insert-only.
Examples
Convert a Kafka topic with debezium format into the upsert-kafka table
```sql
-- register a kafka source which interprets the debezium topic as a changelog stream
CREATE TABLE dbz_users (
  user_id BIGINT,
  user_name STRING,
  region STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbz_users',
  'properties.bootstrap.servers' = '...',
  'format' = 'debezium-json'
);

-- changelog mode will not materialize results and will display records with an op flag
> SET execution.result-mode=changelog;
> SELECT * FROM dbz_users;
+----+---------+-----------+----------+
| op | user_id | user_name | region   |
+----+---------+-----------+----------+
| +I |     100 | Bob       | Beijing  |
| +I |     101 | Alice     | Shanghai |
| +I |     102 | Greg      | Berlin   |
| +I |     103 | Richard   | Berlin   |
| -U |     101 | Alice     | Shanghai |
| +U |     101 | Alice     | Hangzhou |
| -D |     103 | Richard   | Berlin   |
+----+---------+-----------+----------+

-- register an upsert-kafka sink which will be used for storing the latest users information
CREATE TABLE users (
  user_id BIGINT,
  user_name STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'users',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);

-- convert the debezium topic into a kafka compacted topic
INSERT INTO users SELECT * FROM dbz_users;

-- table mode will materialize the changelog and refresh the final results
> SET execution.result-mode=table;
> SELECT * FROM users;
+---------+-----------+----------+
| user_id | user_name | region   |
+---------+-----------+----------+
|     100 | Bob       | Beijing  |
|     101 | Alice     | Hangzhou |
|     102 | Greg      | Berlin   |
+---------+-----------+----------+
```
Use the upsert-kafka as a reference/dimension table
```sql
CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = '...',
  'format' = 'avro'
);

> SET execution.result-mode=changelog;
> SELECT * FROM pageviews;
+----+---------+---------+----------------------+----------+
| op | user_id | page_id | viewtime             | proctime |
+----+---------+---------+----------------------+----------+
| +I |     100 |   10001 | 2020-10-01 08:01:00  | ........ |
| +I |     102 |   10002 | 2020-10-01 08:02:00  | ........ |
| +I |     101 |   10002 | 2020-10-01 08:04:00  | ........ |
| +I |     102 |   10004 | 2020-10-01 08:06:00  | ........ |
| +I |     102 |   10003 | 2020-10-01 08:07:00  | ........ |
+----+---------+---------+----------------------+----------+

CREATE TABLE pageviews_enriched (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews_enriched',
  ...
);

-- an insert-only stream temporal joins a changelog stream, which will be supported by FLIP-132
INSERT INTO pageviews_enriched
SELECT *
FROM pageviews AS p
LEFT JOIN users FOR SYSTEM_TIME AS OF p.proctime AS u
ON p.user_id = u.user_id;

> SET execution.result-mode=changelog;
> SELECT * FROM pageviews_enriched;
+----+---------+---------+----------------------+-------------+
| op | user_id | page_id | viewtime             | user_region |
+----+---------+---------+----------------------+-------------+
| +I |     100 |   10001 | 2020-10-01 08:01:00  | Beijing     |
| +I |     102 |   10002 | 2020-10-01 08:02:00  | Berlin      |
| +I |     101 |   10002 | 2020-10-01 08:04:00  | Hangzhou    |
| +I |     102 |   10004 | 2020-10-01 08:06:00  | Berlin      |
| +I |     102 |   10003 | 2020-10-01 08:07:00  | Berlin      |
+----+---------+---------+----------------------+-------------+
```
Write aggregate results into the upsert-kafka
```sql
CREATE TABLE pageviews_per_region_5min (
  window_start STRING,
  region STRING,
  view_count BIGINT,
  PRIMARY KEY (window_start, region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region_5min',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);

INSERT INTO pageviews_per_region_5min
SELECT TUMBLE_START(viewtime, INTERVAL '5' MINUTE), region, COUNT(*)
FROM pageviews_enriched
GROUP BY region, TUMBLE(viewtime, INTERVAL '5' MINUTE);

> SET execution.result-mode=table;
> SELECT * FROM pageviews_per_region_5min;
+----------------------+-----------+-------------+
| window_start         | region    | view_count  |
+----------------------+-----------+-------------+
| 2020-10-01 08:00:00  | Beijing   |           1 |
| 2020-10-01 08:00:00  | Berlin    |           1 |
| 2020-10-01 08:00:00  | Hangzhou  |           1 |
| 2020-10-01 08:05:00  | Berlin    |           2 |
+----------------------+-----------+-------------+
```
Aggregate on the upsert-kafka connector
```sql
CREATE TABLE pageviews_per_region (
  region STRING,
  view_count BIGINT,
  PRIMARY KEY (region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);

INSERT INTO pageviews_per_region
SELECT region, SUM(view_count)
FROM pageviews_per_region_5min
GROUP BY region;

> SET execution.result-mode=table;
> SELECT * FROM pageviews_per_region;
+-----------+-------------+
| region    | view_count  |
+-----------+-------------+
| Beijing   |           1 |
| Berlin    |           3 |
| Hangzhou  |           1 |
+-----------+-------------+
```
Implementation Details
The upsert-kafka connector only produces an upsert stream, which doesn’t contain UPDATE_BEFORE messages. However, several operations require UPDATE_BEFORE messages for correct processing, e.g. aggregations. Therefore, we need a physical node to materialize the upsert stream and generate a changelog stream with full change messages. In the physical operator, we will use state to know whether a key is being seen for the first time. The operator will produce INSERT rows, or additionally generate UPDATE_BEFORE rows for the previous image, or produce DELETE rows with all columns filled with values.
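The materialization step described above can be sketched as follows. This is a simplified, standalone illustration in plain Java, not Flink's actual operator code: a map stands in for the operator's keyed state, strings stand in for rows, and the "+I"/"-U"/"+U"/"-D" tags stand in for the RowKind of the emitted change messages.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangelogNormalizeSketch {

    /** Simplified stand-in for the operator's keyed state: last value per key. */
    private final Map<String, String> state = new HashMap<>();

    /** Turns one upsert record into full change messages (+I, -U/+U, -D). */
    List<String> process(String key, String value) {
        List<String> out = new ArrayList<>();
        String previous = state.get(key);
        if (value == null) {
            if (previous != null) {
                state.remove(key);
                out.add("-D(" + key + "," + previous + ")");  // delete with the full previous row
            }
        } else if (previous == null) {
            state.put(key, value);
            out.add("+I(" + key + "," + value + ")");  // first time this key is seen
        } else {
            state.put(key, value);
            out.add("-U(" + key + "," + previous + ")");  // retract the previous image
            out.add("+U(" + key + "," + value + ")");     // emit the new image
        }
        return out;
    }

    public static void main(String[] args) {
        ChangelogNormalizeSketch op = new ChangelogNormalizeSketch();
        System.out.println(op.process("101", "Shanghai"));  // [+I(101,Shanghai)]
        System.out.println(op.process("101", "Hangzhou"));  // [-U(101,Shanghai), +U(101,Hangzhou)]
        System.out.println(op.process("101", null));        // [-D(101,Hangzhou)]
    }
}
```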
Considering the similarity between the Kafka connector and the upsert-kafka connector, we should reuse most of the code under the hood and just introduce a different connector factory.
Compatibility, Deprecation, and Migration Plan
This change introduces a new feature that does not imply any compatibility concerns.
Rejected Alternatives
Introduce a new property in Kafka connector vs Introduce an upsert-kafka connector
- It's hard to explain what the behavior is when users specify a start offset from a middle position (e.g. how to process delete events for keys that have never been seen). It's dangerous if users do that. So we don't provide the offset option in the new connector at the moment.
- It's a different perspective/abstraction on the same Kafka topic (append vs. upsert). It would be easier to understand if we separate them instead of mixing them in one connector. The new connector requires a hash sink partitioner, a declared primary key, and a regular format. If we mixed them in one connector, it might be confusing how to use the options correctly.
- The semantics of the upsert-kafka connector are just the same as KTable in Kafka Streams, so it's very handy for Kafka Streams and KSQL users. We have seen several questions [1][2] in the mailing list asking how to model a KTable and how to join a KTable in Flink SQL.
Use upsert-kafka as the new connector name vs Use kafka-compacted as the name vs Use ktable as the name
Considering that 'ktable' carries more implicit meaning than expected, and that 'compacted' in 'kafka-compacted' relates more to the topic than to the table itself, 'upsert-kafka' is the more suitable and straightforward name for the new connector.
Future Work
Support bounded upsert-kafka source
It’s also possible that users want to use the upsert-kafka connector in batch mode. But we need more discussion about this feature.
References
...