Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Page properties


Discussion thread
Vote thread
JIRA

 

Status

Current state"Under Discussion"

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

...

Jira
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-4557

Release

...

1.

...

3


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

val res = tab
 .groupBy(‘a) // leave groupBy-Clause out to define non-keyed / global windows
 .window(w: Window)
 .select(‘a, ‘b.sum, ‘w.start, ‘w.end)

...


The Window defines how rows are mapped to windows. Window is not a custom function that users can extend, but only an API object that is internally handled and translated into a corresponding DataStream operation. The initially supported window definitions are listed below. More window definitions, e.g., in order to incorporate the features of the Trigger DSL (FLIP-9), might be added later.

...

  • Tumbling Event-time window:
      .window(Tumble over 10.minutes on ‘rowtime as ‘w)

  • Tumbling Processing-time window:
      .window(Tumble over 10.minutes on ‘systemtime as ‘w)

  • Tumbling Row-count window: 
     .window(Tumble over 10.rows)

...


over

Mandatory.

Defines the length the window, either by time or row count.

on

Mandatory

Optional. Required for event-time windows.

For streaming tables, on defines the time mode (‘rowtime

and ‘systemtime are

is a logical system

attributes

attribute), for batch tables, the time attribute over which the query is

evaluated.

allowLateness

Only valid for event time windows. Optional.

Defines when a window may be

evaluated.

as

Only valid for time windows.

Optional.

Assigns an alias for the window that the following select() clause can refer to in order to access window properties such as window start or end time.

Slide (Sliding Windows)

  • Sliding Event-time window:
      .window(Slide over 10.minutes every 2.minutes on ‘rowtime as ‘w)

  • Sliding Processing-time window:
      .window(Slide over 10.minutes every 2.minutes on ‘systemtime as ‘w)

  • Sliding Row-count window:
     .window(Slide over 10.rows every 2.rows)

 


over

Mandatory.

Defines the length the window, either by time or row count.

every

Mandatory.

Defines how frequent a new window is created, either by time or row count. The creation interval must be of the same type as the window length.

on

Mandatory

Optional. Required for event-time windows.

For streaming tables, on defines the time mode (‘rowtime

and ‘systemtime are

is a logical system

attributes

attribute), for batch tables, the time attribute over which the query is evaluated.

allowLateness

Only valid for event time windows. Optional.

Defines when a window may be evaluated.

as

Only valid for time windows. Optional.

Assigns an

as

Optional.

Assigns an alias for the window that the following select() clause can refer to in order to access window properties such as window start or end time.

Session (Session Windows)

  • Session Event-time window:
      .window(Session withGap 10.minutes on ‘rowtime as ‘w)

  • Session Processing-time window:
     .window(Session withGap 10.minutes on ‘systemtime as ‘w)

 


withGap

Mandatory.

Defines the gap between two windows as time interval.

on

Mandatory

Optional. Required for event-time windows.

For streaming tables, on defines the time mode (‘rowtime

and ‘systemtime are

is a logical system

attributes

attribute), for batch tables, the time attribute over which the query is evaluated.

allowLateness

Only valid for event time windows. Optional.

Defines when a window may be evaluated.

as

Optional.

Assigns an alias for the table that the following select() clause can refer to in order to access window properties such as window start or end time.

Row-window Aggregates

Row-window aggregates are evaluated for each row and a row window defines for each row the range of preceding and succeeding rows over which an aggregation function is evaluated. If more than one aggregation function is evaluated for a result row, each aggregation function can be evaluated over a separate row window. Row windows are similar to standard SQL windows (OVER clause).

Row-windows are defined using the using the rowWindow(rw: RowWindow*) method. The rw parameter defines one or more row windows. If more than one Each row window is defined, row windows must have an alias assigned. Aggregates in the select() method can must refer to a RowWindow by providing an alias in the over() clause. The rowWindow() method can be applied to a Table yielding a non-keyed row-window aggregation or to a GroupedTable resulting in a keyed row-window aggregation.

val res = tab
  .groupBy(‘a) // optional, in case of absence non-keyed, non-parallel windows are computed
  .rowWindow(RowWindow as ‘x, RowWindow as ‘y)
  .select(‘a, ‘b.count over ‘x as ‘xcnt, ‘c.count over ‘y as ‘ycnt, ‘x.start, ‘x.end) // no over required if only one RowWindow is defined

Event-time row-windows process rows with increasing timestamps (i.e., rows are sorted on timestamp) because the result of aggregations depends on the order of elements in the window. Hence, only ordered processing will guarantee consistent results.

