...

Streaming analytics is one of the most important SQL use cases. Users rely on the streaming nature of Flink to deliver sub-second end-to-end latency. The typical architecture for such a use case looks like the following:

(Diagram: typical streaming analytics architecture)


A message queue (most often Kafka) is used in both the source and intermediate stages of this pipeline to keep latency within seconds. There is also a real-time OLAP system that receives the processed data in a streaming fashion and serves users' ad-hoc queries.
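As a purely illustrative sketch of the source stage, the orders stream could be declared as a Kafka table like the one below; the topic name and format are assumptions, and the fields mirror the join query used further down this page.

Code Block
languagesql
titleSQL
-- Hypothetical source-stage table: raw orders read from a Kafka topic
CREATE TABLE orders (
  order_id BIGINT,
  auction_id BIGINT,
  trans_amount BIGINT,
  create_time TIMESTAMP
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);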

...

Therefore, users could dual-write the data to Hudi or ClickHouse so that the intermediate tables can be queried. Besides this, message queues are known to be expensive for storing large amounts of data, so users either rely on a TTL to delete outdated data or export the data from the message queue to more cost-friendly storage, such as Iceberg.

(Architecture diagram)
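As a sketch of the export option, the export could simply be another streaming insert from the message queue into an Iceberg table; the target table name below is purely illustrative and assumes an Iceberg catalog has already been configured, while the TTL option is just a retention setting on the Kafka topic (e.g. retention.ms) rather than a SQL statement.

Code Block
languagesql
titleSQL
-- Export intermediate data to cheaper storage (illustrative target name,
-- reading from the Kafka intermediate table defined below)
INSERT INTO iceberg_intermediate_archive
SELECT * FROM kafka_intermediate_table_0;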

For now, let's focus on the dual-write solution and try to write the SQL:

Code Block
languagesql
titleSQL
CREATE TEMPORARY VIEW intermediate_table AS
SELECT
  A.order_id,
  A.auction_id,
  B.category_id,
  A.trans_amount,
  A.create_time
FROM orders A LEFT JOIN category_dim B
ON A.auction_id = B.auction_id;

-- Or create an Upsert-Kafka Table to accept changelog
CREATE TABLE kafka_intermediate_table_0 (
  order_id BIGINT,
  auction_id BIGINT,
  category_id BIGINT,
  trans_amount BIGINT,
  create_time TIMESTAMP,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '...',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'key.format' = 'avro',
  'value.format' = 'avro'
);

-- Create a Kafka Table with debezium-avro to accept changelog
CREATE TABLE kafka_intermediate_table_1 (
  order_id BIGINT,
  auction_id BIGINT,
  category_id BIGINT,
  trans_amount BIGINT,
  create_time TIMESTAMP
) WITH (
  'connector' = 'kafka',
  'topic' = '...',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'debezium-avro'
);

-- Create a Hudi Table to accept changelog
CREATE TABLE hudi_intermediate_table (
  order_id BIGINT,
  auction_id BIGINT,
  category_id BIGINT,
  trans_amount BIGINT,
  create_time TIMESTAMP,
  dt STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'connector' = 'hudi',
  'path' = '...',
  'write.precombine.field' = 'create_time',
  'table.type' = 'MERGE_ON_READ'
);

-- Insert into
INSERT INTO kafka_intermediate_table_0 SELECT * FROM intermediate_table;

INSERT INTO hudi_intermediate_table SELECT
  *,
  DATE_FORMAT(create_time, 'yyyy-MM-dd')
FROM intermediate_table;

-- Query: Streaming Pipeline
INSERT INTO ... SELECT ... FROM kafka_intermediate_table_0;

-- Query: Ad-hoc query
SELECT * FROM hudi_intermediate_table WHERE ...;

...

With a built-in Flink Dynamic Table, users can focus purely on their business logic:

Code Block
languagesql
titleSQL
-- Just business fields, primary key is not mandatory
CREATE TABLE intermediate_table (
  order_id BIGINT,
  auction_id BIGINT,
  category_id BIGINT,
  trans_amount BIGINT,
  create_time TIMESTAMP,
  dt STRING
) PARTITIONED BY (dt);

-- Insert into
INSERT INTO intermediate_table
SELECT
  A.order_id,
  A.auction_id,
  B.category_id,
  A.trans_amount,
  A.create_time,
  DATE_FORMAT(create_time, 'yyyy-MM-dd')
FROM orders A LEFT JOIN category_dim B
ON A.auction_id = B.auction_id;

-- Query: Streaming Pipeline
INSERT INTO ... SELECT ... FROM intermediate_table;

-- Query: Batch ad-hoc query
SELECT * FROM intermediate_table WHERE ...;

SQL Statements

CREATE

Code Block
languagesql
titleSQL
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    { <physical_column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  [WITH ('change-tracking' = 'false')]

<physical_column_definition>:
  column_name column_type [ <column_constraint> ] [COMMENT column_comment]

<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

When creating a table, the corresponding underlying physical storage will be created.

The statement is deliberately simple: it masks and abstracts away the underlying technical details, with no connector-specific options to worry about.
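For example, the intermediate table from the scenario above could be declared as follows; the 'change-tracking' clause is included only to show the optional WITH syntax from the grammar.

Code Block
languagesql
titleSQL
-- The same intermediate table, expressed with the built-in dynamic table DDL
CREATE TABLE IF NOT EXISTS intermediate_table (
  order_id BIGINT,
  auction_id BIGINT,
  category_id BIGINT,
  trans_amount BIGINT,
  create_time TIMESTAMP,
  dt STRING
) PARTITIONED BY (dt)
WITH ('change-tracking' = 'false');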

DROP

Code Block
languagesql
titleSQL
DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name

When dropping a table, the corresponding underlying physical storage will be deleted.
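For example, dropping the intermediate table above also removes its data in the underlying storage:

Code Block
languagesql
titleSQL
DROP TABLE IF EXISTS intermediate_table;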

...