...
Currently, the KTable connector doesn't provide options for the start reading position; a KTable must always be read from the earliest offset. This protects data integrity: otherwise it is hard to define the behavior when users specify a start offset in the middle of the topic, or how to process delete events whose keys have never been seen. Therefore, the KTable connector doesn't provide options like 'scan.startup.mode', 'scan.startup.specific-offsets', 'scan.startup.timestamp-millis' and 'properties.group.id' (only used for the 'group-offsets' startup mode).
...
Code Block | ||
---|---|---|
| ||
-- Example: turn a Debezium changelog topic into a compacted Kafka topic
-- keyed by user_id, using the 'ktable' connector as the sink.

-- register a kafka source which interprets the debezium topic as a changelog stream
CREATE TABLE dbz_users (
  user_id BIGINT,
  user_name STRING,
  user_level STRING,
  region STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbz_users',
  'properties.bootstrap.servers' = '...',
  'format' = 'debezium-json'
);

-- changelog mode will not materialize results and displays records with an op flag.
-- user_level is left out of the projection so the query matches the output shown.
> SET execution.result-mode=changelog;
> SELECT user_id, user_name, region FROM dbz_users;
+----+---------+-----------+----------+
| op | user_id | user_name | region   |
+----+---------+-----------+----------+
| +I | 100     | Bob       | Beijing  |
| +I | 101     | Alice     | Shanghai |
| +I | 102     | Greg      | Berlin   |
| +I | 103     | Richard   | Berlin   |
| -U | 101     | Alice     | Shanghai |
| +U | 101     | Alice     | Hangzhou |
| -D | 103     | Richard   | Berlin   |
+----+---------+-----------+----------+

-- register a ktable sink which will be used for storing the latest users information
CREATE TABLE users (
  user_id BIGINT,
  user_name STRING,
  user_level STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'ktable',
  'topic' = 'users',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);

-- convert the debezium topic into a kafka compacted topic;
-- columns are listed explicitly so the mapping onto the sink schema is visible
INSERT INTO users
SELECT
  user_id,
  user_name,
  user_level,
  region
FROM dbz_users;

-- table mode will materialize results and only display the final result
> SET execution.result-mode=table;
> SELECT user_id, user_name, region FROM users;
+---------+-----------+----------+
| user_id | user_name | region   |
+---------+-----------+----------+
| 100     | Bob       | Beijing  |
| 101     | Alice     | Hangzhou |
| 102     | Greg      | Berlin   |
+---------+-----------+----------+
...
Code Block | ||
---|---|---|
| ||
-- Example: enrich the insert-only pageviews stream with each user's region
-- by a processing-time temporal join against the users table.

-- insert-only kafka source; proctime is a computed processing-time attribute
CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = '...',
  'format' = 'avro'
);

> SET execution.result-mode=changelog;
> SELECT * FROM pageviews;
+----+---------+-----------+----------------------+----------+
| op | user_id | page_id   | viewtime             | proctime |
+----+---------+-----------+----------------------+----------+
| +I | 100     | 10001     | 2020-10-01 08:01:00  | ........ |
| +I | 102     | 10002     | 2020-10-01 08:02:00  | ........ |
| +I | 101     | 10002     | 2020-10-01 08:04:00  | ........ |
| +I | 102     | 10004     | 2020-10-01 08:06:00  | ........ |
| +I | 102     | 10003     | 2020-10-01 08:07:00  | ........ |
+----+---------+-----------+----------------------+----------+

-- sink for the enriched stream; viewtime carries a 2-second watermark delay
CREATE TABLE pageviews_enriched (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews_enriched',
  ...
);

-- insert-only stream temporal join a changelog stream which will be
-- supported by FLIP-132.
-- Only the four sink columns are projected: SELECT * over the join would also
-- return proctime and every column of users, which does not fit the sink schema.
INSERT INTO pageviews_enriched
SELECT
  p.user_id,
  p.page_id,
  p.viewtime,
  u.region AS user_region
FROM pageviews AS p
LEFT JOIN users FOR SYSTEM_TIME AS OF p.proctime AS u
  ON p.user_id = u.user_id;

> SET execution.result-mode=changelog;
> SELECT * FROM pageviews_enriched;
+----+---------+-----------+----------------------+-------------+
| op | user_id | page_id   | viewtime             | user_region |
+----+---------+-----------+----------------------+-------------+
| +I | 100     | 10001     | 2020-10-01 08:01:00  | Beijing     |
| +I | 102     | 10002     | 2020-10-01 08:02:00  | Berlin      |
| +I | 101     | 10002     | 2020-10-01 08:04:00  | Hangzhou    |
| +I | 102     | 10004     | 2020-10-01 08:06:00  | Berlin      |
| +I | 102     | 10003     | 2020-10-01 08:07:00  | Berlin      |
+----+---------+-----------+----------------------+-------------+
...
Code Block | ||
---|---|---|
| ||
-- Example: roll the per-region view counts up into one total per region,
-- stored in a ktable sink keyed by region.
CREATE TABLE pageviews_per_region (
  region STRING,
  view_count BIGINT,
  PRIMARY KEY (region) NOT ENFORCED
) WITH (
  'connector' = 'ktable',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);

-- pageviews_per_region_5min is not defined in this snippet;
-- the query reads its region and view_count columns
INSERT INTO pageviews_per_region
SELECT
  region,
  SUM(view_count) AS view_count
FROM pageviews_per_region_5min
GROUP BY region;

> SET execution.result-mode=table;
> SELECT * FROM pageviews_per_region;
+-----------+-------------+
| region    | view_count  |
+-----------+-------------+
| Beijing   | 1           |
| Berlin    | 3           |
| Hangzhou  | 1           |
+-----------+-------------+
...