Status

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

A temporal table represents the concept of a (parameterized) view on a changing table that returns the content of the table at a specific point in time. A temporal table contains a set of versioned table snapshots; it can be a changing history table which tracks the changes (e.g. a database changelog) or a changing dimension table which materializes the changes (e.g. a database table).

For a changing dimension table, Flink uses DDL to define the temporal table and correlates against it by looking up the external system's table. For a changing history table, Flink uses a temporal table function to define a parameterized view of the history table and then accesses the data of that view. However, a temporal table function can only be registered via the Table API or YAML, which is very inconvenient for Flink SQL users. If we support temporal table DDL, users no longer need the temporal table function and can access temporal tables easily in SQL.

Flink SQL gained the ability to interpret changelogs in FLIP-95. A changelog is a natural temporal table which contains all versioned views of the original database table. Supporting temporal tables on changelogs lets users access a specific version of the original database table, which enriches Flink's temporal join scenarios considerably.

Public Interfaces


  • Temporal Table Concept:

Temporal (dynamic) table: A temporal table is a table that evolves over time, also known as a Flink dynamic table. Rows in a temporal table are associated with one or more temporal periods. All Flink tables are temporal (dynamic) tables.

Version: A dynamic table can be split into a set of versioned table snapshots. The version of a table snapshot represents the valid lifecycle of its rows; the start time and the end time of the valid period can be assigned by users. Depending on whether the table can track its history versions or not, a temporal table can be classified as a versioned table or a regular table.

Versioned table: If the rows in a dynamic table can track their history changes and their history versions can be accessed, we call this kind of dynamic table a versioned table.

Regular table: For a regular table, the rows in the dynamic table can only track their latest version. The table in a lookup join can only track its latest version, so it is also a regular table.


  • Temporal Table Join Concept:

Temporal table join: A temporal table join means an arbitrary table (left input/probe side) joins a version of a dynamic table (right input/build side).

Event-time temporal table join: An event-time temporal join is a temporal table join in which the left input table uses its event time to find the corresponding version of the dynamic table.

Processing-time temporal table join: A processing-time temporal join is a temporal table join in which the left input table uses its processing time to find the latest version of the dynamic table.


  • PRIMARY KEY semantics 

A primary key constraint in Flink means that a column or a set of columns of a table or a view is unique and does not contain null. Flink does not own the data, therefore the only mode we want to support is NOT ENFORCED; it is up to the user to ensure that the query enforces key integrity. Flink will leverage the primary key constraint for optimizations.

The primary key semantics on a changelog source mean that the materialized changelog (INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE) is unique on the primary key constraint. Flink assumes all messages are in order on the primary key and will use the primary key to update the materialized state in the temporal join operator as an optimization.

The primary key semantics on an append-only source mean that all input records are unique on the primary key constraint. The query result might be incorrect if there are duplicate records on the primary key.
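As an illustration, a minimal hedged sketch (the table name append_rates is hypothetical): before declaring a primary key on an append-only source, a user can check whether the intended key is actually unique with a simple aggregation.

-- Hypothetical check on an assumed append-only table named append_rates:
-- any currency whose count exceeds 1 would violate the NOT ENFORCED
-- primary key assumption and could lead to incorrect query results.
SELECT currency, COUNT(*) AS cnt
FROM append_rates
GROUP BY currency
HAVING COUNT(*) > 1;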

  • Event Time semantics 

Event time is the time that each individual event occurred on its producing device. This time is typically embedded within the records before they enter Flink, and that event timestamp can be extracted from each record. In event time, the progress of time depends on the data, not on any wall clocks.

Event time in a changelog keeps these semantics: the event timestamp of each changelog record (INSERT/UPDATE/DELETE) can be extracted from the record, and it means INSERTING/UPDATING/DELETING a record with the extracted time. E.g.
The changelog +INSERT(key, Flink, 09:00:00) means inserting a record with event timestamp '09:00:00', no matter when this INSERT changelog arrives.
The changelog -DELETE(key, Flink, 09:00:00) means deleting a record with event timestamp '09:00:00', no matter when this DELETE changelog arrives.


This FLIP proposes supporting both versioned tables and regular tables in temporal table joins.

  • Versioned Table/View:

We propose using a primary key and an event time to define a versioned table/view:

(1) The primary key is necessary to track different versions of records with the same primary key.

(2) The event time is necessary to split the validity lifecycle of every record.

-- E.g. the following versioned_rates is a valid versioned table.
CREATE TABLE versioned_rates (
    currency STRING,
    rate DECIMAL(38, 10),
    currency_time TIMESTAMP(3),
    WATERMARK FOR currency_time AS currency_time,  -- event time (currency_time)
    PRIMARY KEY(currency) NOT ENFORCED             -- primary key (currency)
) WITH (
    ...
)


  • Regular Table/View

It's easy to understand that if a dynamic table is not a versioned table, it must be a regular table.

Any table except a versioned table can be a regular table.

-- E.g. the following latest_rates table is a regular table.
CREATE TABLE latest_rates (  -- No event time and No primary key
    currency STRING,
    rate DECIMAL(38, 10),
    currency_time TIMESTAMP(3)
) WITH (
    ...
)


  • How to correlate a Temporal Table

The most common scenario for a temporal table is to correlate against a specific version of it in a temporal table join. A versioned table is only supported in temporal table joins at the moment; a regular table is supported anywhere. We propose using "FOR SYSTEM_TIME AS OF" to trigger accessing a specific version of a temporal table. The semantics of `FOR SYSTEM_TIME AS OF <point in time 1>`, as proposed in SQL:2011, is to return the view of the temporal table at the given point in time.

 --  Temporal table join syntax:

 <table or query name> JOIN <table or query name> FOR SYSTEM_TIME AS OF <point in time 1> ON <join condition>


-- The fact table orders
CREATE TABLE orders (
    order_id STRING,
    currency STRING,
    amount INT,
    proctime AS PROCTIME(),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '30' SECOND
) WITH (
)

-- Event-time temporal table join
SELECT
  o.order_id,
  o.order_time,
  o.amount * r.rate AS amount,
  r.currency
FROM orders AS o, versioned_rates FOR SYSTEM_TIME AS OF o.order_time r
ON o.currency = r.currency;

-- Processing-time temporal table join
SELECT
  o.order_id,
  o.proctime,
  o.amount * r.rate AS amount,
  r.currency
FROM orders AS o, latest_rates FOR SYSTEM_TIME AS OF o.proctime r
ON o.currency = r.currency;


Proposed Changes

The proposed DDL will be supported only in the Blink planner, because the legacy planner is deprecated and will be removed soon.

  • Versioned Table/View DDL

Keep in mind that a versioned table is a dynamic table that contains a primary key and an event time attribute.

For a versioned table on a source, the versions of the table are split by the user-specified event time and primary key.

A versioned table on a view is also valid if the query contains an inferred primary key and an event time attribute.

-- All the following versioned_rates[1..n] tables are valid versioned tables.

-- The versioned table `versioned_rates1` is built from a changelog stream which contains a primary key and an event time (currency_time); the table contains multiple versions.
-- Note: late DELETE events in the changelog stream will be ignored, because the event timestamp in a DELETE event is usually less than the watermark.
-- A changelog stream that comes from a database binlog commonly contains DELETE events whose event timestamps are less than the watermark,
-- because a delete in the database mostly removes old data that carries a smaller event timestamp; thus these DELETE events will be ignored.
-- It's recommended to use the changelog time as event time if the changelog source contains DELETE events (i.e. versioned_rates2).
CREATE TABLE versioned_rates1 (
    currency STRING,
    rate DECIMAL(38, 10),
    currency_time TIMESTAMP(3),
    WATERMARK FOR currency_time AS currency_time,  -- event time (currency_time)
    PRIMARY KEY(currency) NOT ENFORCED             -- primary key (currency)
) WITH (
    'format' = 'debezium-json'/'canal-json'  -- changelog source
)

-- The versioned table `versioned_rates2` is built from a changelog stream which contains a primary key and an event time (changelog_time); the table contains multiple versions.
-- Compared to `versioned_rates1`, the versions in `versioned_rates2` are split by the changelog time (i.e. the binlog time), which is better suited for changelogs with late DELETE events.
-- The changelog time is the time when the change happened in the system (i.e. the system time of the DML execution in the database); the event time in a DELETE event will be the execution time, which ideally wouldn't be less than the watermark.
-- It's recommended to use changelog_time as event time if the changelog source contains DELETE events.
-- Note: the schema changes after adding the system time column (changelog time); this can make retraction fail because of the extra field.
-- Users should project out the changelog time column if the table is followed by an aggregation or window operation.
CREATE TABLE versioned_rates2 (
    currency STRING,
    rate DECIMAL(38, 10),
    currency_time TIMESTAMP(3),
    changelog_time TIMESTAMP(3) AS SYSTEM_METADATA("db_operation_time"),
    WATERMARK FOR changelog_time AS changelog_time,  -- event time (changelog_time)
    PRIMARY KEY(currency) NOT ENFORCED               -- primary key (currency)
) WITH (
    'format' = 'debezium-json'/'canal-json'  -- changelog source
)

-- The versioned table `versioned_rates3` is built from an append-only stream which contains a primary key and an event time (currency_time);
-- the table is a versioned table but contains only a single version for every record,
-- because a primary key on an append-only stream means records won't be duplicated on the primary key, so every record has only one version.
CREATE TABLE versioned_rates3 (
    currency STRING,
    rate DECIMAL(38, 10),
    currency_time TIMESTAMP(3),
    WATERMARK FOR currency_time AS currency_time,  -- event time
    PRIMARY KEY(currency) NOT ENFORCED             -- primary key
) WITH (
    'format' = 'json'  -- append-only source
)

-- The versioned table `versioned_rates4` converts the append-only source to a changelog stream; it contains an inferred primary key and keeps the event time attribute.
-- The table is also a versioned table and contains multiple versions for every record.
-- It's recommended to use a view like `versioned_rates4` to define a versioned table if the append-only source contains duplicate records on the business key.
CREATE TABLE rates (
    currency STRING,
    rate DECIMAL(38, 10),
    currency_time TIMESTAMP(3),
    WATERMARK FOR currency_time AS currency_time  -- event time
) WITH (
    'format' = 'json'  -- append-only source
)

CREATE VIEW versioned_rates4 AS  -- the view returns an ordered changelog stream with an inferred primary key (currency) and event time (currency_time)
SELECT currency, rate, currency_time
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY currency ORDER BY currency_time DESC) AS rowNum
    FROM rates)
WHERE rowNum = 1;



  • NOTE: Events in a changelog stream will be ignored when their event timestamp is less than the watermark (known as late events in Flink).

Late events in a changelog stream are ignored when their event timestamp is less than the watermark. A DELETE/UPDATE_BEFORE event usually carries a late timestamp if the user takes a field in the row as the event time, which means these DELETE/UPDATE_BEFORE events will be ignored.

The UPDATE_BEFORE event is optional for a versioned table and can be ignored, because there is always a following UPDATE_AFTER event which updates the old record. The DELETE event is necessary because it deletes the old data and cannot be ignored.

A changelog stream coming from a database binlog that contains DELETE events is a common use case. If users assign the event time from a row field, as in `versioned_rates1` above, the event timestamp in a DELETE event is almost always less than the watermark and will be treated as a late event in Flink, because a delete in the database mostly removes old data that carries a smaller event time. The following table shows that the DELETE event has a late event timestamp (10:05:00) compared to the database system time (11:00:00) when the event time is assigned from the row field.

| DB system time (changelog time) | DB operation on T(pk, name, biz_ts)                               | Produced changelog using biz_ts as event time                               | Produced changelog using changelog time as event time                                           |
|---------------------------------|-------------------------------------------------------------------|-----------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------|
| 09:00:00                        | INSERT INTO T (key, Flink, 09:00:00)                              | +INSERT (key, Flink, 09:00:00)                                              | +INSERT (key, Flink, 09:00:00, 09:00:00)                                                        |
| 10:05:00                        | UPDATE T SET name = 'Hello', biz_ts = '10:05:00' WHERE pk = 'key' | -UPDATE_BEFORE (key, Flink, 09:00:00), +UPDATE_AFTER (key, Hello, 10:05:00) | -UPDATE_BEFORE (key, Flink, 09:00:00, 10:05:00), +UPDATE_AFTER (key, Hello, 10:05:00, 10:05:00) |
| 11:00:00                        | DELETE FROM T WHERE pk = 'key'                                    | -DELETE (key, Hello, 10:05:00)                                              | -DELETE (key, Hello, 10:05:00, 11:00:00)                                                        |

In this case, it's recommended to use the changelog time as event time. The changelog time is the time when the change happened in the system (i.e. the system time of the DML execution in the database); we can extract the changelog time from the binlog, which will be implemented in FLIP-107. The versioned table will then be built on the changelog time; as the table above shows, the changelog time (11:00:00) in the DELETE event is close to the database system time (11:00:00) and ideally wouldn't be less than the watermark.

The table schema changes after adding the system time column (changelog time); this can make retraction fail because of the extra field. Users should project out the system time column if the table is not used as a temporal table and is followed by an aggregation, join, or window operation which sends retraction messages.
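A minimal hedged sketch of that projection, reusing `versioned_rates2` from above (the view name is hypothetical): a view drops the changelog_time column so downstream aggregations see the original schema.

-- A sketch, assuming versioned_rates2 from above: project out the system
-- time column before an aggregation, so the extra field does not leak
-- into retraction messages.
CREATE VIEW rates_without_changelog_time AS
SELECT currency, rate, currency_time
FROM versioned_rates2;

-- The aggregation then runs on the original schema.
SELECT currency, MAX(rate) AS max_rate
FROM rates_without_changelog_time
GROUP BY currency;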


  • NOTE: A primary key should not be defined on an append-only stream if the stream has duplicate records on the business key.

A primary key should not be defined on an append-only stream if the stream has duplicate records on the primary key; this breaks the primary key semantics and may produce wrong results in Flink.

A versioned table on an append-only source with duplicate records on the business key is a common use case where users want upsert source semantics. We propose using a deduplication query to define this kind of versioned table. The query in the view `versioned_rates4` is a deduplication query which converts the append-only stream into a changelog stream; the changelog stream contains INSERT and UPDATE events on the primary key and also keeps the event time. BTW, the pattern in this query is known as a deduplication query and will be optimized into a deduplication node in the planner and runtime.

In this way, one benefit is that we do not need to define a primary key on an append-only source which may contain duplicate records, so the primary key semantics stay clear. Another benefit is that the result of the above view is a changelog stream, which unifies the temporal table and the changelog conceptually. Besides, users can filter/project/aggregate/join in the query of the view, which improves flexibility a lot; see the sketch below.
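For example, a hedged sketch of that flexibility (the 'TEST' filter and view name are hypothetical): the deduplication view can embed extra logic, such as filtering out test records before picking the latest row per key.

-- A sketch combining a filter with the deduplication pattern, assuming the
-- rates table from above; the hypothetical predicate drops a test currency
-- before the latest row per key is selected.
CREATE VIEW versioned_filtered_rates AS
SELECT currency, rate, currency_time
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY currency ORDER BY currency_time DESC) AS rowNum
    FROM rates
    WHERE currency <> 'TEST'
)
WHERE rowNum = 1;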


  • Regular Table/View:

If a dynamic table is not a versioned table, it's a regular table.

A regular table means the table or view does not have both a primary key and an event time attribute at the same time.

-- All the following latest_rates[1..n] tables are regular tables.

CREATE TABLE latest_rates1 (  -- No event time and No primary key
    currency STRING,
    rate DECIMAL(38, 10)
) WITH (
)

CREATE VIEW latest_rates2 AS -- No event time and No primary key
SELECT currency, rate
FROM rates;

CREATE TABLE latest_rates3 (  -- No primary key
    currency STRING,
    rate DECIMAL(38, 10),
    currency_time TIMESTAMP(3),
    WATERMARK FOR currency_time AS currency_time
) WITH (
)

CREATE TABLE latest_rates4 (  -- No event time
    currency STRING,
    rate DECIMAL(38, 10),
    proctime AS PROCTIME(),
    PRIMARY KEY(currency) NOT ENFORCED
) WITH (
)

CREATE VIEW latest_rates5 AS  -- No event time and only an inferred primary key
SELECT currency, rate, proctime
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY currency ORDER BY proctime DESC) AS rowNum
    FROM Rates)
WHERE rowNum = 1;

A regular table only keeps the latest version of the table; it can be any table or view except a versioned table.
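As a hedged usage sketch (reusing the orders table defined earlier), any of these regular tables can be correlated in a processing-time temporal join, which always picks the latest version:

-- A sketch, assuming latest_rates4 and the orders table from above: the
-- processing-time temporal join always correlates against the latest
-- version of the regular table.
SELECT
  o.order_id,
  o.amount * r.rate AS amount,
  r.currency
FROM orders AS o, latest_rates4 FOR SYSTEM_TIME AS OF o.proctime r
ON o.currency = r.currency;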

Compatibility, Deprecation, and Migration Plan

  • Compatibility

  • Deprecation

Deprecate Table#createTemporalTableFunction(Expression timeAttribute, Expression primaryKey)

  • Migration Plan


// Migrating an event-time temporal table join

// The temporal table function way (deprecated)

Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, $("currency"), $("rate"), $("order_time").rowtime());
tEnv.createTemporaryView("versioned_rates", ratesHistory);
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction($("order_time"), $("currency")); // <==== (1)
tEnv.registerFunction("rates", rates);
tEnv.executeSql(
    "SELECT " +
    "  o.order_id, " +
    "  o.order_time, " +
    "  o.amount * r.rate AS amount, " +
    "  r.currency " +
    "FROM " +
    "  orders AS o, " +
    "  LATERAL TABLE(rates(o.order_time)) AS r " +
    "WHERE o.currency = r.currency");
------------------------------------------------------------------------

-- The proposed temporal table DDL way: create a versioned table
CREATE VIEW versioned_rates AS
SELECT currency, rate, currency_time
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY currency ORDER BY currency_time DESC) AS rowNum
    FROM rates)
WHERE rowNum = 1;


SELECT
  o.order_id,
  o.order_time,
  o.amount * r.rate AS amount,
  r.currency
FROM orders AS o, versioned_rates FOR SYSTEM_TIME AS OF o.order_time r
ON o.currency = r.currency;


Test Plan


Rejected Alternatives

(1) Use a TEMPORAL keyword in the temporal table DDL:

CREATE TEMPORAL TABLE versioned_rates (
     currency CHAR(3) NOT NULL PRIMARY KEY,
     rate DOUBLE,
     currency_time TIMESTAMP(3),
     WATERMARK FOR currency_time AS currency_time - INTERVAL '5' MINUTE
) WITH (
   'connector' = 'kafka',
   'topic' = 'rates',
   'format' = 'debezium-json'
)

We already have the `TEMPORARY` keyword in SQL, and the statement `CREATE TEMPORARY TEMPORAL TABLE rates` would confuse users a lot. Also, all tables (source/sink/temporal) in Flink should conceptually be dynamic tables, and a temporal table works fine without a 'TEMPORAL' keyword. A Kafka data source or a database-like data source can act as a source/sink/temporal table depending on the position/syntax in which the user puts it in the query, rather than a keyword defined in the DDL.

Besides, we already use the same syntax for the current lookup table: we didn't add a `LOOKUP` or `TEMPORAL` keyword for lookup tables, but trigger the temporal join by the position/syntax (`FOR SYSTEM_TIME AS OF x`) in the query.

(2) Use PERIOD FOR SYSTEM_TIME and SYSTEM_METADATA on changelog to define a temporal table.

-- For a changelog data source
CREATE TABLE versioned_rates (
     currency STRING,
     rate DECIMAL(38, 10),
     PERIOD FOR SYSTEM_TIME(SYSTEM_METADATA("db_operation_time")),
     PRIMARY KEY(currency) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'format' = 'debezium-json'
)

In this way, we would need to introduce the PERIOD FOR SYSTEM_TIME/SYSTEM_METADATA keywords and also extract the changelog time from the changelog metadata, and PERIOD FOR SYSTEM_TIME on a view is hard to understand.

(3) Use a primary key and a time attribute on an append-only data source to define a temporal table.

-- For an append-only data source whose stream has duplicate records on the business key
CREATE TABLE versioned_rates (
     currency STRING,
     rate DECIMAL(38, 10),
     processing_time AS PROCTIME(),
     PERIOD FOR SYSTEM_TIME(processing_time),
     PRIMARY KEY(currency) NOT ENFORCED
) WITH (
   'connector' = 'kafka',
   'format' = 'json'
)

This approach breaks the primary key constraint because there are duplicate records on primary key fields. 


(4) Use the changelog time and primary key implicitly to define the versioned table on a changelog stream.

-- For a changelog data source
CREATE TABLE versioned_rates (
    currency STRING,
    rate DECIMAL(38, 10),
    currency_time TIMESTAMP(3),
    PRIMARY KEY(currency) NOT ENFORCED  -- primary key
) WITH (
    'connector' = 'kafka',
    'format' = 'debezium-json'/'canal-json'
)

This approach is very obscure to users and cannot support defining a versioned table with a different event time (e.g. currency_time) on a changelog stream.