Status

Current state: Released

Discussion thread: https://mail-archives.apache.org/mod_mbox/flink-dev/201609.mbox/%3C4f43ebc4-7c5d-711a-5d9f-3e179bdcd33c%40apache.org%3E

...

JIRA: FLINK-4557

Release

...

1.3


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

val res = tab
  .groupBy('a) // omit the groupBy clause to define non-keyed (global) windows
  .window(w: Window)
  .select('a, 'b.sum, 'w.start, 'w.end)

 


The Window defines how rows are mapped to windows. Window is not a custom function that users can extend, but only an API object that is internally handled and translated into a corresponding DataStream operation. The initially supported window definitions are listed below. More window definitions, e.g., in order to incorporate the features of the Trigger DSL (FLIP-9), might be added later.

...

  • Tumbling Event-time window:
      .window(Tumble over 10.minutes on 'rowtime as 'w)

  • Tumbling Processing-time window:
      .window(Tumble over 10.minutes as 'w)

  • Tumbling Row-count window:
      .window(Tumble over 10.rows)

...


over

Mandatory.

Defines the length of the window, either by time or row count.

on

Optional. Required for event-time windows.

For streaming tables, on defines the time mode ('rowtime is a logical system attribute). For batch tables, on defines the time attribute over which the query is evaluated.

as

Optional.

Assigns an alias for the window that the following select() clause can refer to in order to access window properties such as window start or end time.
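The way a tumbling time window maps rows to windows can be sketched in plain Scala, independent of the Table API. This is a minimal illustration, not the proposed implementation: the names TumbleSketch, windowStart, and sumPerWindow are hypothetical, timestamps are assumed to be non-negative milliseconds, and windows are assumed to be aligned at timestamp 0.

```scala
// Illustrative sketch only; none of these names are part of the Table API.
object TumbleSketch {
  // Start of the tumbling window of length `size` that contains `ts`.
  // Assumes ts >= 0 and windows aligned at timestamp 0.
  def windowStart(ts: Long, size: Long): Long = ts - (ts % size)

  // Rows are (a, b, rowtime). Sums b per (a, window start), which mirrors
  // grouping by 'a, applying a tumbling window, and selecting 'b.sum.
  def sumPerWindow(rows: Seq[(String, Long, Long)], size: Long): Map[(String, Long), Long] =
    rows
      .groupBy { case (a, _, ts) => (a, windowStart(ts, size)) }
      .map { case (key, group) => key -> group.map(_._2).sum }
}
```

Each row belongs to exactly one window, so every row contributes to exactly one result per key.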

Slide (Sliding Windows)

  • Sliding Event-time window:
      .window(Slide over 10.minutes every 2.minutes on 'rowtime as 'w)

  • Sliding Processing-time window:
      .window(Slide over 10.minutes every 2.minutes as 'w)

  • Sliding Row-count window:
      .window(Slide over 10.rows every 2.rows)

 


over

Mandatory.

Defines the length of the window, either by time or row count.

every

Mandatory.

Defines how frequently a new window is created, either by time or row count. The creation interval must be of the same type as the window length.

on

Optional. Required for event-time windows.

For streaming tables, on defines the time mode ('rowtime is a logical system attribute). For batch tables, on defines the time attribute over which the query is evaluated.

as

Optional.

Assigns an alias for the window that the following select() clause can refer to in order to access window properties such as window start or end time.
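Unlike a tumbling window, a sliding window assigns each row to several overlapping windows. The following plain Scala sketch lists the windows a single timestamp falls into. It is an illustration under stated assumptions, not the proposed implementation: SlideSketch and windowStarts are hypothetical names, timestamps are non-negative, windows are aligned at timestamp 0, and window starts before 0 are kept rather than clipped.

```scala
// Illustrative sketch only; none of these names are part of the Table API.
object SlideSketch {
  // Start times of all windows [start, start + size) that contain `ts`,
  // where a new window starts every `slide` time units.
  def windowStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    // Latest window start that is not after ts.
    val lastStart = ts - (ts % slide)
    // Walk backwards while the window still covers ts (start > ts - size).
    (lastStart until (ts - size)).by(-slide).toList
  }
}
```

When the window length is a multiple of the creation interval, each row is part of length / interval windows, for example five windows for `Slide over 10.minutes every 2.minutes`.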

Session (Session Windows)

  • Session Event-time window:
      .window(Session withGap 10.minutes on 'rowtime as 'w)

  • Session Processing-time window:
      .window(Session withGap 10.minutes as 'w)


 

withGap

Mandatory.

Defines the gap between two windows as a time interval.

on

Optional. Required for event-time windows.

For streaming tables, on defines the time mode ('rowtime is a logical system attribute). For batch tables, on defines the time attribute over which the query is evaluated.

as

Optional.

Assigns an alias for the window that the following select() clause can refer to in order to access window properties such as window start or end time.
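Session windows have no fixed length; their boundaries follow from the data. The plain Scala sketch below groups the timestamps of one key into sessions. It is a hypothetical illustration (SessionSketch and sessions are not API names), and the boundary handling is an assumption: two rows whose distance is exactly the gap are placed in the same session.

```scala
// Illustrative sketch only; none of these names are part of the Table API.
object SessionSketch {
  // Splits timestamps into sessions. A new session starts whenever the
  // distance to the previous timestamp is larger than `gap`.
  def sessions(timestamps: Seq[Long], gap: Long): List[List[Long]] =
    timestamps.sorted
      .foldLeft(List.empty[List[Long]]) {
        // The newest session is at the head; its newest timestamp is its head.
        case (current :: done, ts) if ts - current.head <= gap => (ts :: current) :: done
        case (acc, ts)                                         => List(ts) :: acc
      }
      .map(_.reverse)
      .reverse
}
```

With a gap of 10, the timestamps 1, 3, 20, 25 form two sessions: (1, 3) and (20, 25).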

Row-window Aggregates

Row-window aggregates are evaluated for each row and a row window defines for each row the range of preceding and succeeding rows over which an aggregation function is evaluated. If more than one aggregation function is evaluated for a result row, each aggregation function can be evaluated over a separate row window. Row windows are similar to standard SQL windows (OVER clause).

Row-windows are defined using the rowWindow(rw: RowWindow*) method. The rw parameter defines one or more row windows. Each row window must have an alias assigned. Aggregates in the select() method must refer to a RowWindow by providing an alias in the over() clause. The rowWindow() method can be applied to a Table, yielding a non-keyed row-window aggregation, or to a GroupedTable, resulting in a keyed row-window aggregation.

val res = tab
  .groupBy('a) // optional; if omitted, non-keyed, non-parallel windows are computed
  .rowWindow(RowWindow as 'x, RowWindow as 'y)
  .select('a, 'b.count over 'x as 'xcnt, 'c.count over 'y as 'ycnt, 'x.start, 'x.end)

Event-time row-windows process rows with increasing timestamps (i.e., rows are sorted on timestamp) because the result of aggregations depends on the order of elements in the window. Hence, only ordered processing will guarantee consistent results.

Similar to the group window definition Window, RowWindow is not an interface that users can implement, but rather a collection of built-in API objects that are internally translated into efficient DataStream operators. RowWindows can be defined along several dimensions, e.g., time semantics (processing-time, event-time, row-count), window-length (fixed length, variable length), window-start (aligned by time, on element arrival), window-end (aligned by time, time-out), etc.

The first version of row-window aggregates will not support multiple RowWindow definitions.

The Table API will initially provide a few RowWindow definitions and extend the set in response to user requests. The following RowWindow definitions will be provided initially.

TumbleRows

A TumbleRows row-window can be defined over a time interval or a row-count and has a fixed length. TumbleRows time windows start at fixed time offsets; row-count windows start with the first element that arrives. Hence, they are very similar to tumbling group-windows with respect to their start and length. What differs is the way the aggregation function is computed: aggregates are incrementally computed for each row that arrives and include all rows that precede the current row in the TumbleRows window.

  • Tumbling Event-time row-window:
      .rowWindow(TumbleRows over 10.minutes on 'rowtime as 'w)

  • Tumbling Processing-time row-window:
      .rowWindow(TumbleRows over 10.minutes as 'w)

  • Tumbling Row-count row-window:
      .rowWindow(TumbleRows over 10.rows as 'w)

     



 

over

Mandatory.

Defines the length of the window. Can be time interval or row-count. Time windows start at fixed offsets (e.g., every hour), row-count windows start with the first arriving element.

on

Optional. Required for event-time windows.

For streaming tables, on defines the time mode ('rowtime is a logical system attribute). For batch tables, on defines the time attribute over which the query is evaluated.

as

Mandatory.

Assigns an alias for the window that the following select() clause can refer to in order to access window properties such as window start or end time.
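The difference from a tumbling group-window is that a TumbleRows window emits one result per input row rather than one result per window. The plain Scala sketch below shows this for a running sum. It is a hypothetical illustration (TumbleRowsSketch and runningSums are not API names), windows are assumed to be aligned at timestamp 0, and including the current row in its own aggregate is an assumption of this sketch.

```scala
// Illustrative sketch only; none of these names are part of the Table API.
object TumbleRowsSketch {
  // Rows are (timestamp, value). Emits one (timestamp, runningSum) per row.
  // The running sum restarts whenever a row falls into a new tumbling window.
  def runningSums(rows: Seq[(Long, Long)], size: Long): List[(Long, Long)] =
    rows
      .sortBy(_._1) // row windows require processing in timestamp order
      .foldLeft((Long.MinValue, 0L, List.empty[(Long, Long)])) {
        case ((winStart, sum, out), (ts, value)) =>
          val start = ts - (ts % size)
          // Continue the aggregate inside the same window, otherwise reset it.
          val newSum = if (start == winStart) sum + value else value
          (start, newSum, (ts, newSum) :: out)
      }
      ._3
      .reverse
}
```

For a window length of 10, the rows (1, 1), (5, 2), (12, 3), (15, 4) yield the running sums 1, 3, 3, 7: the aggregate restarts with the row at timestamp 12.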

 


SlideRows

A SlideRows window is defined over two intervals: a preceding and a succeeding interval. Both intervals can be defined as a time interval or a row-count, but both must be of the same type. Aggregates are computed for a row (the current row) over the valid range of records before and after the current row.

  • Slide Event-time row-window:
      .rowWindow(SlideRows preceding 5.minutes following 5.minutes on 'rowtime as 'w)

  • Slide Processing-time row-window:
      .rowWindow(SlideRows preceding 5.minutes following 5.minutes as 'w)

  • Slide Row-count row-window:
      .rowWindow(SlideRows preceding 5.rows following 5.rows as 'w)

  • Slide unbounded-preceding row-window:
      .rowWindow(SlideRows.unboundedPreceding as 'w)
      .rowWindow(SlideRows.unboundedPreceding following 5.rows as 'w)


 

preceding, unboundedPreceding

Optional. If not set, following() must be set.

Defines the interval of valid rows preceding the current row. Can be time interval or row-count. unboundedPreceding() defines an unbounded interval from the start of the stream, i.e., aggregates are incrementally computed for each row from the “beginning” of the stream.

following

Optional. If not set, preceding() must be set.

Defines the interval of valid rows following the current row. Can be time interval or row count.

on

Optional. Required for event-time windows.

For streaming tables, on defines the time mode ('rowtime is a logical system attribute). For batch tables, on defines the time attribute over which the query is evaluated.

as

Mandatory.

Assigns an alias for the window that the following select() clause can refer to in order to access window properties such as window start or end time.
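For the row-count variant, the range of rows that contributes to each result can be sketched in plain Scala. This is a minimal illustration, not the proposed implementation: SlideRowsSketch and windowedSums are hypothetical names, the input is assumed to be already ordered, and the current row is assumed to be part of its own window.

```scala
// Illustrative sketch only; none of these names are part of the Table API.
object SlideRowsSketch {
  // For each row i, sums the values of up to `preceding` rows before it,
  // the row itself, and up to `following` rows after it.
  def windowedSums(values: IndexedSeq[Long], preceding: Int, following: Int): IndexedSeq[Long] =
    values.indices.map { i =>
      val from = math.max(0, i - preceding)                // clip at the first row
      val end = math.min(values.length, i + following + 1) // clip at the last row (exclusive)
      values.slice(from, end).sum
    }
}
```

With one preceding and one following row, the values 1, 2, 3, 4, 5 produce the sums 3, 6, 9, 12, 9. Windows at the edges of the input are simply shorter.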

 


SessionRows 

A SessionRows window is started with the first element that arrives (per key or global) and ends when no element (for a key or global) arrives for a certain amount of time. Aggregation functions are incrementally computed for each row that arrives and include all rows that precede the current row in the SessionRows window. After the window has been closed, the current aggregation state is discarded. 

  • Session Event-time row-window:
      .rowWindow(SessionRows withGap 10.minutes on 'rowtime as 'w)

  • Session Processing-time row-window:
      .rowWindow(SessionRows withGap 10.minutes as 'w)


 

withGap

Mandatory.

Defines the timeout after which the window is closed and its state discarded.

on

Optional. Required for event-time windows.

For streaming tables, on defines the time mode ('rowtime is a logical system attribute). For batch tables, on defines the time attribute over which the query is evaluated.

as

Mandatory.

Assigns an alias for the window that the following select() clause can refer to in order to access window properties such as window start or end time.

 


Event-time Timestamps and Watermarks in the Table API

 

Event-time operators require that all events are annotated with a timestamp and that the stream provides watermarks. There are two ways to create a streaming table: (1) from an existing DataStream or (2) from a StreamTableSource. In the former case, we assume and require that timestamps and watermarks are already assigned to the stream and its records. In the latter case, the StreamTableSource is responsible for assigning timestamps and watermarks. Since a StreamTableSource is just an interface with a method that returns a DataStream, each implementation of this interface must assign timestamps and watermarks itself in order to make the streaming table processable in event time. Hence, a StreamTableSource should provide a method that allows users to configure a custom timestamp and watermark assigner. At the moment, it is not possible to check whether a DataStream provides timestamps and watermarks. A Table API query that includes an event-time operator will fail at execution time if they are not provided (this is the same behavior as for DataStream programs that include event-time operators).

Open Question: Should we extend the StreamTableSource interface (or convert it into an abstract class) to provide methods to assign timestamps and watermarks? If yes, we have another API-breaking change. This would also mean that a table source is no longer immutable.

For streaming tables, we add a keyword “rowtime”, which is represented as a symbol in Scala, to distinguish the processing mode. The keyword looks like a system-provided attribute; however, it is not part of the table’s schema and cannot be accessed elsewhere (e.g., in a select or groupBy clause). Therefore, streaming tables are not allowed to have a “rowtime” column. The benefit of representing this keyword as a system column is that queries with event-time operators can be executed on both streaming and batch tables, provided that the batch table features a long column called rowtime.

...