Status
Current state: "Under Discussion"
Discussion thread: https://mail-archives.apache.org/mod_mbox/flink-dev/
JIRA: https://issues.apache.org/jira/browse/FLINK-XXXX
Released: 1.2
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The Table API is a declarative API to define queries on static and streaming tables. So far, only projection, selection, and union are supported operations on streaming tables. This FLIP proposes to add support for different types of aggregations on top of streaming tables. In particular, we seek to support:
Group-window aggregates, i.e., aggregates which are computed for a group of elements. A (time or row-count) window is required to split the infinite input stream into finite groups.
Row-window aggregates, i.e., aggregates which are computed for each row, based on a window (range) of preceding and succeeding rows.
Each type of aggregate shall be supported on keyed/grouped as well as non-keyed/non-grouped input, for streaming tables as well as batch tables.
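To make the difference between the two kinds of aggregates concrete, the following is a minimal plain-Scala sketch of a row-window aggregate over an in-memory sequence that is assumed to be already ordered (e.g., by time). It is not Table API code, and `rowWindowSum`, `preceding`, and `following` are hypothetical names chosen for illustration. Unlike a group-window aggregate, it produces one result per input row.

```scala
object RowWindowSketch {
  // Hypothetical helper: for each row i, sums the values of the rows
  // [i - preceding, i + following], clipped to the bounds of the input.
  // Exactly one output value is produced per input row.
  def rowWindowSum(values: Vector[Long], preceding: Int, following: Int): Vector[Long] =
    values.indices.map { i =>
      val from  = math.max(0, i - preceding)                  // first row in range
      val until = math.min(values.length, i + following + 1)  // exclusive upper bound
      values.slice(from, until).sum
    }.toVector
}
```

For example, `rowWindowSum(Vector(1L, 2L, 3L, 4L), 1, 1)` sums each row with its direct neighbours, whereas a group-window aggregate would collapse each window into a single row.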
Since time-windowed aggregates will be the first operations that require a notion of time, this FLIP also discusses how the Table API handles time characteristics, timestamps, and watermarks.
Public Interfaces
The feature will extend the Table API. We propose the following methods:
Group-Window Aggregates
Group-windows are evaluated once per group and defined using the window(w: Window) method. The Window parameter defines the type and parameters of the window to compute. The window() method can be applied on a Table (translated into a non-keyed DataStream.windowAll) or on a GroupedTable (translated into a keyed window).
val res = tab
  .groupBy('a) // leave out groupBy() to define non-keyed / global windows
  .window(w)   // w: Window, e.g., Tumble over 10.minutes on 'rowtime as 'w
  .select('a, 'b.sum, 'w.start, 'w.end)
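The intended result of this query, a per-key sum of 'b for each window together with the window bounds, can be modelled in plain Scala. The sketch below is hypothetical and not the proposed implementation: `Row`, `WindowResult`, and `groupWindowSum` are illustrative names, it assumes a tumbling event-time window of a fixed size in milliseconds with non-negative timestamps, and it processes a bounded collection rather than a stream. Passing `keyed = false` mimics leaving out groupBy(), i.e., one global window per time interval, similar to DataStream.windowAll.

```scala
object GroupWindowSketch {
  // Hypothetical input row: grouping key `a`, value `b`, event timestamp `ts` in ms.
  final case class Row(a: String, b: Long, ts: Long)

  // One output row per (key, window); `a` is None for non-keyed windows.
  final case class WindowResult(a: Option[String], sumB: Long, start: Long, end: Long)

  // Assumes ts >= 0 and tumbling windows of `size` ms aligned to timestamp 0.
  def groupWindowSum(rows: Seq[Row], size: Long, keyed: Boolean): Seq[WindowResult] =
    rows
      .groupBy { r =>
        val start = r.ts - (r.ts % size) // start of the window containing ts
        (if (keyed) Some(r.a) else None, start)
      }
      .map { case ((key, start), group) =>
        WindowResult(key, group.map(_.b).sum, start, start + size)
      }
      .toSeq
      .sortBy(w => (w.start, w.a.getOrElse(""))) // deterministic output order
}
```

With `keyed = true`, rows with different keys that fall into the same time interval yield separate results; with `keyed = false`, they are summed into a single result per interval.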
Window defines how rows are mapped to windows. Window is not a custom function that users can extend, but only an API object that is internally handled and translated into a corresponding DataStream operation. The initially supported window definitions are listed below. Further window definitions, e.g., to incorporate the features of the Trigger DSL (FLIP-9), might be added later.
Tumble (Tumbling Windows)
Tumbling Event-time window:
.window(Tumble over 10.minutes on 'rowtime as 'w)
Tumbling Processing-time window:
.window(Tumble over 10.minutes on 'systemtime as 'w)
Tumbling Row-count window:
.window(Tumble over 10.rows)
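| Method | Required? | Description |
| --- | --- | --- |
| over | Mandatory | Defines the length of the window, either by time or by row count. |
| on | Mandatory for time windows | For streaming tables, the time mode: 'rowtime (event time) or 'systemtime (processing time). |
|  | Only valid for event-time windows, optional | Defines when a window may be evaluated. |
| as | Only valid for time windows, optional | Assigns an alias for the window that the following select() clause can use to access window properties such as 'w.start and 'w.end. |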
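How a tumbling window assigns rows can be illustrated with two small helpers. This is a plain-Scala sketch under stated assumptions, not the proposed implementation, and both helper names are hypothetical: `timeWindowStart` assumes non-negative timestamps in milliseconds and windows aligned to timestamp 0 (as for Tumble over 10.minutes), and `rowCountWindows` assumes the rows of one group arrive in a fixed order (as for Tumble over 10.rows).

```scala
object TumbleSketch {
  // Start of the tumbling time window of length `sizeMs` that contains `ts`.
  // Assumes ts >= 0 and windows aligned to timestamp 0.
  def timeWindowStart(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs)

  // Splits an ordered sequence into consecutive windows of exactly `n` rows.
  // A trailing partial window is not emitted, since in a stream it could
  // still receive more rows.
  def rowCountWindows[A](rows: Seq[A], n: Int): Seq[Seq[A]] =
    rows.grouped(n).filter(_.size == n).toSeq
}
```

For example, with 10-minute windows (600000 ms), a row with timestamp 1234567 falls into the window starting at 1200000, and seven rows in a 3-row tumbling window yield two complete windows.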
Proposed Changes
Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
Test Plan
Describe in a few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.