Discussion thread
Vote thread
JIRA


Release: 1.13

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

The main purpose of this FLIP is to improve the near-real-time (NRT) experience of Flink. We propose to support windowing table-valued function (TVF) syntax as an entry point for NRT use cases. We will explain why we made this decision and the benefits of introducing windowing TVFs.

Usually, we can categorize data processing into real-time (seconds), near-real-time (minutes), and batch (hours or more) processing. Flink is a well-known stream processing system that is good at real-time scenarios. Meanwhile, the community has put a lot of effort into strengthening batch processing capabilities. However, we have heard users complain that it's expensive to use Flink for NRT use cases. As far as we know, NRT is a very common scenario that we might have overlooked before. That's why we want to improve it and make Flink a great engine for NRT scenarios.

Goal

What do we need to do for that purpose? We investigated many Flink streaming jobs and found the following pain points.

  • Learning curve. Usually, users use windows for their minute/second-granularity statistics. However, windows are not easy to use in Flink SQL currently. It only supports window aggregates, not window joins, window TopN, or window deduplication. It's hard to cascade different operations (e.g. join, aggregation); users have to learn how to preserve the time attribute and use streaming-specific functions, e.g. TUMBLE_ROWTIME.
  • Performance. Flink is a native streaming engine that provides low latency at the cost of per-record state operations. But users don't need such low latency in some cases. It would be great if the tolerated delay could be exchanged for a huge increase in throughput.

In the industry, users typically use a batch engine and a scheduler to build NRT pipelines. We also investigated lots of such jobs and found that most of them are 15-minute, cumulative aggregations (aggregations from 00:00 to the current minute). For example, the cumulative UV count at 10:00 represents the total number of UVs from 00:00 to 10:00. Therefore, the daily report is a monotonically increasing curve like the following picture. Snowflake also provides an example of cumulative window aggregates.

[Figure: a monotonically increasing cumulative curve over the day]

According to our research, most of such use cases are supported by a batch job scheduled every 15 minutes. The upstream is a 15-minute partitioned table containing the detailed data of each 15-minute interval. The batch job loads all of today's partitions so far for aggregation, using a filter such as where ds = '${current_date}' and (hh < '${hour}' or (hh = '${hour}' and mm <= '${minute}')); a sketch of such a job is shown after the list below. However, this approach has the following pain points:

  • Repeated calculation. Older partitions are read and calculated more than once.
  • Latency. The scheduler overhead increases the delay, and resources are not guaranteed when the cluster is busy. Usually, the latency of a 15-minute job is more than 30 minutes.
  • Unstable output. When the cluster's resource usage peaks, some scheduled partition tasks are lost.
  • Data drift. Some data with event time 14:59 might be written into the [15:00, 30:00) partition, making the aggregation result for the [00:00, 15:00) partition inaccurate. Users can read one more partition, but that increases latency again.
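
For reference, here is a sketch of the scheduled batch job described above; the table names (user_behavior, cumulative_uv) and the ds/hh/mm partition columns are hypothetical:

-- re-run by the scheduler every 15 minutes; each run re-reads all
-- earlier partitions of the day (the repeated calculation above)
INSERT OVERWRITE TABLE cumulative_uv PARTITION (ds = '${current_date}', hh = '${hour}', mm = '${minute}')
SELECT COUNT(DISTINCT user_id) AS uv
FROM user_behavior
WHERE ds = '${current_date}'
  AND (hh < '${hour}' OR (hh = '${hour}' AND mm <= '${minute}'));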

Based on the research above, the goal of improving NRT is to provide an easy-to-use API with good performance and to resolve the problems users meet in their batch job + scheduler solution. This can extend the borders of Flink SQL and attract users from traditional batch processing.

Windowing TVF is the solution we propose for this goal. We think that with the new API and improved performance, we can achieve the following advantages and address the above pain points.

  • Reduce latency. Stream processing gets rid of the scheduler, which saves scheduling time. A streaming system processes data continuously and can produce output in a more timely manner.
  • Steady output of data. A streaming job is a long-running task, so window output will never get lost.
  • Solve the data drift and out-of-order problems with the watermark mechanism.
  • Reduce the learning curve and support richer operations for windowing. It's easier to propagate the regular window_start and window_end columns than the special time attribute column.

Public Interfaces

The idea of windowing table-valued functions comes from "One SQL To Rule Them All", published and presented at SIGMOD 2019. It leverages the newly introduced Polymorphic Table Functions (PTF), which are part of the SQL:2016 standard. The "One SQL" paper illustrates two windowing TVFs: tumbling and hopping windows. For Flink SQL, we would also like to add a new kind of window for cumulative aggregations, which are quite common in NRT.

This requires parser support in Calcite. Fortunately, Calcite v1.25.0 already supports the tumbling and hopping windowing TVFs. We need to extend it to support one more window type: the cumulative window.

Windowing table-valued functions

We would like to propose 4 kinds of window TVFs: tumbling, hopping, cumulative, and session. The return value of a window TVF is a relation that includes all columns of data as well as 3 additional columns named window_start, window_end, and window_time to indicate the assigned window. The window_time field is the time attribute of the record after the window TVF; it always equals "window_end - 1".
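
For instance, applying a windowing TVF exposes the three additional columns directly. A minimal sketch (MyTable and its rowtime column are hypothetical; for millisecond-precision timestamps, window_time is window_end minus 1 millisecond):

> SELECT rowtime, window_start, window_end, window_time
  FROM TABLE(
    TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES));
-- a row with rowtime 8:07 gets window_start 8:00, window_end 8:10,
-- and window_time 8:09:59.999 (i.e. window_end - 1 ms)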

Tumbling Windows

The table-valued function TUMBLE assigns a window for each row of a relation based on a time attribute column. The return value of TUMBLE is a relation that includes all columns of data as well as 3 additional columns named window_start, window_end, and window_time to indicate the assigned window. The original row time attribute "timecol" becomes a regular timestamp column after the window TVF. All assigned windows have the same length, which is why tumbling windows are sometimes called "fixed windows".

TUMBLE takes three required parameters and one optional parameter:

TUMBLE(data, DESCRIPTOR(timecol), size [, offset ])
  • data  is a table parameter that can be any relation with a time attribute column.
  • timecol is a column descriptor indicating which time attribute column of data should be mapped to tumbling windows.
  • size  is a duration specifying the width of the tumbling windows.
  • offset  (optional) specifies that the tumbling should begin from an instant other than the standard beginning of the epoch.

Here is an example invocation on the Bid table from the paper:

> SELECT * FROM Bid;

--------------------------
| bidtime | price | item |
--------------------------
| 8:07    | $2    | A    |
| 8:11    | $3    | B    |
| 8:05    | $4    | C    |
| 8:09    | $5    | D    |
| 8:13    | $1    | E    |
| 8:17    | $6    | F    |
--------------------------

> SELECT * FROM TABLE(
   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
> SELECT * FROM TABLE(
   TUMBLE(
     DATA => TABLE Bid,
     TIMECOL => DESCRIPTOR(bidtime),
     SIZE => INTERVAL '10' MINUTES));

------------------------------------------------------
| bidtime | price | item | window_start | window_end |
------------------------------------------------------
| 8:07    | $2    | A    | 8:00         | 8:10       |
| 8:11    | $3    | B    | 8:10         | 8:20       |
| 8:05    | $4    | C    | 8:00         | 8:10       |
| 8:09    | $5    | D    | 8:00         | 8:10       |
| 8:13    | $1    | E    | 8:10         | 8:20       |
| 8:17    | $6    | F    | 8:10         | 8:20       |
------------------------------------------------------
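
The optional offset parameter shifts the window alignment. A sketch of its use, following the positional signature above (the semantics of offset are as described in the parameter list):

-- tumbling windows aligned to 8:01, 8:11, 8:21, ... instead of 8:00, 8:10, 8:20, ...
> SELECT * FROM TABLE(
   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTE));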

> SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

-------------------------------------
| window_start | window_end | price |
-------------------------------------
| 8:00         | 8:10       | $11   |
| 8:10         | 8:20       | $10   |
-------------------------------------

It is very easy to support cascading window aggregates by using the windowing TVFs. The outer window aggregate just needs to GROUP BY the window_start and window_end columns of the previous window aggregate. Therefore, we don't need the time attribute auxiliary functions (e.g. TUMBLE_ROWTIME, TUMBLE_PROCTIME). For example:

SELECT
  window_start,
  window_end,
  sum(cnt)
FROM (
  SELECT
     key,
     window_start,
     window_end,
     count(*) as cnt
  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
  GROUP BY key, window_start, window_end
) GROUP BY window_start, window_end

Hopping Windows

The table-valued function HOP assigns windows that cover rows within the interval of size, shifting every slide, based on a timestamp column. The return value of HOP is a relation that includes all columns of data as well as 3 additional columns named window_start, window_end, and window_time to indicate the assigned window. The original row time attribute "timecol" becomes a regular timestamp column after the window TVF. Assigned windows can overlap, which is why hopping windows are sometimes called "sliding windows".

HOP  takes four required parameters and one optional parameter.

HOP(data, DESCRIPTOR(timecol), slide, size [, offset ])


  • data is a table parameter that can be any relation with a time attribute column.
  • timecol is a column descriptor indicating which time attribute column of data should be mapped to hopping windows.
  • slide is a duration specifying the duration between the starts of sequential hopping windows.
  • size is a duration specifying the width of the hopping windows.
  • offset (optional) specifies that the hopping should begin from an instant other than the standard beginning of the epoch.


Here is an example invocation on the Bid table from the paper:

> SELECT * FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
> SELECT * FROM TABLE(
    HOP(
      DATA => TABLE Bid,
      TIMECOL => DESCRIPTOR(bidtime),
      SLIDE => INTERVAL '5' MINUTES,
      SIZE => INTERVAL '10' MINUTES));

------------------------------------------------------
| bidtime | price | item | window_start | window_end |
------------------------------------------------------
| 8:07    | $2    | A    | 8:00         | 8:10       |
| 8:07    | $2    | A    | 8:05         | 8:15       |
| 8:11    | $3    | B    | 8:05         | 8:15       |
| 8:11    | $3    | B    | 8:10         | 8:20       |
| 8:05    | $4    | C    | 8:00         | 8:10       |
| 8:05    | $4    | C    | 8:05         | 8:15       |
| 8:09    | $5    | D    | 8:00         | 8:10       |
| 8:09    | $5    | D    | 8:05         | 8:15       |
| 8:13    | $1    | E    | 8:05         | 8:15       |
| 8:13    | $1    | E    | 8:10         | 8:20       |
| 8:17    | $6    | F    | 8:10         | 8:20       |
| 8:17    | $6    | F    | 8:15         | 8:25       |
------------------------------------------------------

> SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

--------------------------------------
|  window_start | window_end | price |
--------------------------------------
| 8:00          | 8:10       | $11   |
| 8:05          | 8:15       | $15   |
| 8:10          | 8:20       | $10   |
| 8:15          | 8:25       | $6    |
--------------------------------------

Cumulating Windows

The table-valued function CUMULATE assigns windows that cover rows within an initial interval of the step size and expand by one more step (keeping the window start fixed) until reaching the max window size. The return value of CUMULATE is a relation that includes all columns of data as well as 3 additional columns named window_start, window_end, and window_time to indicate the assigned window. The original row time attribute "timecol" becomes a regular timestamp column after the window TVF. Assigned windows can overlap.

CUMULATE  takes four required parameters and one optional parameter.

CUMULATE(data, DESCRIPTOR(timecol), step, size [, offset ])
  • data is a table parameter that can be any relation with a time attribute column.
  • timecol is a column descriptor indicating which time attribute column of data should be mapped to cumulating windows.
  • step is a duration specifying the increase of window size between the ends of sequential cumulating windows.
  • size is a duration specifying the max width of the cumulating windows. size must be an integral multiple of step.
  • offset (optional) specifies that the cumulating should begin from an instant other than the standard beginning of the epoch.


So a cumulating window with a 1-hour step and a 1-day max size will produce the windows [00:00, 01:00), [00:00, 02:00), [00:00, 03:00), ..., [00:00, 24:00) every day.
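
Expressed with the new TVF, the daily cumulative UV statistic from the motivation section could look like the following sketch (the user_behavior table and its user_id/ts columns are hypothetical):

SELECT window_start, window_end, COUNT(DISTINCT user_id) AS uv
FROM TABLE(
    CUMULATE(TABLE user_behavior, DESCRIPTOR(ts), INTERVAL '1' HOUR, INTERVAL '1' DAY))
GROUP BY window_start, window_end;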


Here is an example invocation on the Bid table:

> SELECT * FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
> SELECT * FROM TABLE(
    CUMULATE(
      DATA => TABLE Bid,
      TIMECOL => DESCRIPTOR(bidtime),
      STEP => INTERVAL '2' MINUTES,
      SIZE => INTERVAL '10' MINUTES));

------------------------------------------------------
| bidtime | price | item | window_start | window_end |
------------------------------------------------------
| 8:07    | $2    | A    | 8:00         | 8:08       |
| 8:07    | $2    | A    | 8:00         | 8:10       |
| 8:11    | $3    | B    | 8:10         | 8:12       |
| 8:11    | $3    | B    | 8:10         | 8:14       |
| 8:11    | $3    | B    | 8:10         | 8:16       |
| 8:11    | $3    | B    | 8:10         | 8:18       |
| 8:11    | $3    | B    | 8:10         | 8:20       |
| 8:05    | $4    | C    | 8:00         | 8:06       |
| 8:05    | $4    | C    | 8:00         | 8:08       |
| 8:05    | $4    | C    | 8:00         | 8:10       |
| 8:09    | $5    | D    | 8:00         | 8:10       |
| 8:13    | $1    | E    | 8:10         | 8:14       |
| 8:13    | $1    | E    | 8:10         | 8:16       |
| 8:13    | $1    | E    | 8:10         | 8:18       |
| 8:13    | $1    | E    | 8:10         | 8:20       |
| 8:17    | $6    | F    | 8:10         | 8:18       |
| 8:17    | $6    | F    | 8:10         | 8:20       |
------------------------------------------------------

> SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

--------------------------------------
|  window_start | window_end | price |
--------------------------------------
| 8:00          | 8:06       | $4    |
| 8:00          | 8:08       | $6    |
| 8:00          | 8:10       | $11   |
| 8:10          | 8:12       | $3    |
| 8:10          | 8:14       | $4    |
| 8:10          | 8:16       | $4    |
| 8:10          | 8:18       | $10   |
| 8:10          | 8:20       | $10   |
--------------------------------------

Session Windows

The table-valued function SESSION assigns windows that cover rows based on a datetime column. Within a session window, the distance between consecutive rows is less than the gap interval. Session windows are applied per key. The return value of SESSION is a relation that includes all columns of data as well as 3 additional columns named window_start, window_end, and window_time to indicate the assigned window. The original row time attribute "timecol" becomes a regular timestamp column after the window TVF.

SESSION takes three required parameters and an optional PARTITION BY clause.

SESSION(data [PARTITION BY (keycols, ...)], DESCRIPTOR(timecol), gap)
  • data is a table parameter that can be any relation with a time attribute column.
  • keycols is a column descriptor indicating which columns should be used to partition the data prior to sessionization.
  • timecol is a column descriptor indicating which time attribute column of data should be mapped to session windows.
  • gap is the maximum difference in timestamp for two events to be considered part of the same session.


Here is an example invocation on the Bid table:

> SELECT * FROM Bid;

------------------------------------
| bidtime | price | item | bidder  |
------------------------------------
| 8:07    | $2    | A    | takidau |
| 8:05    | $1    | A    | klk     |
| 8:09    | $10   | B    | takidau |
| 8:08    | $3    | A    | klk     |
| 8:17    | $20   | B    | klk     |
------------------------------------

> SELECT *
  FROM TABLE(
    SESSION(TABLE Bid PARTITION BY bidder, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
> SELECT *
  FROM TABLE(
    SESSION(
      DATA    => TABLE Bid PARTITION BY bidder,
      TIMECOL => DESCRIPTOR(bidtime),
      GAP     => INTERVAL '5' MINUTES));

----------------------------------------------------------------
| bidtime | price | item | bidder  | window_start | window_end |
----------------------------------------------------------------
| 8:07    | $2    | A    | takidau | 8:07         | 8:14       |
| 8:05    | $1    | A    | klk     | 8:05         | 8:13       |
| 8:09    | $10   | B    | takidau | 8:07         | 8:14       |
| 8:08    | $3    | A    | klk     | 8:05         | 8:13       |
| 8:17    | $20   | B    | klk     | 8:17         | 8:22       |
----------------------------------------------------------------

> SELECT bidder, window_start, window_end, SUM(price)
  FROM TABLE(
SESSION(TABLE Bid PARTITION BY bidder, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
  GROUP BY bidder, window_start, window_end;

-----------------------------------------------
| bidder  | window_start | window_end | price |
-----------------------------------------------
| takidau | 8:07         | 8:14       |  $12  |
| klk     | 8:05         | 8:13       |  $4   |
| klk     | 8:17         | 8:22       |  $20  |
-----------------------------------------------

Operations on windows

We would also like to support various operations based on the windowing TVFs. This makes the functionality more complete.

Window Aggregate

The window aggregate requires that the GROUP BY clause contain the start and end columns of the windowing TVF.

SELECT ...
FROM T -- relation applied windowing TVF
GROUP BY window_start, window_end, ...

In the future, we can also simplify the GROUP BY clause to include only the start or end column if the windowing TVF is TUMBLE or HOP.

Window Join

The window join requires that the join ON condition contain equality predicates on the window starts and window ends of the input tables. The semantics of the window join are the same as the DataStream window join.

SELECT ...
FROM L [LEFT|RIGHT|FULL OUTER] JOIN R -- L and R are relations applied windowing TVF
ON L.window_start = R.window_start AND L.window_end = R.window_end AND ...

In the future, we can also simplify the join ON clause to include only the window start equality if the windowing TVF is TUMBLE or HOP. Currently, the windowing TVFs of the left and right inputs must be the same. This can be extended in the future, for example, tumbling windows joining sliding windows with the same window size.
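
For example, the following sketch joins per-user counts from two windowed views (windowed_view1 and windowed_view2 are hypothetical views that apply the same TUMBLE TVF and aggregation to their inputs):

SELECT L.user_id, L.window_start, L.window_end, L.cnt, R.cnt
FROM windowed_view1 AS L JOIN windowed_view2 AS R
ON L.user_id = R.user_id
   AND L.window_start = R.window_start
   AND L.window_end = R.window_end;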

Window TopN

The window TopN requires that the PARTITION BY clause contain the start and end columns of the windowing TVF.

SELECT ...
FROM (
   SELECT *,
     ROW_NUMBER() OVER (PARTITION BY window_start, window_end, ...
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM T) -- relation applied windowing TVF
WHERE rownum <= N [AND conditions]

In the future, we can also simplify the PARTITION BY clause to include only the start or end column if the windowing TVF is TUMBLE or HOP.
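
For example, the following sketch instantiates the template above to select the top 3 bids by price within each 10-minute tumbling window of the Bid table used earlier:

SELECT *
FROM (
   SELECT *,
     ROW_NUMBER() OVER (PARTITION BY window_start, window_end
       ORDER BY price DESC) AS rownum
   FROM TABLE(
     TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
) WHERE rownum <= 3;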

Time Attribute Propagation

As mentioned before, all 4 kinds of window TVFs generate 3 additional columns: window_start, window_end, and window_time, which indicate the assigned window. The window_time column is the time attribute of the record after the window TVF; it always equals "window_end - 1". The original row time attribute "timecol" becomes a regular timestamp column after applying the window TVF. In order to propagate the time attribute through window-based operations: for window aggregates, users just need to add the window_time column to the GROUP BY clause and select it in the output. For window join and window TopN, users just need to select the window_time column in the output; they don't need to add it to the JOIN ON or PARTITION BY clause. The resulting window_time column is still a time attribute that can be used in subsequent time-based operations such as interval joins and sort or over window aggregations. For example:

-- window_time needs to be added to GROUP BY and the output window_time is still a time attribute after window aggregation
> SELECT window_start, window_end, window_time, SUM(price)
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end, window_time;

-- L.window_time and R.window_time are still time attributes after window join
> SELECT L.window_time, R.window_time, ...
  FROM windowed_view1 AS L JOIN windowed_view2 AS R
  ON L.user_id = R.user_id AND L.window_start = R.window_start AND L.window_end = R.window_end;

-- window_time is still a time attribute after window TopN
> SELECT window_time, ...
  FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY total_price DESC) as rownum
    FROM trd_itm_ord_mm
  ) WHERE rownum <= 100;


Example

For example, suppose we have an orders stream and we want to compute cumulative aggregates from 00:00 to the current 10-minute mark, including item sales, distinct buyers of each item, and distinct buyers of the item's seller. Then we want to get the Top 100 items with the highest sales from 00:00 to the current 10-minute mark.


This can be illustrated by the following job, which combines a cumulating window with window aggregate, window join, and window TopN.

CREATE TEMPORARY VIEW trd_itm_ord_mm AS
SELECT item_id, seller_id, window_start, window_end, sales, item_buyer_cnt, slr_buyer_cnt
FROM (
    SELECT item_id, seller_id, window_start, window_end, SUM(price) as sales, COUNT(DISTINCT user_id) as item_buyer_cnt
    FROM TABLE(
        CUMULATE(TABLE orders, DESCRIPTOR(pay_time), INTERVAL '10' MINUTES, INTERVAL '1' DAY))
    GROUP BY item_id, seller_id, window_start, window_end
) a
LEFT JOIN ( -- using window join to enrich the buyer count of the shop from 00:00 to the current minute
    SELECT seller_id, window_start, window_end, COUNT(DISTINCT user_id) as slr_buyer_cnt
    FROM TABLE(
        CUMULATE(TABLE orders, DESCRIPTOR(pay_time), INTERVAL '10' MINUTES, INTERVAL '1' DAY))
    GROUP BY seller_id, window_start, window_end
) b
ON a.seller_id = b.seller_id AND a.window_start = b.window_start AND a.window_end = b.window_end;


INSERT INTO item_top100_cumulate_10min
SELECT *
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY total_price DESC) as rownum
    FROM trd_itm_ord_mm
) WHERE rownum <= 100;

Implementation

I will not go deeply into the implementation and optimization details, but will sketch the implementation approach.

  1. We need to bump Calcite to v1.25+ first.
  2. We will not introduce any new logical nodes, as there are no new relational algebra operations.
  3. We will introduce new physical nodes for window join, window aggregate, and window rank, which work on the existing window_start and window_end columns.
  4. We will introduce a MetadataHandler to infer the window start and window end columns. This metadata will be used to convert logical join/aggregate/rank nodes into physical window join/aggregate/rank nodes.
  5. If the input node of a window join/aggregate/rank is the windowing TVF, we can merge the TVF into the window node, so that window assignment is co-located in the same operator. This gives us better performance by reusing window state and reducing record shuffling. For example, for cumulating windows, the state of window [00:00, 01:00) can be used as the base state for window [00:00, 02:00).
  6. We will reuse the existing implementation of WindowOperator (i.e. the window aggregate operator).
  7. We will support the mini-batch, local-global, and distinct-split optimizations on the window operators for better performance. Currently, they are only supported in non-window aggregates.

Performance Optimization

Performance optimization is a long-term plan. Window operations are based on windowed data, which is an infinite sequence of bounded data sets, and the SLA of such jobs is usually the window size. So we can utilize the duration of the window to buffer data, pre-aggregate it, reduce operations on state, and improve throughput. In the long term, window operations can work in a continuous batch mode: the source buffers input data until all data of a specific window has arrived (determined by the watermark), then the source emits the window's data to the downstream operators. All the downstream operators are batch operators, can be scheduled when each window's data is ready, and don't need the state mechanism. The downstream operators process the bounded window data and emit the window result in a batch fashion. That means we can utilize existing batch operator implementations and optimizations. This approach would be quite efficient in terms of throughput, because we don't need to access state. However, it is blocked by several runtime building blocks, such as continuous batch mode and the combination of streaming and batch operators.

Therefore, in the short term, we have to use the streaming operators and work in stream mode. The optimization idea is to buffer data in batches as much as possible. Similar to the current mini-batch mechanism for unbounded aggregates, buffered data is flushed only when (1) the watermark of the window end arrives, (2) the memory reserved for buffering is full, or (3) a checkpoint barrier arrives. We will reserve a pre-defined memory space for buffering data (similar to how batch operators allocate memory from the MemoryManager). The buffered data can be records or pre-aggregated accumulators, depending on the operator. Users can tune the size of the reserved memory via configuration. In this way, we can aggregate as much data as possible before accessing state, improving throughput. This will be more efficient than the current mini-batch on unbounded aggregates, because we can buffer more data. On the other hand, because we incrementally calculate input data instead of calculating the whole window's data in one shot, much less computation is needed when the window closes. Thus, the latency between window close and result emission will be shorter, and the cluster CPU load will be smoother and steadier.

For overlapping windows, e.g. HOP and CUMULATE, we can use shared pane state to reduce state size and operations. For example, for hopping windows with a 1-hour window size and a 10-minute slide, we can store state per 10-minute pane, which avoids storing lots of duplicate records for window joins. As another example, for cumulating windows with a 1-day max window size and a 1-hour step, the state of window [00:00, 01:00) can be used as the base state for window [00:00, 02:00), which greatly reduces state size and operations.


Compatibility, Deprecation, and Migration Plan

The existing grouped window functions, i.e. GROUP BY TUMBLE(...), are still supported, but will be deprecated. We can drop the old syntax at some point in the future, but this needs another public discussion. We should update the examples and docs to use only the new syntax. Support for the Table API is future work and should be in a separate FLIP.


Future Work

Simplify Polymorphic Table Functions syntax

As discussed in the mailing list, the current PTF syntax is verbose. It would be great if we could remove the TABLE() keyword, which would make the syntax easier for users to pick up. Neither Oracle nor SQL Server requires such a keyword. A simplified query would look like this:


SELECT *
FROM TUMBLE(inputTable, DESCRIPTOR(timecol), INTERVAL '10' MINUTE);


Support count window with the window TVF

Count windows have long been supported in the Table API, but not in SQL. With the new window TVF syntax, we can also introduce a new window function for count windows in the future. For example, the following TUMBLE_ROW assigns windows in intervals of 10 rows.

SELECT *
FROM TABLE(
   TUMBLE_ROW(
     data => TABLE inputTable,
     timecol => DESCRIPTOR(timecol),
     size => 10));


Explore more capabilities of Polymorphic Table Functions

It's very exciting to see the potential of PTFs. In the future, maybe we can support the following features in SQL in a standard PTF way.

  • advanced operations supported in Table API (FLIP-29), e.g. drop_columns, user-defined-table-aggregate
  • user defined join operator
  • a shortcut TopN function
  • re-assign watermarks?
  • re-partition data, a feature similar to Hive's DISTRIBUTE BY syntax
  • ...