Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Currently, the KTable connector doesn't provide options for the start reading position; a KTable must be read from the earliest offset. This protects data integrity: otherwise it is hard to explain what the behavior is when users specify a start offset in the middle of the topic, and how to process delete events whose keys have never been seen. Therefore, the KTable connector doesn't provide options like 'scan.startup.mode', 'scan.startup.specific-offsets', 'scan.startup.timestamp-millis' and 'properties.group.id' (only used for the 'group-offsets' startup mode).

...

Code Block
languagesql
-- Register a Kafka source that interprets the Debezium topic as a changelog stream.
CREATE TABLE dbz_users (
  user_id BIGINT,
  user_name STRING,
  user_level STRING,
  region STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbz_users',
  'properties.bootstrap.servers' = '...',
  'format' = 'debezium-json'
);
-- Changelog mode does not materialize the results; every record is displayed with its op flag.
-- Note: the sample output below does not show the user_level column.
> SET execution.result-mode=changelog;
> SELECT * FROM dbz_users;
+----+---------+-----------+----------+
| op | user_id | user_name |   region |
+----+---------+-----------+----------+
| +I | 100     |  Bob      | Beijing  |
| +I | 101     |  Alice    | Shanghai |
| +I | 102     |  Greg     | Berlin   |
| +I | 103     |  Richard  | Berlin   |
| -U | 101     |  Alice    | Shanghai |
| +U | 101     |  Alice    | Hangzhou |
| -D | 103     |  Richard  | Berlin   |
+----+---------+-----------+----------+

-- Register a ktable sink that stores the latest information for each user.
-- user_id is declared as the (not enforced) primary key; the key and value parts
-- use separate formats ('key.format' / 'value.format').
CREATE TABLE users (
  user_id BIGINT,
  user_name STRING,
  user_level STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'ktable',
  'topic' = 'users',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);

-- Convert the Debezium changelog topic into a Kafka compacted topic by writing it
-- to the ktable-backed users table.
INSERT INTO users SELECT * FROM dbz_users;

-- Table mode materializes the results and displays only the final result.
-- Note: the sample output below does not show the user_level column.
> SET execution.result-mode=table;
> SELECT * FROM users;

+---------+-----------+----------+
| user_id | user_name |   region |
+---------+-----------+----------+
| 100     |  Bob      | Beijing  |
| 101     |  Alice    | Hangzhou |
| 102     |  Greg     | Berlin   |
+---------+-----------+----------+

...

Code Block
languagesql
-- Register a Kafka source of page-view events read with the 'avro' format.
-- proctime is a computed column (PROCTIME()); the temporal join further below
-- uses it as the lookup time.
CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = '...',
  'format' = 'avro'
);

-- Display the page-view stream in changelog mode; every record is an insert (+I).
> SET execution.result-mode=changelog;
> SELECT * FROM pageviews;

+----+---------+-----------+----------------------+----------+
| op | user_id |   page_id |             viewtime | proctime |
+----+---------+-----------+----------------------+----------+
| +I | 100     |  10001    | 2020-10-01 08:01:00  | ........ |
| +I | 102     |  10002    | 2020-10-01 08:02:00  | ........ |
| +I | 101     |  10002    | 2020-10-01 08:04:00  | ........ |
| +I | 102     |  10004    | 2020-10-01 08:06:00  | ........ |
| +I | 102     |  10003    | 2020-10-01 08:07:00  | ........ |
+----+---------+-----------+----------------------+----------+


-- Register the Kafka table that receives page views enriched with the user's region.
-- The watermark on viewtime trails the observed viewtime by 2 seconds.
CREATE TABLE pageviews_enriched (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews_enriched',
  ...
);

-- An insert-only stream temporal-joins a changelog stream (to be supported by FLIP-132).
-- Columns are listed explicitly: SELECT * over this join would also return proctime
-- and every column of users, which does not match the four-column sink schema of
-- pageviews_enriched (user_id, page_id, viewtime, user_region).
INSERT INTO pageviews_enriched
SELECT
  p.user_id,
  p.page_id,
  p.viewtime,
  u.region AS user_region
FROM pageviews AS p
LEFT JOIN users FOR SYSTEM_TIME AS OF p.proctime AS u
ON p.user_id = u.user_id;

-- Display the enriched page views in changelog mode.
> SET execution.result-mode=changelog;
> SELECT * FROM pageviews_enriched;

+----+---------+-----------+----------------------+-------------+
| op | user_id |   page_id |             viewtime | user_region |
+----+---------+-----------+----------------------+-------------+
| +I | 100     |  10001    | 2020-10-01 08:01:00  |    Beijing  |
| +I | 102     |  10002    | 2020-10-01 08:02:00  |    Berlin   |
| +I | 101     |  10002    | 2020-10-01 08:04:00  |    Hangzhou |
| +I | 102     |  10004    | 2020-10-01 08:06:00  |    Berlin   |
| +I | 102     |  10003    | 2020-10-01 08:07:00  |    Berlin   |
+----+---------+-----------+----------------------+-------------+

...

Code Block
languagesql
-- Register a ktable sink holding one page-view count per region.
-- region is declared as the (not enforced) primary key.
CREATE TABLE pageviews_per_region (
  region STRING,
  view_count BIGINT,
  PRIMARY KEY (region) NOT ENFORCED
) WITH (
  'connector' = 'ktable',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);

-- Roll the view counts up to a single total per region.
-- NOTE(review): the source table name is reconstructed from fused diff text;
-- presumably pageviews_per_region_5min is defined in an elided section -- confirm.
INSERT INTO pageviews_per_region
SELECT 
  region,
  SUM(view_count)
FROM pageviews_per_region_5min
GROUP BY region;
-- Display the materialized per-region totals in table mode.
> SET execution.result-mode=table;
> SELECT * FROM pageviews_per_region;

+-----------+-------------+
|    region |  view_count |
+-----------+-------------+
|  Beijing  |          1  |
|  Berlin   |          3  |
|  Hangzhou |          1  |
+-----------+-------------+

...