Status

Current state: "Under Discussion"

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

Through the mailing lists and community issues, many users have asked for upsert support on Kafka. After looking through the mailing lists, we see three main reasons behind these requests:

  • Interpret a compacted Kafka topic (or the KTable notion in Kafka) as a changelog stream in which records with keys are treated as upsert events [1-3];
  • As part of a real-time pipeline, join multiple streams for enrichment and store the results in a Kafka topic for further calculation later. However, the result may contain update events.
  • As part of a real-time pipeline, aggregate over data streams and store the results in a Kafka topic for further calculation later. However, the result may contain update events.

Public Interface

KTable Connector Definition

KTable connector produces a changelog stream, where each data record represents an update or delete event. More precisely, the value in a data record is interpreted as an “UPDATE” of the last value for the same record key, if any (if a corresponding key doesn’t exist yet, the update will be considered an INSERT). Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE because any existing row with the same key is overwritten. Also, null values are interpreted in a special way: a record with a null value represents a “DELETE”.
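The upsert and tombstone rules above can be illustrated with a small sketch. This is not connector code: the `materialize` function and the `(key, value)` tuple layout are hypothetical, and the sample rows simply reuse the user data from the examples later on this page.

```python
from typing import Dict, Iterable, Optional, Tuple

# (key, value); a value of None stands for a Kafka record with a null value.
Record = Tuple[str, Optional[dict]]


def materialize(records: Iterable[Record]) -> Dict[str, dict]:
    """Replay keyed records in order and return the latest row per key."""
    table: Dict[str, dict] = {}
    for key, value in records:
        if value is None:
            # Null value: DELETE. Deleting an unseen key is a no-op here.
            table.pop(key, None)
        else:
            # Non-null value: INSERT if the key is new, otherwise UPDATE.
            table[key] = value
    return table


records = [
    ("100", {"user_name": "Bob", "region": "Beijing"}),
    ("101", {"user_name": "Alice", "region": "Shanghai"}),
    ("103", {"user_name": "Richard", "region": "Berlin"}),
    ("101", {"user_name": "Alice", "region": "Hangzhou"}),  # overwrites key 101
    ("103", None),  # tombstone for key 103
]
latest = materialize(records)
```

After the replay, `latest` holds one row per surviving key, which is the table view a reader of the changelog is expected to reconstruct.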

Primary-Key Constraints on the KTable

FLIP-87 defines a primary key constraint as a declaration that a column or a set of columns of a table or view is unique and does not contain nulls. None of the columns in a primary key can be nullable. A primary key therefore uniquely identifies a row in a table. However, Flink does not own the data, so we only support the NOT ENFORCED mode. This means it is up to the user to ensure key integrity; Flink simply trusts the correctness of the primary key.

The KTable source is a kind of changelog source. FLIP-132 also gives a clear definition of primary keys on changelog sources, which is aligned with FLIP-87. The primary key semantics on a changelog source mean that the materialized changelogs (INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE) are unique on the primary key constraints. Flink assumes all messages are in order on the primary key and will use the primary key for further materialization (e.g. generating UPDATE_BEFORE for the KTable source) and optimization.

When the KTable connector is used as a sink, it works similarly to the existing HBase sink. The KTable sink does not require the planner to send UPDATE_BEFORE messages (the planner may still send them in some cases). It writes INSERT/UPDATE_AFTER messages as normal Kafka records with key parts, and writes DELETE messages as Kafka records with null values (a tombstone for the key). Flink guarantees message ordering on the primary key by partitioning data on the values of the primary key columns.
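A minimal sketch of the sink-side mapping described above, assuming changelog rows arrive as `(op, row)` pairs with the op flags used in the examples on this page (`+I`, `-U`, `+U`, `-D`). The function name `to_kafka_records` and the tuple-shaped key are illustrative assumptions, not the connector's API.

```python
from typing import Dict, Iterable, Iterator, List, Optional, Tuple

Row = Dict[str, object]


def to_kafka_records(
    changelog: Iterable[Tuple[str, Row]], key_fields: List[str]
) -> Iterator[Tuple[tuple, Optional[Row]]]:
    """Map changelog rows to (key, value) pairs; value None is a tombstone."""
    for op, row in changelog:
        if op == "-U":
            # UPDATE_BEFORE is not needed: the following +U overwrites the key.
            continue
        key = tuple(row[f] for f in key_fields)
        if op in ("+I", "+U"):
            yield key, row
        elif op == "-D":
            yield key, None
        else:
            raise ValueError(f"unknown op flag: {op}")


changelog = [
    ("+I", {"user_id": 101, "region": "Shanghai"}),
    ("-U", {"user_id": 101, "region": "Shanghai"}),
    ("+U", {"user_id": 101, "region": "Hangzhou"}),
    ("-D", {"user_id": 103, "region": "Berlin"}),
]
kafka_records = list(to_kafka_records(changelog, ["user_id"]))
```

Four changelog rows become three Kafka records: the `-U` row is dropped and the `-D` row turns into a null-valued record for key 103.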

Creating a KTable table in Flink requires declaring a primary key on the table. The primary key definition also controls which fields end up in Kafka's key, so the KTable connector does not need the 'key.fields' option. By default, primary key fields are also stored in Kafka's value, but users can change this behavior with the 'value.fields-include' option.

KTable Source

Generally speaking, the underlying topic of a KTable source should be compacted, but users can still create a KTable source on a non-compacted topic. If the “delete” cleanup policy is enabled, Kafka will drop data and Flink can no longer guarantee correct results. In addition, all records with the same key must reside in the same partition of the underlying topic; otherwise the result will be wrong.

Currently, the KTable connector does not provide options for the start reading position; a KTable must be read from the earliest offset. This protects data integrity: otherwise it would be hard to define the behavior when users start from an offset in the middle of the topic, or how to process delete events whose keys have never been seen. Therefore, the KTable connector does not provide options such as 'scan.startup.mode', 'scan.startup.specific-offsets', 'scan.startup.timestamp-millis' and 'properties.group.id' (only used for the 'group-offsets' startup mode).

KTable Sink

In order to guarantee the message ordering, the KTable sink will always work in HASH partitioner mode on the primary key fields. Therefore, we don’t need the ‘sink.partitioner’ option in KTable connector. 
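The ordering argument rests on one property: rows with the same primary key always map to the same partition. The sketch below shows that property only. The `partition_for` helper and the CRC32 hash are assumptions chosen for a deterministic illustration; they are not the hash function Flink or Kafka actually uses.

```python
import zlib
from typing import Dict, List


def partition_for(row: Dict[str, object], key_fields: List[str], num_partitions: int) -> int:
    """Pick a partition from the primary key fields only (illustrative hash)."""
    key_bytes = "|".join(str(row[f]) for f in key_fields).encode("utf-8")
    # CRC32 is deterministic across processes, unlike Python's built-in hash()
    # for strings, which is randomized per interpreter run.
    return zlib.crc32(key_bytes) % num_partitions


before = {"user_id": 101, "user_name": "Alice", "region": "Shanghai"}
after = {"user_id": 101, "user_name": "Alice", "region": "Hangzhou"}

p_before = partition_for(before, ["user_id"], 4)
p_after = partition_for(after, ["user_id"], 4)
```

Because non-key columns do not take part in the hash, the update to `region` lands in the same partition as the original row, so a consumer sees both records in write order.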

KTable Connector Options

The options of the KTable connector closely mirror those of the Kafka connector.

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| connector | required | (none) | String | Specify what connector to use, here should be 'ktable'. |
| topic | required | (none) | String | Topic name to read data from when the table is used as source or to write data to when the table is used as sink. |
| properties.bootstrap.servers | required | (none) | String | Comma separated list of Kafka brokers. |
| key.format | required | (none) | String | The format used to deserialize and serialize the key of Kafka records. |
| value.format | required | (none) | String | The format used to deserialize and serialize the value of Kafka records. |
| value.fields-include | optional | 'ALL' | String | Controls which fields should end up in the value as well. Possible values: ALL (all fields of the schema, even if they are part of e.g. the key); EXCEPT_KEY (all fields of the schema minus the fields of the key). |
| fields.verify-integrity | optional | false | Boolean | Controls if we should perform an equality check if a field is contained in different parts of the consumer record (either in key or value). If true and a field originates from more than a single location (e.g. both in key and value), a check will be performed to validate both values are equal. |
| sink.semantic | optional | at-least-once | String | Defines the delivery semantic for the Kafka sink. Valid enumerations are 'at-least-once', 'exactly-once' and 'none'. |
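To make 'value.fields-include' and 'fields.verify-integrity' concrete, here is a rough sketch of how a row could be split into key and value and reassembled. The `encode` and `decode` helpers are hypothetical, and letting the key take precedence on a duplicated field is an assumption of this sketch, not something the proposal specifies.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]


def encode(row: Row, key_fields: List[str], fields_include: str = "ALL") -> Tuple[Row, Row]:
    """Split a row into (key, value) according to value.fields-include."""
    key = {f: row[f] for f in key_fields}
    if fields_include == "ALL":
        value = dict(row)  # key fields are duplicated in the value
    elif fields_include == "EXCEPT_KEY":
        value = {f: v for f, v in row.items() if f not in key_fields}
    else:
        raise ValueError(f"unsupported value.fields-include: {fields_include}")
    return key, value


def decode(key: Row, value: Row, verify_integrity: bool = False) -> Row:
    """Rebuild a row; optionally check fields present in both key and value."""
    if verify_integrity:
        for field in key.keys() & value.keys():
            if key[field] != value[field]:
                raise ValueError(f"field '{field}' differs between key and value")
    # Assumption: on a duplicated field the key copy wins.
    return {**value, **key}


row = {"user_id": 101, "user_name": "Alice", "region": "Hangzhou"}
key_all, value_all = encode(row, ["user_id"], "ALL")
key_ex, value_ex = encode(row, ["user_id"], "EXCEPT_KEY")
```

With 'ALL' the value repeats `user_id`, which is the only case where the integrity check has anything to compare; with 'EXCEPT_KEY' the key is the sole source of that field.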

Examples

Convert a Kafka topic in Debezium format into a KTable table

-- register a Kafka source which interprets the Debezium topic as a changelog stream
CREATE TABLE dbz_users (
  user_id BIGINT,
  user_name STRING,
  user_level STRING,
  region STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbz_users',
  'properties.bootstrap.servers' = '...',
  'format' = 'debezium-json'
);
-- changelog mode will not materialize results and displays records with an op flag.
> SET execution.result-mode=changelog;
> SELECT user_id, user_name, region FROM dbz_users;
+----+---------+-----------+----------+
| op | user_id | user_name |   region |
+----+---------+-----------+----------+
| +I | 100     |  Bob      | Beijing  |
| +I | 101     |  Alice    | Shanghai |
| +I | 102     |  Greg     | Berlin   |
| +I | 103     |  Richard  | Berlin   |
| -U | 101     |  Alice    | Shanghai |
| +U | 101     |  Alice    | Hangzhou |
| -D | 103     |  Richard  | Berlin   |
+----+---------+-----------+----------+

-- register a ktable sink which will be used for storing latest users information
CREATE TABLE users (
  user_id BIGINT,
  user_name STRING,
  user_level STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'ktable',
  'topic' = 'users',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);

-- convert the debezium topic into kafka compacted topic
INSERT INTO users SELECT * FROM dbz_users;

-- table mode will materialize results and only display final result
> SET execution.result-mode=table;
> SELECT user_id, user_name, region FROM users;

+---------+-----------+----------+
| user_id | user_name |   region |
+---------+-----------+----------+
| 100     |  Bob      | Beijing  |
| 101     |  Alice    | Hangzhou |
| 102     |  Greg     | Berlin   |
+---------+-----------+----------+

Use KTable as a reference/dimension table

CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = '...',
  'format' = 'avro'
);

> SET execution.result-mode=changelog;
> SELECT * FROM pageviews;

+----+---------+-----------+----------------------+----------+
| op | user_id |   page_id |             viewtime | proctime |
+----+---------+-----------+----------------------+----------+
| +I | 100     |  10001    | 2020-10-01 08:01:00  | ........ |
| +I | 102     |  10002    | 2020-10-01 08:02:00  | ........ |
| +I | 101     |  10002    | 2020-10-01 08:04:00  | ........ |
| +I | 102     |  10004    | 2020-10-01 08:06:00  | ........ |
| +I | 103     |  10003    | 2020-10-01 08:07:00  | ........ |
+----+---------+-----------+----------------------+----------+


CREATE TABLE pageviews_enriched (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews_enriched',
  ...
);

-- insert-only stream temporal join a changelog stream which will be
-- supported by FLIP-132
INSERT INTO pageviews_enriched
SELECT p.user_id, p.page_id, p.viewtime, u.region
FROM pageviews AS p
LEFT JOIN users FOR SYSTEM_TIME AS OF p.proctime AS u
ON p.user_id = u.user_id;

> SET execution.result-mode=changelog;
> SELECT * FROM pageviews_enriched;

+----+---------+-----------+----------------------+-------------+
| op | user_id |   page_id |             viewtime | user_region |
+----+---------+-----------+----------------------+-------------+
| +I | 100     |  10001    | 2020-10-01 08:01:00  |    Beijing  |     
| +I | 102     |  10002    | 2020-10-01 08:02:00  |    Berlin   |    
| +I | 101     |  10002    | 2020-10-01 08:04:00  |    Hangzhou |      
| +I | 102     |  10004    | 2020-10-01 08:06:00  |    Berlin   |    
| +I | 102     |  10003    | 2020-10-01 08:07:00  |    Berlin   |  
+----+---------+-----------+----------------------+-------------+

Write aggregate results into a KTable

CREATE TABLE pageviews_per_region_5min (
  window_start STRING,
  region STRING,
  view_count BIGINT,
  PRIMARY KEY (window_start, region) NOT ENFORCED
) WITH (
  'connector' = 'ktable',
  'topic' = 'pageviews_per_region_5min',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);

INSERT INTO pageviews_per_region_5min
SELECT 
  TUMBLE_START(viewtime, INTERVAL '5' MINUTE), 
  user_region,
  COUNT(*)
FROM pageviews_enriched
GROUP BY user_region, TUMBLE(viewtime, INTERVAL '5' MINUTE);

> SET execution.result-mode=table;
> SELECT * FROM pageviews_per_region_5min;

+----------------------+-----------+-------------+
|         window_start |    region |  view_count |
+----------------------+-----------+-------------+
| 2020-10-01 08:00:00  |  Beijing  |          1  |
| 2020-10-01 08:00:00  |  Berlin   |          1  |
| 2020-10-01 08:00:00  |  Hangzhou |          1  |
| 2020-10-01 08:05:00  |  Berlin   |          2  |
+----------------------+-----------+-------------+

Aggregate on a KTable

CREATE TABLE pageviews_per_region (
  region STRING,
  view_count BIGINT,
  PRIMARY KEY (region) NOT ENFORCED
) WITH (
  'connector' = 'ktable',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);

INSERT INTO pageviews_per_region
SELECT 
  region,
  SUM(view_count)
FROM pageviews_per_region_5min
GROUP BY region;
> SET execution.result-mode=table;
> SELECT * FROM pageviews_per_region;

+-----------+-------------+
|    region |  view_count |
+-----------+-------------+
|  Beijing  |          1  |
|  Berlin   |          3  |
|  Hangzhou |          1  |
+-----------+-------------+

Implementation Details

The KTable connector only produces an upsert stream, which does not contain UPDATE_BEFORE messages. However, several operations, such as aggregations, require UPDATE_BEFORE messages to process updates correctly. Therefore, we need a physical node that materializes the upsert stream and generates a changelog stream with full change messages. In this physical operator, we use state to tell whether a key is being seen for the first time. The operator produces INSERT rows for new keys, additionally generates UPDATE_BEFORE rows carrying the previous image for existing keys, and produces DELETE rows with all columns filled with values.

How does the planner know when to add such a materialization operator? This depends on the ChangelogMode of the source, which was introduced in FLIP-95. Currently, we only support insert-only or all-kinds (e.g. CDC format) ChangelogMode. In this FLIP, we will support the [UPDATE_AFTER, DELETE] ChangelogMode, which indicates that the source emits only UPDATE_AFTER and DELETE messages at runtime. The planner adds the materialization operator described above when the ChangelogMode of the source is [UPDATE_AFTER, DELETE].
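The stateful materialization step can be sketched as a generator over an upsert stream. This is a simplified model, not the planner's operator: the `normalize` name, the in-memory dict standing in for keyed state, and the choice to silently drop deletes for unseen keys are all assumptions of the sketch.

```python
from typing import Dict, Iterable, Iterator, Optional, Tuple

Value = Tuple[str, ...]


def normalize(
    upserts: Iterable[Tuple[str, Optional[Value]]]
) -> Iterator[Tuple[str, str, Value]]:
    """Turn (key, value-or-None) upserts into a full changelog of (op, key, row)."""
    state: Dict[str, Value] = {}  # last seen image per key
    for key, value in upserts:
        previous = state.get(key)
        if value is None:
            if previous is not None:
                del state[key]
                # DELETE carries the full previous image, not just the key.
                yield ("-D", key, previous)
            # Assumption: a delete for a key never seen before is dropped.
        elif previous is None:
            state[key] = value
            yield ("+I", key, value)
        else:
            state[key] = value
            yield ("-U", key, previous)  # retract the old image first
            yield ("+U", key, value)


upserts = [
    ("101", ("Alice", "Shanghai")),
    ("101", ("Alice", "Hangzhou")),
    ("103", None),  # never seen: no output
    ("101", None),
]
changelog = list(normalize(upserts))
```

The output contains the `-U` and full-image `-D` rows that a downstream aggregation needs in order to retract earlier contributions.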

Considering the similarity between Kafka connector and KTable connector, we should reuse most of the code under the hood and just introduce a different connector factory.

Compatibility, Deprecation, and Migration Plan

This change introduces a new feature that does not imply any compatibility concerns.

Rejected Alternatives

Introduce a new property in Kafka connector vs Introduce a KTable connector

  1. It is hard to define the behavior when users start from an offset in the middle of the topic (e.g. how to process delete events for keys that do not exist). Allowing this would be dangerous, so the new connector does not provide the offset option at the moment.
  2. It is a different perspective/abstraction on the same Kafka topic (append vs. upsert). It is easier to understand if the two are separated instead of mixed in one connector. The new connector requires a hash sink partitioner, a declared primary key, and a regular format. If both were mixed in one connector, it could be confusing how to use the options correctly.
  3. The semantics of the KTable connector are the same as those of KTable in Kafka Streams, so it is very handy for Kafka Streams and KSQL users. We have seen several questions [1][2] on the mailing list asking how to model a KTable and how to join a KTable in Flink SQL.

References

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-materialized-upsert-tables-td18482.html#a18503

[2] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/From-Kafka-Stream-to-Flink-td28879.html

[3] https://issues.apache.org/jira/browse/FLINK-19149

