Status
Current state: Completed, released
Discussion thread: https://mail-archives.apache.org/mod_mbox/flink-dev/201609.mbox/%3C4f43ebc4-7c5d-711a-5d9f-3e179bdcd33c%40apache.org%3E
JIRA:
Released: Flink 1.3
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
| Mandatory. | Defines the length of the window, either by time or row count. |
| Optional. Required for event-time windows. | For streaming tables, |
| Optional. | Assigns an alias for the window that the following |
Slide (Sliding Windows)
Sliding Event-time window:
.window(Slide over 10.minutes every 2.minutes on 'rowtime as 'w)
Sliding Processing-time window:
.window(Slide over 10.minutes every 2.minutes as 'w)
Sliding Row-count window:
.window(Slide over 10.rows every 2.rows)
| Mandatory. | Defines the length of the window, either by time or row count. |
| Mandatory. | Defines how frequently a new window is created, either by time or row count. The creation interval must be of the same type as the window length. |
| Optional. Required for event-time windows. | For streaming tables, on defines the time mode ( |
| Optional. | Assigns an alias for the window that the following |
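The interplay of the window length and the creation interval can be illustrated outside of Flink. The following is a minimal plain-Scala sketch, not the Table API implementation: it lists the sliding windows that a single timestamp falls into. The names `SlidingWindowSketch`, `TimeWindow`, and `slidingWindowsFor` are hypothetical, and the sketch assumes half-open windows `[start, end)` aligned to timestamp 0 and non-negative timestamps in milliseconds.

```scala
object SlidingWindowSketch {
  /** Half-open window [start, end), in milliseconds (hypothetical helper type). */
  final case class TimeWindow(start: Long, end: Long)

  /**
   * Returns every sliding window that contains `timestamp`, latest window first.
   * Assumption: windows are aligned to timestamp 0 and timestamps are non-negative.
   */
  def slidingWindowsFor(timestamp: Long, size: Long, slide: Long): List[TimeWindow] = {
    require(size > 0 && slide > 0, "size and slide must be positive")
    require(timestamp >= 0, "this sketch only handles non-negative timestamps")
    // Start of the latest window that begins at or before the timestamp.
    val lastStart = timestamp - (timestamp % slide)
    // Step backwards by `slide` while the window still covers the timestamp.
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(start => start > timestamp - size)
      .map(start => TimeWindow(start, start + size))
      .toList
  }
}
```

With a length of 10 minutes and a creation interval of 2 minutes, each row contributes to five overlapping windows, which is why the result of a sliding group window contains several rows per input row.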
Session (Session Windows)
Session Event-time window:
.window(Session withGap 10.minutes on 'rowtime as 'w)
Session Processing-time window:
.window(Session withGap 10.minutes as 'w)
| Mandatory. | Defines the gap between two windows as time interval. |
| Optional. Required for event-time windows. | For streaming tables, on defines the time mode ( |
| Optional. | Assigns an alias for the table that the following |
Row-window Aggregates
Row-window aggregates are evaluated for each row. A row window defines, for each row, the range of preceding and succeeding rows over which an aggregation function is evaluated. If more than one aggregation function is evaluated for a result row, each aggregation function can be evaluated over a separate row window. Row windows are similar to standard SQL windows (OVER clause).
Row-windows are defined using the rowWindow(rw: RowWindow*) method. The rw parameter defines one or more row windows. Each row window must have an alias assigned. Aggregates in the select() method must refer to a RowWindow by providing an alias in the over() clause. The rowWindow() method can be applied to a Table, yielding a non-keyed row-window aggregation, or to a GroupedTable, resulting in a keyed row-window aggregation.
val res = tab
  .groupBy('a) // optional; if absent, non-keyed, non-parallel windows are computed
  .rowWindow(RowWindow as 'x, RowWindow as 'y)
  .select('a, 'b.count over 'x as 'xcnt, 'c.count over 'y as 'ycnt, 'x.start, 'x.end)
Event-time row-windows process rows with increasing timestamps (i.e., rows are sorted on timestamp) because the result of aggregations depends on the order of elements in the window. Hence, only ordered processing will guarantee consistent results.
Similar to the group window definition Window, RowWindow is not an interface that users can implement, but rather a collection of built-in API objects that are internally translated into efficient DataStream operators. There are many ways to define a RowWindow, e.g., by time semantics (processing-time, event-time, row-count), window length (fixed length, variable length), window start (aligned by time, on element arrival), window end (aligned by time, time-out), etc.
The first version of row-window aggregates will not support multiple RowWindow definitions.
The Table API will initially provide a few RowWindow definitions and extend the set based on user requests. The following RowWindow definitions will be provided initially.
TumbleRows
A TumbleRows row-window can be defined over a time interval or a row count and has a fixed length. TumbleRows time windows start at fixed time offsets; row-count windows start with the first element that arrives. Hence, they are very similar to tumbling group-windows with respect to their start and length. What differs is the way the aggregation function is computed: aggregates are incrementally computed for each row that arrives and include all rows that precede the current row in the TumbleRows window.
Tumbling Event-time row-window:
.rowWindow(TumbleRows over 10.minutes on 'rowtime as 'w)
Tumbling Processing-time row-window:
.rowWindow(TumbleRows over 10.minutes as 'w)
Tumbling Row-count row-window:
.rowWindow(TumbleRows over 10.rows as 'w)
| Mandatory. | Defines the length of the window. Can be time interval or row-count. Time windows start at fixed offsets (e.g., every hour), row-count windows start with the first arriving element. |
| Optional. Required for event-time windows. | For streaming tables, on defines the time mode ( |
| Mandatory | Assigns an alias for the table that the following |
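The difference from a tumbling group-window is that a result is produced for every row, not once per window. The following plain-Scala sketch (not Flink code) illustrates this for a running count over time windows. The names `TumbleRowsSketch` and `runningCounts` are hypothetical. The sketch assumes that timestamps are non-negative and already sorted, that windows are aligned to timestamp 0, and that the current row is counted together with the rows preceding it.

```scala
object TumbleRowsSketch {
  /**
   * For each timestamp (sorted ascending), emits the running count of rows
   * seen so far in the tumbling window that the timestamp belongs to.
   * Assumption: the current row is included in its own aggregate.
   */
  def runningCounts(sortedTimestamps: Seq[Long], size: Long): Seq[(Long, Long)] = {
    require(size > 0, "window size must be positive")
    // Accumulator: (start of current window, count in current window, emitted results).
    val init = (Long.MinValue, 0L, Vector.empty[(Long, Long)])
    val (_, _, out) = sortedTimestamps.foldLeft(init) {
      case ((currentStart, count, acc), ts) =>
        val windowStart = ts - (ts % size) // windows start at fixed offsets
        // The count restarts whenever a row falls into a new window.
        val newCount = if (windowStart == currentStart) count + 1 else 1L
        (windowStart, newCount, acc :+ (ts -> newCount))
    }
    out
  }
}
```

For timestamps 1, 4, 9, 10, 12, 25 and a window size of 10, the counts are 1, 2, 3 for the first window, then restart at 1, 2 for the second window and 1 for the third.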
SlideRows
A SlideRows window is defined over two intervals, a preceding and a following interval. Both intervals can be defined as a time interval or a row count, but both must be of the same type. Aggregates are computed for a row (the current row) over the valid range of records before and after the current row.
Slide Event-time row-window:
.rowWindow(SlideRows preceding 5.minutes following 5.minutes on 'rowtime as 'w)
Slide Processing-time row-window:
.rowWindow(SlideRows preceding 5.minutes following 5.minutes as 'w)
Slide Row-count row-window:
.rowWindow(SlideRows preceding 5.rows following 5.rows as 'w)
Slide unbounded-preceding row-window:
.rowWindow(SlideRows.unboundedPreceding as 'w)
.rowWindow(SlideRows.unboundedPreceding following 5.rows as 'w)
| Optional. If not set | Defines the interval of valid rows preceding the current row. Can be time interval or row-count. |
| Optional. If not set | Defines the interval of valid rows following the current row. Can be time interval or row count. |
| Optional. Required for event-time windows. | For streaming tables, on defines the time mode ( |
| Mandatory | Assigns an alias for the table that the following |
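The row-count variant of this definition can be sketched in plain Scala. This is an illustration of the intended semantics under stated assumptions, not the Table API implementation. The names `SlideRowsSketch` and `slidingSums` are hypothetical, the aggregate is a sum, and the current row is assumed to be part of its own window.

```scala
object SlideRowsSketch {
  /**
   * For each row i, sums the values in the range
   * [i - preceding, i + following], clipped to the bounds of the input.
   * Assumption: the current row is included in its own window.
   */
  def slidingSums(values: IndexedSeq[Long], preceding: Int, following: Int): IndexedSeq[Long] = {
    require(preceding >= 0 && following >= 0, "intervals must be non-negative")
    require(following < Int.MaxValue, "avoid overflow when computing the upper bound")
    values.indices.map { i =>
      val from = math.max(0, i - preceding)                   // first valid preceding row
      val until = math.min(values.length, i + following + 1)  // exclusive upper bound
      values.slice(from, until).sum
    }
  }
}
```

For the values 1 to 5 with one preceding and one following row, the result is 3, 6, 9, 12, 9. Choosing `preceding` at least as large as the input length and `following = 0` yields a running sum, which corresponds to the unbounded-preceding case.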
SessionRows
A SessionRows window is started with the first element that arrives (per key or global) and ends when no element (for a key or global) arrives for a certain amount of time. Aggregation functions are incrementally computed for each row that arrives and include all rows that precede the current row in the SessionRows window. After the window has been closed, the current aggregation state is discarded.
Session Event-time row-window:
.rowWindow(SessionRows withGap 10.minutes on 'rowtime as 'w)
Session Processing-time row-window:
.rowWindow(SessionRows withGap 10.minutes as 'w)
| Mandatory. | Defines the timeout after which the window is closed and its state discarded. |
| Optional. Required for event-time windows. | For streaming tables, on defines the time mode ( |
| Mandatory. | Assigns an alias for the table that the following |
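The following plain-Scala sketch (not Flink code) shows how a running count would evolve and reset under these semantics. The names `SessionRowsSketch` and `runningCounts` are hypothetical. The sketch assumes sorted timestamps for a single key, counts the current row together with the rows preceding it, and treats a distance strictly greater than the gap as the end of a session, which is an assumption about the boundary case and not something the proposal specifies.

```scala
object SessionRowsSketch {
  /**
   * For each timestamp (sorted ascending), emits the running count of rows
   * in the current session. A new session starts when the distance to the
   * previous row exceeds `gap` (assumption: strictly greater than).
   */
  def runningCounts(sortedTimestamps: Seq[Long], gap: Long): Seq[(Long, Long)] = {
    require(gap > 0, "gap must be positive")
    // Accumulator: (previous timestamp, count in current session, emitted results).
    val init = (Option.empty[Long], 0L, Vector.empty[(Long, Long)])
    val (_, _, out) = sortedTimestamps.foldLeft(init) {
      case ((previous, count, acc), ts) =>
        val sameSession = previous.exists(prev => ts - prev <= gap)
        // Closing a session discards its aggregation state, so the count restarts at 1.
        val newCount = if (sameSession) count + 1 else 1L
        (Some(ts), newCount, acc :+ (ts -> newCount))
    }
    out
  }
}
```

For timestamps 0, 5, 9, 30, 31 and a gap of 10, the first three rows share a session (counts 1, 2, 3), and the row at 30 opens a new one (counts 1, 2).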
Event-time Timestamps and Watermarks in the Table API
Event-time operators require that all events are annotated with a timestamp and that the stream provides watermarks. There are two ways to create a streaming table: 1) from an existing DataStream or 2) from a StreamTableSource. In the former case, we assume and require that timestamps and watermarks are already assigned to the stream and its records. In the latter case, the StreamTableSource is responsible for assigning timestamps and watermarks. Since a StreamTableSource is just an interface with a method that returns a DataStream, each implementation of this interface is responsible for assigning timestamps and watermarks in order to make the streaming table processable in event time. Hence, a StreamTableSource should provide a method that allows users to configure a custom timestamp and watermark assigner. At the moment, it is not possible to check whether a DataStream provides timestamps and watermarks. A Table API query that includes an event-time operator will fail at execution time if those are not provided (this is the same behavior as for DataStream programs that include event-time operators).
Open Question: Should we extend the StreamTableSource interface (or convert it into an abstract class) to provide methods to assign timestamps and watermarks? If yes, we have another API breaking change. This would also mean that a table source is not immutable anymore.
For streaming tables, we add a keyword “rowtime”, which is represented as a symbol in Scala, to distinguish the processing mode. This means that the keyword looks like a system-provided attribute; however, it is not part of the table’s schema and cannot be accessed elsewhere (e.g., in a select or groupBy clause). Therefore, streaming tables are not allowed to have a “rowtime” column. The benefit of representing these keywords as system columns is that queries with event-time operators can be executed on both streaming and batch tables if the batch tables feature a long column called rowtime.
...