Similar to the group window definition Window, RowWindow is not an interface that users can implement, Similar to the group window definition Window, RowWindow is not an interface that users can implement, but rather a collection of built-in API objects that are internally translated into efficient DataStream operators. There are many different ways how RowWindows can be defined, e.g., time semantics (processing-time, event-time, row-count), window-length (fixed length, variable length), window-start (aligned by time, on element arrival), window-end (aligned by time, time-out), etc.

The first version of row-window aggregates will not support multiple RowWindow definitions.

Do we want to process event-time row-windows with increasing timestamps? Since the result of aggregations will depend on the order of element in the window, only ordered processing will guarantee consistent results. The alternative to a mandatory logical sort is to delegate the responsibility for sorted input to the user who can add an orderBy() between groupBy() and rowWindow()

The Table API will initially provide a few RowWindow definitions and extend the set on user requests. The following RowWindow definitions will be initially provided.

Open question:

and extend the set on user requests. The following RowWindow definitions will be initially provided.

TumbleRows

A TumbleRows row-window can be defined over a time interval or a row-count and have a fixed length. TumbleRows time windows start at fixed time offsets, row count windows start with the first element that arrives. Hence they are very similar to tumbling group-windows with respect to their start and length. What differs is the way that the aggregation function is computed. Aggregates are incrementally computed for each row that arrives and include all rows that precede the current row in the TumbleRows window.

  • Tumbling Event-time row-window:
      .rowWindow(TumbleRows over 10.minutes on ‘rowtime as ‘w)

  • Tumbling Processing-time row-window:
      .rowWindow(TumbleRows over 10.minutes on ‘systemtime as ‘w)

  • Tumbling Row-count row-window:
     .rowWindow(TumbleRows over 10.rows)

     

 


over

Mandatory.

Defines the length of the window. Can be time interval or row-count. Time windows start at fixed offsets (e.g., every hour), row-count windows start with the first arriving element.

on

Mandatory

Optional. Required for event-time windows.

For streaming tables, on defines the time mode (‘rowtime

and ‘systemtime are

is a logical system

attributes

attribute), for batch tables, the time attribute over which

the query is evaluated.

allowLateness

Only valid for event time windows. Optional.

Defines when a window may be actually closed and its state be discarded

the query is evaluated.

as

Only valid for time windows. Optional.

Mandatory

Assigns an alias for the table that the following select() clause can refer to in order to access window properties such as window start or end time.

 


SlideRows

A SlideRows window is defined over two intervals, a preceding and a succeeding interval. Both intervals can be defined as a time interval or a row-count (both intervals must be of same type). Aggregates are computed for a row (current row) over the valid range of records before and after the current row.

  • Slide Event-time row-window:
      .rowWindow(SlideRows preceding 5.minutes following 5.minutes on ‘rowtime as ‘w)

  • Slide Processing-time row-window:
      .rowWindow(SlideRows preceding 5.minutes following 5.minutes on ‘systemtime as ‘w)

  • Slide Row-count row-window:
      .rowWindow(SlideRows preceding 5.rows following 5.rows)

  • Slide unbounded-preceding row-window:
     .rowWindow(SlideRows.unboundedPreceding)
     .rowWindow(SlideRows.unboundedPreceding following 5.rows)

 


preceding,

unboundedPreceding

Optional. If not set following() must be set.

Defines the interval of valid rows preceding the current row. Can be time interval or row-count. unboundedPreceding() defines an unbounded interval from the start of the stream, i.e., aggregates are incrementally computed for each row from the “beginning” of the stream.

following

Optional. If not set preceding() must be set.

Defines the interval of valid rows following the current row. Can be time interval or row count.

on

Mandatory

Optional. Required for event-time windows.

For streaming tables, on defines the time mode (‘rowtime

and ‘systemtime are

is a logical system

attributes

attribute), for batch tables, the time attribute over which the query is

evaluated.

allowLateness

Only valid for event time windows. Optional.

Defines when a window may be actually closed and its state be discarded

evaluated.

as

Only valid for time windows. Optional.

Mandatory

Assigns an alias for the table that the following select() clause can refer to in order to access window properties such as window start or end time.

 


SessionRows 

A SessionRows window is started with the first element that arrives (per key or global) and ends when no element (for a key or global) arrives for a certain amount of time. Aggregation functions are incrementally computed for each row that arrives and include all rows that precede the current row in the SessionRows window. After the window has been closed, the current aggregation state is discarded. 

  • Session Event-time row-window:
      .rowWindow(SessionRows withGap 10.minutes on ‘rowtime as ‘w)

  • Session Processing-time row-window:
      .rowWindow(SessionRows withGap 10.minutes on ‘systemtime as ‘w)

 


