You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 2 Next »

Status

Current state"Under Discussion"

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

Through mailing lists and community issues, many users have already expressed their needs for the upsert Kafka. After looking through the mailing lists, we think there are 3 reasons behind:

  • Interpret compacted kafka topic (or KTable notion in Kafka) as a changelog stream that interpret records with keys as upsert events [1-3];  
  • As a part of the real time pipeline, join multiple streams for enrichment and store results into a Kafka topic for further calculation later. However, the result may contain update events.
  • As a part of the real time pipeline, aggregate on data streams and store results into a Kafka topic for further calculation later. However, the result may contain update events.

Public Interface

KTable Connector Definition

KTable connector produces a changelog stream, where each data record represents an update or delete event. More precisely, the value in a data record is interpreted as an “UPDATE” of the last value for the same record key, if any (if a corresponding key doesn’t exist yet, the update will be considered an INSERT). Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE because any existing row with the same key is overwritten. Also, null values are interpreted in a special way: a record with a null value represents a “DELETE”.

Primary-Key Constraints on the KTable

FLIP-87 proposed the definition of primary key that primary key constraints tell a column or a set of columns of a table or a view are unique and they do not contain null. Neither of the columns in a primary can be nullable. Primary key therefore uniquely identifies a row in a table. However, Flink does not own the data therefore we only support NOT ENFORCED mode. It means it up to the user to ensure the key integrity, Flink will simply trust correctness of the primary key.

KTable source is a kind of changelog source, FLIP-132 also gives a clear definition for primary keys on changelog source which is also aligned with FLIP-87. The primary key semantics on changelog source means the materialized changelogs (INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE) are unique on the primary key constraints. Flink assumes all messages are in order on the primary key and will use the primary key for further materialization (e.g. generate UPDATE_BEFORE for KTable source) and optimization.

When the KTable connector is used as a sink, it works similar to the existing HBase sink. KTable sink doesn’t require planner to send UPDATE_BEFORE messages (planner may still send UPDATE_BEFORE messages in some cases), and will write INSERT/UPDATE_AFTER messages as normal Kafka records with key parts, and will write DELETE messages as Kafka records with null values (indicate tombstone for the key).  Flink will guarantee the message ordering on the primary key by partition data on the values of the primary key columns.

Creating a KTable table in Flink requires declaring the primary key on the table. The primary key definition also controls which fields should end up in Kafka’s key. Therefore, we don’t need the ‘key.fields’ option in KTable connector. By default, primary key fields will also be stored in Kafka’s value as well. But users can still use option ‘value.fields-include’ to control this behavior.

KTable Source

Generally speaking, the underlying topic of KTable source should be compacted. But users can still be able to create a KTable source on a non-compacted topic. If users enable “delete” cleanup policy, that means Kafka will drop data, then Flink can’t guarantee correctness as expected. Besides, the underlying topic must have all the data with the same key in the same partition, otherwise, the result will be wrong.

Currently, KTable connector doesn’t provide start reading position options. KTable must be read from the earliest offset. This is a protection for data integrity, otherwise it’s hard to explain what’s the behavior when users specify the start offset from a middle position and how to process delete events whose keys have never been seen. Therefore, KTable connector doesn’t provide options like ‘scan.startup.mode’, ‘scan.startup.specific-offsets’, ‘scan.startup.timestamp-millis’ and ‘properties.group.id’ (only used for 'group-offsets' startup mode).

KTable Sink

In order to guarantee the message ordering, the KTable sink will always work in HASH partitioner mode on the primary key fields. Therefore, we don’t need the ‘sink.partitioner’ option in KTable connector. 

KTable Connector Options

The options in KTable Connector are much like Kafka Connector.

Option

Required

Default

Type

Description

connector

required

(none)

String

Specify what connector to use, here should be 'ktable'.

topic

required

(none)

String

Topic name to read data from when the table is used as source or to write data to when the table is used as sink.

properties.bootstrap.servers

required

(none)

String

Comma separated list of Kafka brokers.

key.format

required

(none)

String

The format used to deserialize and serialize the key of Kafka record.

value.format

required

(none)

String

The format used to deserialize and serialize the value of Kafka records.

value.fields-include

optional

'ALL'

String

Controls which fields should end up in the value as well, possible values 

-   ALL (all fields of the schema, even if they are part of e.g. the key)

-   EXCEPT_KEY (all fields of the schema - fields of the key)

 

fields.verify-integrity

optional

false

Boolean

Controls if we should perform an equality check if a field is contained in different parts of consumer record (either in key or value). If true and a field originates from more than a single location (e.g. both in key and value), a check will be performed to validate both values are equal.

 

sink.semantic

optional

at-least-once

String

Defines the delivery semantic for the Kafka sink. Valid enumerations are 'at-least-once', 'exactly-once' and 'none'. 

Examples

Convert a Kafka topic with debezium format into the KTable Table

-- register a kafka source which interpret debezium topic as a changelog stream
CREATE TABLE dbz_users (
  user_id BIGINT,
  user_name STRING,
  user_level STRING,
  region STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbz_users',
  'properties.bootstrap.servers' = '...',
  'format' = 'debezium-json'
);
-- changelog mode will not materilize results and display records with op flag.
> SET execution.result-mode=changelog;
> SELECT * FROM dbz_users;
+----+---------+-----------+----------+
| op | user_id | user_name |   region |
+----+---------+-----------+----------+
| +I | 100     |  Bob      | Beijing  |
| +I | 101     |  Alice    | Shanghai |
| +I | 102     |  Greg     | Berlin   |
| +I | 103     |  Richard  | Berlin   |
| -U | 101     |  Alice    | Shanghai |
| +U | 101     |  Alice    | Hangzhou |
| -D | 103     |  Richard  | Berlin   |
+----+---------+-----------+----------+

-- register a ktable sink which will be used for storing latest users information
CREATE TABLE users (
  user_id BIGINT,
  user_name STRING,
  user_level STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'ktable',
  'topic' = 'users',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);

-- convert the debezium topic into kafka compacted topic
INSERT INTO users SELECT * FROM dbz_users;

-- table mode will materialize results and only display final result
> SET execution.result-mode=table;
> SELECT * FROM users;

+---------+-----------+----------+
| user_id | user_name |   region |
+---------+-----------+----------+
| 100     |  Bob      | Beijing  |
| 101     |  Alice    | Hangzhou |
| 102     |  Greg     | Berlin   |
+---------+-----------+----------+


  • No labels