
Page properties

...


Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)
Vote thread: ...
JIRA: FLINK-19604 (ASF JIRA)
Release: 1.13


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Windowing table-valued functions

We propose four kinds of window TVF: tumbling, hopping, cumulating, and session. The return value of a window TVF is a relation that includes all columns of data as well as 3 additional columns named window_start, window_end, and window_time that indicate the assigned window. The window_time field is the time attribute of the record after the window TVF; it is always equal to "window_end - 1".

Tumbling Windows

The table-valued function TUMBLE assigns a window to each row of a relation based on a time attribute column. The return value of TUMBLE is a relation that includes all columns of data as well as 3 additional columns named window_start, window_end, and window_time that indicate the assigned window. The original row time attribute "timecol" becomes a regular timestamp column after the window TVF. All assigned windows have the same length, which is why tumbling is sometimes called “fixed windowing”.

TUMBLE takes three required parameters and one optional parameter:

Code Block
languagesql

TUMBLE(data, DESCRIPTOR(timecol), size [, offset ])

...

Code Block
languagesql
> SELECT * FROM Bid;

--------------------------
| bidtime | price | item |
--------------------------
| 8:07    | $2    | A    |
| 8:11    | $3    | B    |
| 8:05    | $4    | C    |
| 8:09    | $5    | D    |
| 8:13    | $1    | E    |
| 8:17    | $6    | F    |
--------------------------

> SELECT * FROM TABLE(
   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));

-- or with the named params
-- note: the DATA param must be the first
> SELECT * FROM TABLE(
   TUMBLE(
     DATA => TABLE Bid,
     TIMECOL => DESCRIPTOR(bidtime),
     SIZE => INTERVAL '10' MINUTES));

------------------------------------------------------
| bidtime | price | item | window_start | window_end |
------------------------------------------------------
| 8:07    | $2    | A    | 8:00         | 8:10       |
| 8:11    | $3    | B    | 8:10         | 8:20       |
| 8:05    | $4    | C    | 8:00         | 8:10       |
| 8:09    | $5    | D    | 8:00         | 8:10       |
| 8:13    | $1    | E    | 8:10         | 8:20       |
| 8:17    | $6    | F    | 8:10         | 8:20       |
------------------------------------------------------

> SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

-------------------------------------
| window_start | window_end | price |
-------------------------------------
| 8:00         | 8:10       | $11   |
| 8:10         | 8:20       | $10   |
-------------------------------------
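
The assignment rule behind the TUMBLE results can also be sketched outside of SQL. The following Python snippet is only an illustration and not Flink's implementation: the helper names (fmt, tumble) are hypothetical, timestamps are modelled as minutes since midnight, and the way the optional offset shifts the window boundaries is an assumption.

```python
def fmt(minutes):
    """Format minutes since midnight as 'H:MM' (hypothetical helper)."""
    return f"{minutes // 60}:{minutes % 60:02d}"

def tumble(ts, size, offset=0):
    """Return (window_start, window_end) of the tumbling window containing ts.

    Assumption: windows are aligned to multiples of size, shifted by offset.
    """
    start = ts - (ts - offset) % size
    return start, start + size

# (bidtime in minutes since midnight, price) for the Bid rows A..F
bids = [(8 * 60 + 7, 2), (8 * 60 + 11, 3), (8 * 60 + 5, 4),
        (8 * 60 + 9, 5), (8 * 60 + 13, 1), (8 * 60 + 17, 6)]

# Equivalent of: SELECT window_start, window_end, SUM(price) ... GROUP BY window_start, window_end
tumble_totals = {}
for ts, price in bids:
    start, end = tumble(ts, size=10)
    key = (fmt(start), fmt(end))
    tumble_totals[key] = tumble_totals.get(key, 0) + price

for key in sorted(tumble_totals):
    print(key, tumble_totals[key])
```

Each row lands in exactly one window, which is why the aggregate only needs window_start and window_end as grouping keys.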

Cascading window aggregates are easy to support with the windowing TVFs. The outer window aggregate just needs to GROUP BY the window_start and window_end columns of the previous window aggregate. Therefore, we don't need the time attribute auxiliary functions (e.g. TUMBLE_ROWTIME, TUMBLE_PROCTIME). For example:

Code Block
languagesql

SELECT
  window_start,
  window_end,
  sum(cnt)
FROM (
  SELECT
     key,
     window_start,
     window_end,
     count(*) as cnt
  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
  GROUP BY key, window_start, window_end
) GROUP BY window_start, window_end

Hopping Windows

The table-valued function HOP assigns windows that cover rows within an interval of size, shifting every slide, based on a timestamp column. The return value of HOP is a relation that includes all columns of data as well as 3 additional columns named window_start, window_end, and window_time that indicate the assigned window. The original row time attribute "timecol" becomes a regular timestamp column after the window TVF. Assigned windows can overlap, which is why hopping is sometimes called “sliding windowing”.

HOP  takes four required parameters and one optional parameter.

Code Block
languagesql
HOP(data, DESCRIPTOR(timecol), slide, size [, offset ])


  • data  is a table parameter that can be any relation with a time attribute column.
  • timecol  is a column descriptor indicating which time attribute column of data should be mapped to hopping windows.
  • slide  is a duration specifying the duration between the start of sequential hopping windows.
  • size  is a duration specifying the width of the hopping windows.
  • offset  (optional) specifies that the hopping should begin from an instant other than the standard beginning of the epoch.


Here is an example invocation on the Bid table from the paper:

Code Block
languagesql
> SELECT * FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
> SELECT * FROM TABLE(
    HOP(
      DATA => TABLE Bid,
      TIMECOL => DESCRIPTOR(bidtime),
      SLIDE => INTERVAL '5' MINUTES,
      SIZE => INTERVAL '10' MINUTES));

------------------------------------------------------
| bidtime | price | item | window_start | window_end |
------------------------------------------------------
| 8:07    | $2    | A    | 8:00         | 8:10       |
| 8:07    | $2    | A    | 8:05         | 8:15       |
| 8:11    | $3    | B    | 8:05         | 8:15       |
| 8:11    | $3    | B    | 8:10         | 8:20       |
| 8:05    | $4    | C    | 8:00         | 8:10       |
| 8:05    | $4    | C    | 8:05         | 8:15       |
| 8:09    | $5    | D    | 8:00         | 8:10       |
| 8:09    | $5    | D    | 8:05         | 8:15       |
| 8:13    | $1    | E    | 8:05         | 8:15       |
| 8:13    | $1    | E    | 8:10         | 8:20       |
| 8:17    | $6    | F    | 8:10         | 8:20       |
| 8:17    | $6    | F    | 8:15         | 8:25       |
------------------------------------------------------

> SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

--------------------------------------
|  window_start | window_end | price |
--------------------------------------
| 8:00          | 8:10       | $11   |
| 8:05          | 8:15       | $15   |
| 8:10          | 8:20       | $10   |
| 8:15          | 8:25       | $6    |
--------------------------------------
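
To make the one-row-to-many-windows behaviour of HOP concrete, here is a minimal Python sketch of the assignment rule. It is not Flink's implementation: the helper names (fmt, hop) are hypothetical, timestamps are modelled as minutes since midnight, and the offset handling is an assumption.

```python
def fmt(minutes):
    """Format minutes since midnight as 'H:MM' (hypothetical helper)."""
    return f"{minutes // 60}:{minutes % 60:02d}"

def hop(ts, slide, size, offset=0):
    """Return every (window_start, window_end) hopping window that contains ts.

    Assumption: window starts are multiples of slide, shifted by offset.
    """
    start = ts - (ts - offset) % slide   # latest window start at or before ts
    windows = []
    while start > ts - size:             # window [start, start + size) still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)

# (bidtime in minutes since midnight, price) for the Bid rows A..F
bids = [(8 * 60 + 7, 2), (8 * 60 + 11, 3), (8 * 60 + 5, 4),
        (8 * 60 + 9, 5), (8 * 60 + 13, 1), (8 * 60 + 17, 6)]

# Equivalent of the GROUP BY window_start, window_end aggregate over HOP
hop_totals = {}
for ts, price in bids:
    for start, end in hop(ts, slide=5, size=10):
        key = (fmt(start), fmt(end))
        hop_totals[key] = hop_totals.get(key, 0) + price

for key in sorted(hop_totals):
    print(key, hop_totals[key])
```

With a 5 minute slide and a 10 minute size, each row is emitted twice, once per covering window, which matches the row count of the HOP result.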

Cumulating Windows

The table-valued function CUMULATE assigns windows that initially cover rows within an interval of step size and then expand by one more step size every step (keeping the window start fixed) until the max window size is reached. The return value of CUMULATE is a relation that includes all columns of data as well as 3 additional columns named window_start, window_end, and window_time that indicate the assigned window. The original row time attribute "timecol" becomes a regular timestamp column after the window TVF. Assigned windows can overlap.

CUMULATE  takes four required parameters and one optional parameter.

Code Block
languagesql
CUMULATE(data, DESCRIPTOR(timecol), step, size [, offset ])
  • data  is a table parameter that can be any relation with a time attribute column.
  • timecol  is a column descriptor indicating which time attribute column of data should be mapped to cumulating windows.
  • step  is a duration specifying the increased window size between the end of sequential cumulating windows.
  • size  is a duration specifying the max width of the cumulating windows. size must be an integral multiple of step.
  • offset  (optional) specifies that the windows should begin from an instant other than the standard beginning of the epoch.


So a cumulating window with a 1 hour step and a 1 day max size will produce the windows [00:00, 01:00), [00:00, 02:00), [00:00, 03:00), ..., [00:00, 24:00) for every day.


Here is an example invocation on the Bid table:

Code Block
languagesql
> SELECT * FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
> SELECT * FROM TABLE(
    CUMULATE(
      DATA => TABLE Bid,
      TIMECOL => DESCRIPTOR(bidtime),
      STEP => INTERVAL '2' MINUTES,
      SIZE => INTERVAL '10' MINUTES));

------------------------------------------------------
| bidtime | price | item | window_start | window_end |
------------------------------------------------------
| 8:07    | $2    | A    | 8:00         | 8:08       |
| 8:07    | $2    | A    | 8:00         | 8:10       |
| 8:11    | $3    | B    | 8:10         | 8:12       |
| 8:11    | $3    | B    | 8:10         | 8:14       |
| 8:11    | $3    | B    | 8:10         | 8:16       |
| 8:11    | $3    | B    | 8:10         | 8:18       |
| 8:11    | $3    | B    | 8:10         | 8:20       |
| 8:05    | $4    | C    | 8:00         | 8:06       |
| 8:05    | $4    | C    | 8:00         | 8:08       |
| 8:05    | $4    | C    | 8:00         | 8:10       |
| 8:09    | $5    | D    | 8:00         | 8:10       |
| 8:13    | $1    | E    | 8:10         | 8:14       |
| 8:13    | $1    | E    | 8:10         | 8:16       |
| 8:13    | $1    | E    | 8:10         | 8:18       |
| 8:13    | $1    | E    | 8:10         | 8:20       |
| 8:17    | $6    | F    | 8:10         | 8:18       |
| 8:17    | $6    | F    | 8:10         | 8:20       |
------------------------------------------------------

> SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

--------------------------------------
|  window_start | window_end | price |
--------------------------------------
| 8:00          | 8:06       | $4    |
| 8:00          | 8:08       | $6    |
| 8:00          | 8:10       | $11   |
| 8:10          | 8:12       | $3    |
| 8:10          | 8:14       | $4    |
| 8:10          | 8:16       | $4    |
| 8:10          | 8:18       | $10   |
| 8:10          | 8:20       | $10   |
--------------------------------------
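
The CUMULATE assignment can be sketched in a few lines of Python. This is an illustration under stated assumptions rather than Flink's implementation: the helper names (fmt, cumulate) are hypothetical, timestamps are minutes since midnight, and the offset handling is assumed to shift the fixed window start.

```python
def fmt(minutes):
    """Format minutes since midnight as 'H:MM' (hypothetical helper)."""
    return f"{minutes // 60}:{minutes % 60:02d}"

def cumulate(ts, step, size, offset=0):
    """Return every (window_start, window_end) cumulating window that contains ts."""
    if size % step != 0:
        raise ValueError("size must be an integral multiple of step")
    start = ts - (ts - offset) % size                      # fixed start of the max window
    first_end = start + ((ts - start) // step + 1) * step  # smallest window end after ts
    return [(start, end) for end in range(first_end, start + size + 1, step)]

# (bidtime in minutes since midnight, price) for the Bid rows A..F
bids = [(8 * 60 + 7, 2), (8 * 60 + 11, 3), (8 * 60 + 5, 4),
        (8 * 60 + 9, 5), (8 * 60 + 13, 1), (8 * 60 + 17, 6)]

# Equivalent of the GROUP BY window_start, window_end aggregate over CUMULATE
cumulate_totals = {}
for ts, price in bids:
    for start, end in cumulate(ts, step=2, size=10):
        key = (fmt(start), fmt(end))
        cumulate_totals[key] = cumulate_totals.get(key, 0) + price

for key in sorted(cumulate_totals):
    print(key, cumulate_totals[key])
```

A row that arrives early in the max window is assigned to more windows than a late one; for example, the 8:05 bid belongs to three windows while the 8:09 bid belongs to one.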

Session Windows

The table-valued function SESSION assigns windows that cover rows based on a time attribute. Within a session window, the distance between consecutive rows is less than the gap. Session windows are applied per key. The return value of SESSION is a relation that includes all columns of data as well as 3 additional columns named window_start, window_end, and window_time that indicate the assigned window. The original row time attribute "timecol" becomes a regular timestamp column after the window TVF.

SESSION takes three required parameters and one optional parameter.

Code Block
languagesql
SESSION(data [PARTITION BY (keycols, ...)], DESCRIPTOR(timecol), gap)
  • data  is a table parameter that can be any relation with a time attribute column.
  • keycols  is a column descriptor indicating which columns should be used to partition the data prior to sessionization.
  • timecol  is a column descriptor indicating which time attribute column of data should be mapped to session windows.
  • gap  is the maximum difference in timestamp for two events to be considered part of the same session.


Here is an example invocation on the Bid table:

Code Block
languagesql
> SELECT * FROM Bid;

------------------------------------
| bidtime | price | item | bidder  |
------------------------------------
| 8:07    | $2    | A    | takidau |
| 8:05    | $1    | A    | klk     |
| 8:09    | $10   | B    | takidau |
| 8:08    | $3    | A    | klk     |
| 8:17    | $20   | B    | klk     |
------------------------------------

> SELECT *
  FROM TABLE(
    SESSION(TABLE Bid PARTITION BY bidder, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
> SELECT *
  FROM TABLE(
    SESSION(
      DATA => TABLE Bid PARTITION BY bidder,
      TIMECOL => DESCRIPTOR(bidtime),
      GAP => INTERVAL '5' MINUTES));

----------------------------------------------------------------
| bidtime | price | item | bidder  | window_start | window_end |
----------------------------------------------------------------
| 8:07    | $2    | A    | takidau | 8:07         | 8:14       |
| 8:05    | $1    | A    | klk     | 8:05         | 8:13       |
| 8:09    | $10   | B    | takidau | 8:07         | 8:14       |
| 8:08    | $3    | A    | klk     | 8:05         | 8:13       |
| 8:17    | $20   | B    | klk     | 8:17         | 8:22       |
----------------------------------------------------------------

> SELECT bidder, window_start, window_end, SUM(price)
  FROM TABLE(
    SESSION(TABLE Bid PARTITION BY bidder, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY bidder, window_start, window_end;

-----------------------------------------------
| bidder  | window_start | window_end | price |
-----------------------------------------------
| takidau | 8:07         | 8:14       |  $12  |
| klk     | 8:05         | 8:13       |  $4   |
| klk     | 8:17         | 8:22       |  $20  |
-----------------------------------------------
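
Session windows differ from the other window types because their boundaries depend on the data. A minimal Python sketch of per-key sessionization follows. It is not Flink's implementation: the function name sessionize is hypothetical, timestamps are minutes since midnight, and the rule that a row joins the current session only when it arrives strictly before the session end is an assumption.

```python
from collections import defaultdict

def sessionize(rows, gap):
    """rows: iterable of (key, ts). Returns {key: [(window_start, window_end), ...]}."""
    by_key = defaultdict(list)
    for key, ts in rows:
        by_key[key].append(ts)

    sessions = {}
    for key, stamps in by_key.items():
        windows = []
        for ts in sorted(stamps):
            if windows and ts < windows[-1][1]:
                windows[-1][1] = ts + gap        # row falls into the open session: extend it
            else:
                windows.append([ts, ts + gap])   # gap exceeded: start a new session
        sessions[key] = [tuple(w) for w in windows]
    return sessions

# (bidder, bidtime in minutes since midnight) from the Bid table with a bidder column
rows = [("takidau", 8 * 60 + 7), ("klk", 8 * 60 + 5), ("takidau", 8 * 60 + 9),
        ("klk", 8 * 60 + 8), ("klk", 8 * 60 + 17)]

session_windows = sessionize(rows, gap=5)
print(session_windows)
```

With a 5 minute gap, klk's bids at 8:05 and 8:08 share the session [8:05, 8:13), while the bid at 8:17 opens a new session [8:17, 8:22).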

Operations on windows

We also would like to support various operations based on the windowing TVFs. This can make the functionality more complete.

Window Aggregate

The window aggregate requires that the GROUP BY clause contain the start and end columns of the windowing TVF.

SELECT ...
FROM T -- relation applied windowing TVF
GROUP BY window_start, window_end, ...

In the future, we can also simplify the GROUP BY clause to include only the start or end column if the windowing TVF is TUMBLE or HOP.

Window Join

The window join requires that the join ON condition contain equality of the window starts of the input tables and equality of the window ends of the input tables. The semantics of window join are the same as those of the DataStream window join.

SELECT ...
FROM L [LEFT|RIGHT|FULL OUTER] JOIN R -- L and R are relations applied windowing TVF
ON L.window_start = R.window_start AND L.window_end = R.window_end AND ...

In the future, we can also simplify the join ON clause to include only the window start equality if the windowing TVF is TUMBLE or HOP. Currently, the windowing TVFs of the left and right inputs must be the same. This can be extended in the future, for example, to join tumbling windows with sliding windows of the same window size.

Window TopN

The window TopN requires that the PARTITION BY clause contain the start and end columns of the windowing TVF.

SELECT ...
FROM (
   SELECT *,
     ROW_NUMBER() OVER (PARTITION BY window_start, window_end, ...
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM T) -- relation applied windowing TVF
WHERE rownum <= N [AND conditions]

In the future, we can also simplify the PARTITION BY clause to include only the start or end column if the windowing TVF is TUMBLE or HOP.

Time Attribute Propagate

As mentioned before, all 4 kinds of window TVF generate 3 additional columns, window_start, window_end, and window_time, that indicate the assigned window. The window_time column is the time attribute of the record after the window TVF; it is always equal to "window_end - 1". The original row time attribute "timecol" becomes a regular timestamp column after applying the window TVF. To propagate the time attribute through window-based operations, for window aggregate users just need to add the window_time column to the GROUP BY clause and select the window_time column. For window join and window TopN, users just need to select the window_time column and don't need to add it to the JOIN ON or PARTITION BY clause. The resulting window_time column is still a time attribute that can be used in subsequent time-based operations such as interval joins, sorts, or over window aggregations. For example:

Code Block
languagesql
-- window_time needs to be added to GROUP BY and the output window_time is still a time attribute after window aggregation
> SELECT window_start, window_end, window_time, SUM(price)
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end, window_time;

-- L.window_time and R.window_time are still time attributes after window join
> SELECT L.window_time, R.window_time, ...
  FROM windowed_view1 AS L JOIN windowed_view2 AS R
  ON L.user_id = R.user_id AND L.window_start = R.window_start AND L.window_end = R.window_end;

-- window_time is still a time attribute after window TopN
> SELECT window_time, ...
  FROM (
    SELECT *,
      ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY total_price DESC) as rank
    FROM trd_itm_ord_mm
  ) WHERE rank <= 100;


Example

For example, suppose we have an orders stream and want to compute some cumulative aggregates from 00:00 to the current 10 minutes, including item sales, distinct buyers of each item, and distinct buyers of the item's seller. Then we want to get the Top 100 items with the highest sales from 00:00 to the current 10 minutes.

This can be illustrated by the following complex job, which contains a cumulating window, a window aggregate, a window join, and a window TopN.

Code Block
languagesql
CREATE TEMPORARY VIEW trd_itm_ord_mm AS
SELECT a.item_id, a.seller_id, a.window_start, a.window_end, sales, item_buyer_cnt, slr_buyer_cnt
FROM (
    -- per-item cumulative sales and distinct buyers
    SELECT item_id, seller_id, window_start, window_end, SUM(price) as sales, COUNT(DISTINCT user_id) as item_buyer_cnt
    FROM TABLE(
        CUMULATE(TABLE orders, DESCRIPTOR(pay_time), INTERVAL '10' MINUTES, INTERVAL '1' DAY))
    GROUP BY item_id, seller_id, window_start, window_end
) a
LEFT JOIN ( -- using window join to enrich the buyer count of the shop from 00:00 to current minute
    SELECT seller_id, window_start, window_end, COUNT(DISTINCT user_id) as slr_buyer_cnt
    FROM TABLE(
        CUMULATE(TABLE orders, DESCRIPTOR(pay_time), INTERVAL '10' MINUTES, INTERVAL '1' DAY))
    GROUP BY seller_id, window_start, window_end
) b
ON a.seller_id = b.seller_id AND a.window_start = b.window_start AND a.window_end = b.window_end;


INSERT INTO item_top100_cumulate_10min
SELECT *
FROM (
    SELECT *,
        -- rank items within each window by the sales column of the view
        ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY sales DESC) as rank
    FROM trd_itm_ord_mm
) WHERE rank <= 100;

Implementation

I will not go deeply into the implementation and optimization details, but will sketch the implementation approach.

  1. We need to bump up Calcite to v1.25+ first.
  2. We will not introduce any new logical nodes, as there is no new relational algebra.
  3. We will introduce new physical nodes for window join, window aggregate, and window rank, which work on the existing window_start and window_end columns.
  4. We will introduce a MetadataHandler to infer the window start and window end columns. This metadata will be used to convert from logical join/aggregate/rank to physical window join/aggregate/rank.
  5. If the input node of a window join/aggregate/rank is the windowing TVF, we can merge the TVF into the window node so that the window assignment is co-located in the same operator. This gives better performance by reusing window state and reducing record shuffling. For example, for a cumulating window, the state of window [00:00, 01:00) can be used as the base state for window [00:00, 02:00).
  6. We will reuse the existing implementation of WindowOperator (i.e. the window aggregate operator).
  7. We will support the mini-batch, local-global, and distinct-split optimizations on the window operators to improve performance. Currently, they are only supported in non-window aggregates.

Performance Optimization

Performance optimization is a long-term plan. Window operations work on windowed data, which is an infinite sequence of bounded data sets, and the SLA of such jobs is usually the window size. We can therefore use the duration of the window to buffer data, pre-aggregate it, reduce state operations, and improve throughput. In the long term, window operations can work in a continuous batch mode: the source buffers input data until all data of a specific window has arrived (as indicated by the watermark) and then emits the window data to the downstream operators. All downstream operators are batch operators that can be scheduled when the data of each window is ready and do not need the state mechanism. They process the bounded window data and emit window results in a batch fashion, which means we can reuse the existing batch operator implementations and optimizations. This approach would be quite efficient for throughput because we don't need to access state. However, it is blocked by several missing runtime building blocks, such as the continuous batch mode and the combination of streaming and batch operators.

Therefore, in the short term, we have to use the streaming operators and work in stream mode. The optimization idea is to buffer as much data as possible. Similar to the current mini-batch mechanism for unbounded aggregates, buffered data is flushed only when (1) the watermark of a window end arrives, (2) the memory reserved for buffering is full, or (3) a checkpoint barrier arrives. We will reserve a pre-defined memory space for buffering data (similar to how batch operators allocate memory from the MemoryManager). Depending on the operator, the buffered data can be records or pre-aggregated accumulators. Users can tune the size of the reserved memory. In this way, we can aggregate as much data as possible before accessing state, which improves throughput. This will be more efficient than the current mini-batch on unbounded aggregates because we can buffer more data. On the other hand, because we calculate input data incrementally instead of calculating the whole window in one shot, much less computation is needed when a window closes. Thus, the latency between a window closing and its result being emitted will be shorter, and the cluster CPU load will be smoother and steadier.
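
The three flush triggers can be illustrated with a toy operator. The Python sketch below is purely hypothetical: the class BufferedWindowSum, its method names, the dictionary standing in for the state backend, and the use of a window count as the memory limit are all assumptions made for illustration, not the proposed operator design.

```python
class BufferedWindowSum:
    """Toy window SUM that pre-aggregates in memory before touching state."""

    def __init__(self, max_buffered_windows):
        self.max_buffered_windows = max_buffered_windows  # stand-in for reserved memory
        self.buffer = {}       # window_end -> pre-aggregated accumulator (in memory)
        self.state = {}        # window_end -> accumulator (stand-in for keyed state)
        self.state_writes = 0  # counts state accesses to show the saving

    def _flush(self):
        for window_end, acc in self.buffer.items():
            self.state[window_end] = self.state.get(window_end, 0) + acc
            self.state_writes += 1
        self.buffer.clear()

    def add(self, window_end, value):
        self.buffer[window_end] = self.buffer.get(window_end, 0) + value
        if len(self.buffer) >= self.max_buffered_windows:  # (2) buffer is full
            self._flush()

    def on_watermark(self, watermark):
        # (1) the watermark reached window_end - 1 of some buffered window
        if any(end - 1 <= watermark for end in self.buffer):
            self._flush()
        closed = {end: total for end, total in self.state.items() if end - 1 <= watermark}
        for end in closed:
            del self.state[end]
        return closed  # results of windows that are now complete

    def on_checkpoint(self):
        self._flush()  # (3) a checkpoint barrier arrived

op = BufferedWindowSum(max_buffered_windows=100)
for window_end, price in [(490, 2), (500, 3), (490, 4), (490, 5)]:
    op.add(window_end, price)
first = op.on_watermark(489)   # closes the window ending at 490
op.add(500, 1)
op.add(500, 6)
op.on_checkpoint()
second = op.on_watermark(499)  # closes the window ending at 500
print(first, second, op.state_writes)
```

In this run six records cause only three state writes, because records for the same window are combined in the buffer before each flush.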

For overlapping windows, e.g. HOP and CUMULATE, we can use shared pane state to reduce state size and state operations. For example, for hopping windows with a 1 hour window size and a 10 minute slide, we can store state per 10 minute pane, which avoids storing many duplicate records for window join. As another example, for cumulating windows with a 1 day max window size and a 1 hour step, the state of window [00:00, 01:00) can be used as the base state for window [00:00, 02:00), which greatly reduces state size and state operations.
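
The pane idea for hopping windows can be sketched as follows. This Python snippet is an illustration only: the function names are hypothetical, the pane size is assumed to be the greatest common divisor of slide and size, and windows are assumed to start at multiples of slide with no offset.

```python
from math import gcd

def hop_sums_from_panes(events, slide, size):
    """events: iterable of (ts, value). Returns {(window_start, window_end): sum}."""
    pane = gcd(slide, size)  # assumption: panes are the largest unit shared by all windows

    # Step 1: every record updates exactly one pane accumulator.
    pane_sums = {}
    for ts, value in events:
        p = ts - ts % pane
        pane_sums[p] = pane_sums.get(p, 0) + value

    # Step 2: each window result is the combination of the panes it covers.
    results = {}
    for p, acc in pane_sums.items():
        start = p - p % slide    # latest window start at or before the pane
        while start > p - size:  # window [start, start + size) covers pane p
            key = (start, start + size)
            results[key] = results.get(key, 0) + acc
            start -= slide
    return results

# (bidtime in minutes since midnight, price) for the Bid rows A..F
bids = [(8 * 60 + 7, 2), (8 * 60 + 11, 3), (8 * 60 + 5, 4),
        (8 * 60 + 9, 5), (8 * 60 + 13, 1), (8 * 60 + 17, 6)]

pane_results = hop_sums_from_panes(bids, slide=5, size=10)
for key in sorted(pane_results):
    print(key, pane_results[key])
```

Each record is stored once in its pane instead of once per overlapping window, and the window results agree with a direct HOP aggregate over the same rows.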


Compatibility, Deprecation, and Migration Plan

The existing grouped window functions, i.e. GROUP BY TUMBLE..., are still supported but will be deprecated. We can drop the old syntax at some point in the future, but this needs another public discussion. We should update the examples and docs to use only the new syntax. Support for the Table API is future work and should be covered in a separate FLIP.


Future Work

Simplify Polymorphic Table Functions syntax

As discussed on the mailing list, the current PTF syntax is verbose. It would be great if we could remove the TABLE() keyword, which would make the syntax easier for users to pick up. Neither Oracle nor SQL Server requires such a keyword. A simplified query would look like this:


Code Block
languagesql
SELECT *
FROM TUMBLE(inputTable, DESCRIPTOR(timecol), INTERVAL '10' MINUTE);


Support count window with the window TVF

Count windows have long been supported in the Table API but not in SQL. With the new window TVF syntax, we can also introduce a new window function for count windows in the future. For example, the following TUMBLE_ROW assigns windows of 10 rows each.

Code Block
languagesql
SELECT *
FROM TABLE(
   TUMBLE_ROW(
     data => TABLE inputTable,
     timecol => DESCRIPTOR(timecol),
     size => 10));


Explore more ability of Polymorphic Table Functions

It's very exciting to see the potential of PTFs. In the future, we may be able to support the following features in SQL in a standard PTF way.

  • advanced operations supported in Table API (FLIP-29), e.g. drop_columns, user-defined-table-aggregate
  • user defined join operator
  • a shortcut TopN function
  • re-assign watermarks?
  • re-partition data, similar feature to Hive DISTRIBUTED BY syntax. 
  • ...