withGap

Mandatory.

Defines the time out after which the window is closed and its state discarded.

on

Mandatory

Optional. Required for event-time windows.

For streaming tables, on defines the time mode (‘rowtime

and ‘systemtime are

is a logical system attributes), for batch tables, the time attribute over which the query is evaluated.

allowLateness

Only valid for event time windows. Optional.

Defines when a window may be actually closed and its state be discarded.

as

Optional

Mandatory.

Assigns an alias for the table that the following select() clause can refer to in order to access window properties such as window start or end time.

 


Event-time Timestamps and Watermarks in the Table API

 

Event-time operators require that all events are annotated with a timestamp and that the stream provides watermarks. There are two ways to create a streaming table 1) from an existing DataStream or from a StreamTableSource. In the former case we assume / require that timestamps and watermarks are already assigned to the stream and its records. For the latter case, the StreamTableSource is responsible to assign timestamps and watermarks. Since a StreamTableSource is just an interface with a method that returns a DataStream, each implementation of this interface is responsible to assign timestamps and watermarks in order to make the streaming table processable in event time. Hence, a StreamTableSource should provide a method to allow users to configure a custom timestamp and watermark assigner. At the moment, it is not possible to check if a DataStream provides timestamps and watermarks. A Table API query that includes an event-time operator will fail at runtime execution time if those are not provided (this is the same behavior as for DataStream programs that include event-time operators).

Open Question: Should we extend the StreamTableSource interface (or convert it into an abstract class) to provide methods to assign timestamps and watermarks? If yes, we have another API breaking change. This would also mean that a table source is not immutable anymore.

For streaming tables, we add two keywords a keyword rowtime” and “systemtime” which are is represented as symbols a symbol in Scala to distinguish the processing mode. This means that these keywords the keyword look like a system-provided attributes attribute however they are it is not part of table’s schema and cannot be accessed elsewhere (e.g., in a select or groupBy clause). Therefore, streaming tables are not allowed to have a “rowtime” or “systemtime” column. The benefit of representing these keywords as system columns is that queries with event time operator can be executed on streaming and batch tables if the batch tables feature a long column called rowtime.

...

The Table API Validator must check for correctness of given window parameters. Parameters defining the size of a window must be either of type interval of milliseconds for time intervals or of type long for defining row-counts. For streaming tables, the optional window column must be one of the constants constant rowtime” or “systemtime”. For batch tables, it must be a column which is convertible to a long (e.g. timestamp is also a valid long). The alias of window must be checked for uniqueness. The allowed lateness must be of type interval of milliseconds.

Validation of grouping keys, extraction, and validation of aggregates should happen similarly to the current non-windowed validation. The validator must check for a preceding window/rowWindow clause before applying aggregations on streaming tables. Aggregations without a (row) window are not valid on streams (e.g. “select(‘f.count)” results in an exception when using streaming tables).

...

This FLIP proposes new functionality and operators for the Table API. The behavior of existing operators is not modified. However, it introduces the “rowtimeand “systemtime” keywords keyword which can no longer be used as a column names name in streaming tables. Existing streaming Table API programs that include tables with such columns will fail at compile time. The number of users affected by this change should be very low (if any) since the restricted feature set of the streaming Table API does not encourage its use.

...

  1. Implementation of group-windows on streaming tables. This includes implementing the API of group-windows, the logical validation for group-windows, and the definition of the “rowtimeand “systemtime” keywordskeyword. Group-windows on batch tables won’t be initially supported and will throw an exception.

  2. Implementation of tumbling and sliding group-windows on batch tables.

  3. Implementation of session group-windows on batch tables.

  4. Implementation of SessionRow row-windows for streaming tables. This includes the definition of row-windows and the SessionRow row window type. SessionRow row-windows on batch tables won’t be initially supported and will throw an exception.

  5. Implementation of TumbleRow row-windows for streaming tables. This includes the definition of the TumbleRow row window type. TumbleRow row-windows on batch tables won’t be initially supported and will throw an exception.

  6. Implementation of SlidingRow row-windows for streaming tables. This includes the definition of the SlidingRow window type and its validation. SlidingRow windows on batch tables won’t be initially supported and will cause an exception.

  7. Implementation of SessionRow row-windows for batch tables.

  8. Implementation of TumbleRow row-windows for batch tables.

  9. Implementation of SlideRow row-windows for batch tables.