...

The table-valued function TUMBLE assigns a window to each row of a relation based on a time attribute column. The return value of TUMBLE is a relation that includes all columns of the input as well as three additional columns named window_start, window_end, and window_time, which indicate the assigned window. The window_time column is the time attribute of the record after the window TVF; it always equals "window_end - 1". The original row time attribute "timecol" becomes a regular timestamp column after the window TVF. All assigned windows have the same length, which is why tumbling is sometimes called “fixed windowing”.
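As an illustration, a TUMBLE call might look as follows; the Bid table and its time attribute column bidtime are assumed to be declared elsewhere:

```sql
-- Assign each bid to a 10-minute tumbling window; window_start,
-- window_end, and window_time are appended to Bid's columns.
SELECT bidtime, price, item, window_start, window_end, window_time
FROM TABLE(
   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
```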

...

The table-valued function HOP assigns windows that cover rows within the interval of size and shift by slide, based on a time attribute column. The return value of HOP is a relation that includes all columns of the input as well as three additional columns named window_start, window_end, and window_time, which indicate the assigned window. The window_time column is the time attribute of the record after the window TVF; it always equals "window_end - 1". The original row time attribute "timecol" becomes a regular timestamp column after the window TVF. Assigned windows can overlap, which is why hopping is sometimes called “sliding windowing”.
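For example, a HOP call with a 5-minute slide and a 10-minute size assigns each row to two overlapping windows; as above, the Bid table and bidtime column are assumed:

```sql
-- Windows of size 10 minutes, starting every 5 minutes, so each
-- bid falls into two overlapping windows.
SELECT bidtime, price, item, window_start, window_end, window_time
FROM TABLE(
   HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));
```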

...

The table-valued function CUMULATE assigns windows that start from an initial interval of step size and expand by one more step (keeping the window start fixed) every step, up to the maximum window size. The return value of CUMULATE is a relation that includes all columns of the input as well as three additional columns named window_start, window_end, and window_time, which indicate the assigned window. The window_time column is the time attribute of the record after the window TVF; it always equals "window_end - 1". The original row time attribute "timecol" becomes a regular timestamp column after the window TVF. Assigned windows can overlap.
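For example, a CUMULATE call with a 2-minute step and a 10-minute maximum size produces windows [00:00, 00:02), [00:00, 00:04), … [00:00, 00:10), then starts over; the Bid table and bidtime column are again assumed:

```sql
-- Expanding windows: step 2 minutes, max size 10 minutes,
-- all sharing the same window_start within a cycle.
SELECT bidtime, price, item, window_start, window_end, window_time
FROM TABLE(
   CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
```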

...

The table-valued function SESSION assigns windows that cover rows based on a time attribute column: within a session window, the gap between the timestamps of consecutive rows is less than the given interval. Session windows are applied per key. The return value of SESSION is a relation that includes all columns of the input as well as three additional columns named window_start, window_end, and window_time, which indicate the assigned window. The window_time column is the time attribute of the record after the window TVF; it always equals "window_end - 1". The original row time attribute "timecol" becomes a regular timestamp column after the window TVF.

SESSION takes four required parameters and one optional parameter.
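A sketch of what a SESSION call could look like, following the description above; the Bid table, the item partition key, and the 5-minute gap are all illustrative:

```sql
-- Per-key session windows: bids for the same item whose bidtime
-- gaps are under 5 minutes fall into the same session.
SELECT bidtime, price, item, window_start, window_end, window_time
FROM TABLE(
   SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES));
```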

...

In the future, we can also simplify the PARTITION BY clause to include only the start or end column if the windowing TVF is TUMBLE or HOP.

Example

For example, suppose we have an orders stream and want some cumulative aggregates from 00:00 to the current 10-minute boundary: item sales, distinct buyers of each item, and distinct buyers of each item's seller. We then want the Top 100 items with the highest sales from 00:00 to the current 10 minutes.

This can be illustrated by the following job, which combines a cumulating window with a window aggregate, a window join, and a window TopN.

Time Attribute Propagation

As mentioned before, all four kinds of window TVF generate three additional columns: window_start, window_end, and window_time, which indicate the assigned window. The window_time column is the time attribute of the record after the window TVF; it always equals "window_end - 1". The original row time attribute "timecol" becomes a regular timestamp column after applying the window TVF. To propagate the time attribute through window-based operations: for window aggregate, users just need to add the window_time column to the GROUP BY clause and select it in the output; for window join and window TopN, users just need to select the window_time column in the output and do not need to add it to the JOIN ON or PARTITION BY clause. The resulting window_time column is still a time attribute that can be used in subsequent time-based operations such as interval joins, sorting, or over window aggregations. For example:

Code Block (SQL)
-- window_time needs to be added to GROUP BY and the output window_time is still a time attribute after window aggregation
> SELECT window_start, window_end, window_time, SUM(price)
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end, window_time;

-- L.window_time and R.window_time are still time attributes after window join
> SELECT L.window_time, R.window_time, ...
  FROM windowed_view1 AS L JOIN windowed_view2 AS R
  ON L.user_id = R.user_id AND L.window_start = R.window_start AND L.window_end = R.window_end;

-- window_time is still a time attribute after window TopN
> SELECT window_time, ...
  
Code Block (SQL)
CREATE TEMPORARY VIEW trd_itm_ord_mm AS
SELECT item_id, seller_id, window_start, window_end, sales, item_buyer_cnt, slr_buyer_cnt
FROM (
    SELECT item_id, seller_id, window_start, window_end, SUM(price) AS sales, COUNT(DISTINCT user_id) AS item_buyer_cnt
    FROM TABLE(
        CUMULATE(TABLE orders, DESCRIPTOR(pay_time), INTERVAL '10' MINUTES, INTERVAL '1' DAY))
    GROUP BY item_id, seller_id, window_start, window_end
) a
LEFT JOIN ( -- use a window join to enrich each row with the seller's buyer count from 00:00 to the current 10-minute boundary
    SELECT seller_id, window_start, window_end, COUNT(DISTINCT user_id) AS slr_buyer_cnt
    FROM TABLE(
        CUMULATE(TABLE orders, DESCRIPTOR(pay_time), INTERVAL '10' MINUTES, INTERVAL '1' DAY))
    GROUP BY seller_id, window_start, window_end
) b
ON a.seller_id = b.seller_id AND a.window_start = b.window_start AND a.window_end = b.window_end;


INSERT INTO item_top100_cumulate_10min
SELECT *
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY sales DESC) AS rownum
    FROM trd_itm_ord_mm
) WHERE rownum <= 100;

Implementation

I will not go into the implementation and optimization details in depth, but will sketch the implementation approach.

  1. We need to bump up Calcite to v1.25+ first.
  2. We will not introduce any new logical nodes, as there are no new relational algebra operations.
  3. We will introduce new physical nodes for window join, window aggregate, and window rank, which work on the existing window_start and window_end columns.
  4. We will introduce a MetadataHandler to infer the window start and window end columns. This metadata will be used to convert logical join/aggregate/rank nodes into physical window join/aggregate/rank nodes.
  5. If the input node of a window join/aggregate/rank is the windowing TVF, we can merge the TVF into the window node so that window assignment is co-located in the same operator. This improves performance by reusing window state and reducing record shuffling. For example, for a cumulating window, the state of window [00:00, 01:00) can be used as the base state for window [00:00, 02:00).
  6. We will reuse the existing implementation of WindowOperator (i.e. window aggregate operator).
  7. We will support the mini-batch, local-global, distinct-split optimizations on the window operators to have better performance. Currently, they are only supported in non-window aggregates.

Performance Optimization

The performance optimization is a long-term plan. Window operations work on windowed data, which is an infinite sequence of bounded data sets, and the SLA of such jobs is usually the window size. We can therefore use the window-size duration to buffer data, pre-aggregate it, reduce operations on state, and improve throughput. In the long term, window operations can work in a continuous batch mode: the source buffers input data until all data of a specific window has arrived (determined by the watermark), then emits the window's data to the downstream operators. The downstream operators are batch operators that can be scheduled when each window's data is ready and do not need the state mechanism; they process the bounded window data and emit the window result in a batch fashion. That means we can reuse existing batch operator implementations and optimizations. This approach would be very efficient in terms of throughput, because we don't need to access state. However, it is blocked by several runtime building blocks, such as continuous batch mode, the combination of streaming and batch operators, etc.

...

The existing grouped window functions, i.e. GROUP BY TUMBLE(...), are still supported but will be deprecated. We can drop the old syntax at some point in the future, but that needs another public discussion. We should update the examples and docs to use only the new syntax. Support for the Table API is future work and should be covered in a separate FLIP.


Future Work

Support count window with the window TVF

For a long time, count windows have been supported in the Table API but not in SQL. With the new window TVF syntax, we can also introduce a new window function for count windows in the future. For example, the following TUMBLE_ROW assigns windows in 10-row-count intervals.

Code Block (SQL)
SELECT *
FROM TABLE(
   TUMBLE_ROW(
     data => TABLE inputTable,
     timecol => DESCRIPTOR(timecol),
     size => 10));