Status

Current state: Under Discussion

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).



Motivation

The main purpose of this FLIP is to improve the near-real-time (NRT) experience of Flink. We propose to support the windowing table-valued function (TVF) syntax as the entry point for NRT use cases. We explain why we made this decision and what the benefits of introducing windowing TVFs are.

...

Some users have complained that it is expensive to use Flink for NRT use cases. As far as we know, NRT is a very common scenario that we might have overlooked before. That's why we want to improve it and make Flink a great engine for NRT scenarios.

Goal

What do we need to do for that purpose? We investigated many Flink streaming jobs and found the following pain points.

...

In the industry, users typically build NRT pipelines with a batch engine and a scheduler. We also investigated many such jobs and found that most of them are 15-minute cumulative aggregations (aggregations from 00:00 to the current minute). For example, the cumulative UV at 10:00 is the total number of unique visitors from 00:00 to 10:00. Therefore, the daily report is a monotonically increasing line, as in the following picture. Snowflake also provides an example of cumulative window aggregates.

[Figure: the daily cumulative UV report, a monotonically increasing line]
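Here is a minimal Python sketch of what a cumulative UV report computes. The inputs are assumptions: each 15-minute partition is modeled as a set of hypothetical user IDs. The cumulative UV at the end of a step is the number of distinct users seen from 00:00 through that step. Every step can only add users to the running set, so the series never decreases, which is why the daily curve rises monotonically.

```python
# Illustrative sketch only: the partitions and user IDs below are hypothetical.

def cumulative_uv(partitions: list[tuple[str, set[str]]]) -> list[tuple[str, int]]:
    """Return (partition_start, distinct users from 00:00 through this partition)."""
    seen: set[str] = set()
    result = []
    for start, users in partitions:
        seen |= users  # set union counts each user once for the whole day so far
        result.append((start, len(seen)))
    return result

# Hypothetical 15-minute partitions: (partition start "HH:MM", users active in it).
PARTITIONS = [
    ("00:00", {"u1", "u2"}),
    ("00:15", {"u2", "u3"}),
    ("00:30", {"u1"}),
    ("00:45", {"u4", "u5"}),
]

print(cumulative_uv(PARTITIONS))
# [('00:00', 2), ('00:15', 3), ('00:30', 3), ('00:45', 5)]
```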

According to our research, most such use cases are served by a batch job scheduled every 15 minutes. The upstream is a table partitioned by 15 minutes, where each partition holds the detailed data of that 15-minute interval. Each run of the batch job loads all of today's partitions so far and aggregates them, using a filter such as where ds = '${current_date}' and (hh <'${hour}' or (hh ='${hour}' and mm <= '${minute}')). However, this approach has the following pain points:

...

  • Reduce latency. Stream processing gets rid of the scheduler, which saves scheduling time. A streaming system processes data continuously, so it can produce output in a more timely manner.
  • Steady output of data. A streaming job is a long-running task, so the output of windows will never get lost.
  • Solve data drift and out-of-order problems with the watermark mechanism.
  • Reduce the learning curve and support richer operations on windows. It is easier to propagate the regular window_start and window_end columns than the special time attribute column.

Public Interfaces

The idea of windowing table-valued functions comes from "One SQL To Rule Them All", which was published and presented at SIGMOD 2019. It leverages the newly introduced Polymorphic Table Functions (PTF), which are part of the SQL:2016 standard. The "One SQL" paper illustrates two windowing TVFs: tumbling and hopping windows. For Flink SQL, we would also like to add a new kind of window for cumulative aggregations, which are quite common in NRT scenarios.

This requires parser support in Calcite. Fortunately, Calcite v1.25.0 already supports the tumbling and hopping windowing TVFs. We need to extend it to support one more window: the cumulative window.

Windowing table-valued functions

Tumbling Windows

The table-valued function TUMBLE assigns a window to each row of a relation based on a time attribute column. TUMBLE returns a relation with all columns of the data plus two additional columns, window_start and window_end, which indicate the assigned window. All assigned windows have the same length, which is why tumbling is sometimes called “fixed windowing”.

...

> SELECT * FROM Bid;

--------------------------
| bidtime | price | item |
--------------------------
| 8:07    | $2    | A    |
| 8:11    | $3    | B    |
| 8:05    | $4    | C    |
| 8:09    | $5    | D    |
| 8:13    | $1    | E    |
| 8:17    | $6    | F    |
--------------------------

> SELECT * FROM TABLE(
   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));

------------------------------------------------------
| bidtime | price | item | window_start | window_end |
------------------------------------------------------
| 8:07    | $2    | A    | 8:00         | 8:10       |
| 8:11    | $3    | B    | 8:10         | 8:20       |
| 8:05    | $4    | C    | 8:00         | 8:10       |
| 8:09    | $5    | D    | 8:00         | 8:10       |
| 8:13    | $1    | E    | 8:10         | 8:20       |
| 8:17    | $6    | F    | 8:10         | 8:20       |
------------------------------------------------------

> SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

-------------------------------------
| window_start | window_end | price |
-------------------------------------
| 8:00         | 8:10       | $11   |
| 8:10         | 8:20       | $10   |
-------------------------------------
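The TUMBLE assignment above can be reproduced with a minimal Python sketch. It makes several assumptions: times are minutes since midnight, and windows are half-open [window_start, window_end) and aligned to multiples of the size. GROUP BY window_start, window_end with SUM(price) is modeled by a hypothetical window_sum helper. The sketch illustrates the semantics only and is not Flink's implementation.

```python
# Sketch of TUMBLE semantics; times are minutes since midnight, windows are [start, end).
from collections import defaultdict

def hm(s: str) -> int:
    """Parse 'H:MM' into minutes since midnight."""
    h, m = s.split(":")
    return int(h) * 60 + int(m)

def fmt(minutes: int) -> str:
    """Format minutes since midnight as 'H:MM'."""
    return f"{minutes // 60}:{minutes % 60:02d}"

def tumble(ts: int, size: int) -> list[tuple[int, int]]:
    # Each row falls into exactly one window aligned to a multiple of `size`.
    start = ts - ts % size
    return [(start, start + size)]

# The Bid table from the example: (bidtime, price, item).
BID = [("8:07", 2, "A"), ("8:11", 3, "B"), ("8:05", 4, "C"),
       ("8:09", 5, "D"), ("8:13", 1, "E"), ("8:17", 6, "F")]

def window_sum(assign) -> dict[tuple[str, str], int]:
    # Models GROUP BY window_start, window_end with SUM(price).
    sums = defaultdict(int)
    for bidtime, price, _item in BID:
        for window in assign(hm(bidtime)):
            sums[window] += price
    return {(fmt(s), fmt(e)): total for (s, e), total in sorted(sums.items())}

print(window_sum(lambda ts: tumble(ts, 10)))
# {('8:00', '8:10'): 11, ('8:10', '8:20'): 10}
```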

Hopping Windows

The table-valued function HOP assigns windows of a fixed size that advance by a fixed slide, based on a time attribute column. Each row is assigned to every window that covers its timestamp. In the example below, the third argument (INTERVAL '5' MINUTES) is the slide and the fourth (INTERVAL '10' MINUTES) is the window size. HOP returns a relation with all columns of the data plus two additional columns, window_start and window_end, which indicate the assigned window. Assigned windows may overlap, which is why hopping is sometimes called “sliding windowing”.

...

> SELECT * FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));

------------------------------------------------------
| bidtime | price | item | window_start | window_end |
------------------------------------------------------
| 8:07    | $2    | A    | 8:00         | 8:10       |
| 8:07    | $2    | A    | 8:05         | 8:15       |
| 8:11    | $3    | B    | 8:05         | 8:15       |
| 8:11    | $3    | B    | 8:10         | 8:20       |
| 8:05    | $4    | C    | 8:00         | 8:10       |
| 8:05    | $4    | C    | 8:05         | 8:15       |
| 8:09    | $5    | D    | 8:00         | 8:10       |
| 8:09    | $5    | D    | 8:05         | 8:15       |
| 8:13    | $1    | E    | 8:05         | 8:15       |
| 8:13    | $1    | E    | 8:10         | 8:20       |
| 8:17    | $6    | F    | 8:10         | 8:20       |
| 8:17    | $6    | F    | 8:15         | 8:25       |
------------------------------------------------------

> SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

--------------------------------------
|  window_start | window_end | price |
--------------------------------------
| 8:00          | 8:10       | $11   |
| 8:05          | 8:15       | $15   |
| 8:10          | 8:20       | $10   |
| 8:15          | 8:25       | $6    |
--------------------------------------
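The HOP results above can be reproduced with the same kind of sketch, under the same assumptions: times in minutes since midnight, half-open windows aligned to multiples of the slide, and hypothetical helper names. A row belongs to every window whose start is a multiple of the slide and whose range covers the row's timestamp.

```python
# Sketch of HOP semantics; times are minutes since midnight, windows are [start, end).
from collections import defaultdict

def hm(s: str) -> int:
    h, m = s.split(":")
    return int(h) * 60 + int(m)

def fmt(minutes: int) -> str:
    return f"{minutes // 60}:{minutes % 60:02d}"

def hop(ts: int, slide: int, size: int) -> list[tuple[int, int]]:
    # Latest window start at or before ts, aligned to a multiple of `slide`.
    start = ts - ts % slide
    windows = []
    # Step back one slide at a time while [start, start + size) still contains ts.
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)

BID = [("8:07", 2, "A"), ("8:11", 3, "B"), ("8:05", 4, "C"),
       ("8:09", 5, "D"), ("8:13", 1, "E"), ("8:17", 6, "F")]

def window_sum(assign) -> dict[tuple[str, str], int]:
    # Models GROUP BY window_start, window_end with SUM(price).
    sums = defaultdict(int)
    for bidtime, price, _item in BID:
        for window in assign(hm(bidtime)):
            sums[window] += price
    return {(fmt(s), fmt(e)): total for (s, e), total in sorted(sums.items())}

# Slide of 5 minutes, size of 10 minutes, as in the HOP example.
print(window_sum(lambda ts: hop(ts, 5, 10)))
# {('8:00', '8:10'): 11, ('8:05', '8:15'): 15, ('8:10', '8:20'): 10, ('8:15', '8:25'): 6}
```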

Cumulating Windows

The table-valued function CUMULATE assigns windows that start with an initial interval of one step and grow by one more step at a time, keeping the window start fixed, until they reach the max window size. The third argument is the step and the fourth is the max window size. Each row is assigned to every expanding window that covers its timestamp. CUMULATE returns a relation with all columns of the data plus two additional columns, window_start and window_end, which indicate the assigned window. Assigned windows may overlap.

...

> SELECT * FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));

------------------------------------------------------
| bidtime | price | item | window_start | window_end |
------------------------------------------------------
| 8:07    | $2    | A    | 8:00         | 8:08       |
| 8:07    | $2    | A    | 8:00         | 8:10       |
| 8:11    | $3    | B    | 8:10         | 8:12       |
| 8:11    | $3    | B    | 8:10         | 8:14       |
| 8:11    | $3    | B    | 8:10         | 8:16       |
| 8:11    | $3    | B    | 8:10         | 8:18       |
| 8:11    | $3    | B    | 8:10         | 8:20       |
| 8:05    | $4    | C    | 8:00         | 8:06       |
| 8:05    | $4    | C    | 8:00         | 8:08       |
| 8:05    | $4    | C    | 8:00         | 8:10       |
| 8:09    | $5    | D    | 8:00         | 8:10       |
| 8:13    | $1    | E    | 8:10         | 8:14       |
| 8:13    | $1    | E    | 8:10         | 8:16       |
| 8:13    | $1    | E    | 8:10         | 8:18       |
| 8:13    | $1    | E    | 8:10         | 8:20       |
| 8:17    | $6    | F    | 8:10         | 8:18       |
| 8:17    | $6    | F    | 8:10         | 8:20       |
------------------------------------------------------

> SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

--------------------------------------
|  window_start | window_end | price |
--------------------------------------
| 8:00          | 8:06       | $4    |
| 8:00          | 8:08       | $6    |
| 8:00          | 8:10       | $11   |
| 8:10          | 8:12       | $3    |
| 8:10          | 8:14       | $4    |
| 8:10          | 8:16       | $4    |
| 8:10          | 8:18       | $10   |
| 8:10          | 8:20       | $10   |
--------------------------------------
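Here is a minimal Python sketch of the CUMULATE assignment with a 2-minute step and a 10-minute max size, which are the parameters the windows in the table above correspond to. It assumes times in minutes since midnight, half-open windows, a max size that is a multiple of the step, and expanding windows aligned to multiples of the max size. Helper names are hypothetical, and the code only illustrates the semantics.

```python
# Sketch of CUMULATE semantics; times are minutes since midnight, windows are [start, end).
from collections import defaultdict

def hm(s: str) -> int:
    h, m = s.split(":")
    return int(h) * 60 + int(m)

def fmt(minutes: int) -> str:
    return f"{minutes // 60}:{minutes % 60:02d}"

def cumulate(ts: int, step: int, max_size: int) -> list[tuple[int, int]]:
    # The start is fixed at the beginning of the max-size window containing ts.
    start = ts - ts % max_size
    # The first window that covers ts ends at the next step boundary after ts;
    # every later end up to start + max_size also covers ts.
    first_end = ts - ts % step + step
    return [(start, end) for end in range(first_end, start + max_size + 1, step)]

BID = [("8:07", 2, "A"), ("8:11", 3, "B"), ("8:05", 4, "C"),
       ("8:09", 5, "D"), ("8:13", 1, "E"), ("8:17", 6, "F")]

def window_sum(assign) -> dict[tuple[str, str], int]:
    # Models GROUP BY window_start, window_end with SUM(price).
    sums = defaultdict(int)
    for bidtime, price, _item in BID:
        for window in assign(hm(bidtime)):
            sums[window] += price
    return {(fmt(s), fmt(e)): total for (s, e), total in sorted(sums.items())}

for window, total in window_sum(lambda ts: cumulate(ts, 2, 10)).items():
    print(window, total)
```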

Operations on windows

We would also like to support various operations on top of the windowing TVFs, which makes the functionality more complete.

Window Aggregate

A window aggregate requires that the GROUP BY clause contain the window_start and window_end columns of the windowing TVF.

...

SELECT ...
FROM T -- relation applied windowing TVF
GROUP BY window_start, window_end, ...

In the future, we could also simplify the GROUP BY clause to include only the window_start or window_end column if the windowing TVF is TUMBLE or HOP.
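This simplification works because all TUMBLE windows have the same size, and so do all HOP windows, so window_end = window_start + size and either column identifies the window. CUMULATE windows that share a start have different ends. The Python sketch below checks this over every minute of a day, using illustrative assignment functions (times in minutes since midnight, half-open windows aligned as in the examples). It is not part of the proposal.

```python
# Sketch: does window_start alone identify a window? Times are minutes since midnight.

def tumble(ts: int, size: int) -> list[tuple[int, int]]:
    start = ts - ts % size
    return [(start, start + size)]

def hop(ts: int, slide: int, size: int) -> list[tuple[int, int]]:
    start, windows = ts - ts % slide, []
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return windows

def cumulate(ts: int, step: int, max_size: int) -> list[tuple[int, int]]:
    start = ts - ts % max_size
    first_end = ts - ts % step + step
    return [(start, end) for end in range(first_end, start + max_size + 1, step)]

def start_determines_end(windows) -> bool:
    # True if no two distinct windows share the same window_start.
    ends_by_start: dict[int, int] = {}
    for start, end in windows:
        if ends_by_start.setdefault(start, end) != end:
            return False
    return True

MINUTES = range(24 * 60)  # one row per minute of a day
tumbling = [w for ts in MINUTES for w in tumble(ts, 10)]
hopping = [w for ts in MINUTES for w in hop(ts, 5, 10)]
cumulating = [w for ts in MINUTES for w in cumulate(ts, 2, 10)]

print(start_determines_end(tumbling), start_determines_end(hopping),
      start_determines_end(cumulating))
# True True False
```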

Window Join

A window join requires that the join condition include both the equality of the window starts and the equality of the window ends of the input tables. The window join has the same semantics as the DataStream window join.

...

In the future, we could also simplify the join condition to include only the window start equality if the windowing TVF is TUMBLE or HOP. Currently, the left and right inputs must use the same windowing TVF. This can be extended in the future, for example, to join tumbling windows with sliding windows of the same window size.
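The matching rule can be sketched in Python as an inner join. The sketch assumes both inputs have already been windowed by the same TVF and are represented as hypothetical (key, window_start, window_end, value) tuples. It ignores watermarks and the timing of result emission, and shows only which rows pair up: the key and both window bounds must be equal.

```python
# Sketch of window-join matching (inner join); the row layout is hypothetical.
from collections import defaultdict

Row = tuple[str, int, int, int]  # (key, window_start, window_end, value)

def window_join(left: list[Row], right: list[Row]) -> list[tuple[Row, Row]]:
    # Index the right input by (key, window_start, window_end).
    by_key_and_window = defaultdict(list)
    for rrow in right:
        by_key_and_window[rrow[:3]].append(rrow)
    # A left row matches only right rows with the same key AND the same window.
    return [(lrow, rrow) for lrow in left for rrow in by_key_and_window.get(lrow[:3], [])]

left = [("A", 0, 10, 1), ("A", 10, 20, 2), ("B", 0, 10, 3)]
right = [("A", 0, 10, 100), ("B", 10, 20, 300)]
print(window_join(left, right))
# [(('A', 0, 10, 1), ('A', 0, 10, 100))]
```

Key "B" does not match because its rows fall into different windows on the two sides.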

Window TopN

A window TopN requires that the PARTITION BY clause contain the window_start and window_end columns of the windowing TVF.

...

In the future, we could also simplify the PARTITION BY clause to include only the window_start or window_end column if the windowing TVF is TUMBLE or HOP.
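Here is a Python sketch of window TopN semantics. Rows are grouped by (window_start, window_end), sorted by a value in descending order, and numbered like ROW_NUMBER(); only rows whose number is at most n are kept. The input reuses the tumbling windows from the Bid example, with times in minutes since midnight. The function name is hypothetical. Ties keep their input order here, whereas ROW_NUMBER() does not guarantee any particular order among ties.

```python
# Sketch of window TopN semantics; the function name and row layout are hypothetical.
from collections import defaultdict

Row = tuple[int, int, str, int]  # (window_start, window_end, item, price)

def window_top_n(rows: list[Row], n: int) -> list[tuple[int, int, str, int, int]]:
    groups = defaultdict(list)
    for row in rows:
        groups[row[:2]].append(row)  # PARTITION BY window_start, window_end
    result = []
    for window in sorted(groups):
        ranked = sorted(groups[window], key=lambda r: r[3], reverse=True)  # ORDER BY price DESC
        for rownum, row in enumerate(ranked[:n], start=1):  # WHERE rownum <= n
            result.append((*row, rownum))
    return result

# Tumbling-window rows from the Bid example (480 = 8:00, 490 = 8:10, 500 = 8:20).
ROWS = [(480, 490, "A", 2), (490, 500, "B", 3), (480, 490, "C", 4),
        (480, 490, "D", 5), (490, 500, "E", 1), (490, 500, "F", 6)]
for row in window_top_n(ROWS, 2):
    print(row)
```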

Example

For example, suppose we have an orders stream and want to compute several cumulative aggregates from 00:00 up to the current 10-minute step: the sales of each item, the number of distinct buyers of each item, and the number of distinct buyers of each item's seller. We then want to get the top 100 items with the highest sales from 00:00 up to the current 10-minute step.

This can be expressed as the following job, which combines a cumulating window, window aggregates, a window join, and a window TopN.

CREATE TEMPORARY VIEW trd_itm_ord_mm AS
SELECT a.item_id, a.seller_id, a.window_start, a.window_end, a.sales, a.item_buyer_cnt, b.slr_buyer_cnt
FROM (
    -- window aggregate: cumulative sales and distinct buyers of each item, updated every 10 minutes
    SELECT item_id, seller_id, window_start, window_end, SUM(price) AS sales, COUNT(DISTINCT user_id) AS item_buyer_cnt
    FROM TABLE(
        CUMULATE(TABLE orders, DESCRIPTOR(pay_time), INTERVAL '10' MINUTES, INTERVAL '1' DAY))
    GROUP BY item_id, seller_id, window_start, window_end
) a
LEFT JOIN ( -- window join: enrich each item with its seller's distinct buyer count from 00:00 to the current step
    SELECT seller_id, window_start, window_end, COUNT(DISTINCT user_id) AS slr_buyer_cnt
    FROM TABLE(
        CUMULATE(TABLE orders, DESCRIPTOR(pay_time), INTERVAL '10' MINUTES, INTERVAL '1' DAY))
    GROUP BY seller_id, window_start, window_end
) b
ON a.seller_id = b.seller_id AND a.window_start = b.window_start AND a.window_end = b.window_end;

-- window TopN: the 100 items with the highest cumulative sales in each window
INSERT INTO item_top100_cumulate_10min
SELECT *
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY sales DESC) AS rownum
    FROM trd_itm_ord_mm
) WHERE rownum <= 100;

Implementation

I will not go deeply into the implementation and optimization details, but will sketch the implementation approach.

  1. We first need to bump Calcite to v1.25+.
  2. We will not introduce any new logical nodes, as there is no new relational algebra.
  3. We will introduce new physical nodes for window join, window aggregate, and window rank, which work on the existing window_start and window_end columns.
  4. We will introduce a MetadataHandler to infer the window start and window end columns. This metadata will be used to convert logical join/aggregate/rank nodes into physical window join/aggregate/rank nodes.
  5. If the input node of a window join/aggregate/rank is the windowing TVF, we can merge the TVF into the window node, so that window assignment is co-located in the same operator. This can improve performance by reusing window state and reducing record shuffling. For example, for a cumulating window, the state of window [00:00, 01:00) can be used as the base state for window [00:00, 02:00).
  6. We will reuse the existing implementation of WindowOperator (i.e. the window aggregate operator).
  7. We will support the mini-batch, local-global, and distinct-split optimizations on the window operators to improve performance. Currently, they are only supported for non-window aggregates.
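The state reuse for cumulating windows in item 5 can be illustrated with a minimal Python sketch. It assumes a SUM aggregate, a single day-long window starting at 00:00, and hypothetical events timestamped in minutes. The sketch keeps one partial sum per step (a "slice") and derives window [00:00, (k+1)*step) from window [00:00, k*step) plus slice k, instead of rescanning all events for every window. It illustrates the idea only and is not the planned WindowOperator code. Non-additive aggregates such as COUNT(DISTINCT) would need mergeable state (for example, sets) instead of plain sums.

```python
# Sketch of reusing cumulative window state via per-step slices (SUM only).
# Events are hypothetical (minute since 00:00, price) pairs with 0 <= minute < max_size.

def cumulate_naive(events, step, max_size):
    # Recompute every window [0, end) from scratch.
    return [((0, end), sum(v for ts, v in events if ts < end))
            for end in range(step, max_size + 1, step)]

def cumulate_by_slices(events, step, max_size):
    # Each event is added to exactly one slice [k*step, (k+1)*step).
    slices = [0] * (max_size // step)
    for ts, value in events:
        slices[ts // step] += value
    # Window [0, (k+1)*step) = window [0, k*step) + slice k, reusing the previous result.
    results, running = [], 0
    for k, slice_sum in enumerate(slices):
        running += slice_sum
        results.append(((0, (k + 1) * step), running))
    return results

EVENTS = [(5, 2), (59, 3), (60, 4), (150, 1), (1439, 7)]
by_slices = cumulate_by_slices(EVENTS, step=60, max_size=1440)
print(by_slices == cumulate_naive(EVENTS, step=60, max_size=1440))  # True
print(by_slices[:3])
# [((0, 60), 5), ((0, 120), 9), ((0, 180), 10)]
```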

Compatibility, Deprecation, and Migration Plan

The existing grouped window functions, i.e. GROUP BY TUMBLE(...), are still supported. We have no plan to deprecate or remove the existing window APIs so far. Support for the Table API is future work and should be addressed in a separate FLIP.