Status

Current state: "Under Discussion"

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: 1.2

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Table API is a declarative API to define queries on static and streaming tables. So far, only projection, selection, and union are supported operations on streaming tables. This FLIP proposes to add support for different types of aggregations on top of streaming tables. In particular, we seek to support:

  • Group-window aggregates, i.e., aggregates which are computed for a group of elements. A (time or row-count) window is required to bound the infinite input stream into a finite group.

  • Row-window aggregates, i.e., aggregates which are computed for each row, based on a window (range) of preceding and succeeding rows.

Each type of aggregate shall be supported on keyed/grouped or non-keyed/grouped data streams for streaming tables as well as batch tables.
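
The difference between the two aggregate types can be sketched with plain Scala collections. This is only an illustration under stated assumptions: it does not use the Table API, the object name WindowKinds is hypothetical, and the window sizes (a row-count window of 3, a range of one preceding and one succeeding row) are picked arbitrarily.

```scala
// Hypothetical illustration with plain Scala collections; not the Table API.
object WindowKinds {
  val values: Vector[Int] = Vector(1, 2, 3, 4, 5, 6)

  // Group-window: a row-count window of 3 bounds the input into finite groups,
  // and each group yields exactly one result row.
  val groupWindowSums: Vector[Int] = values.grouped(3).map(_.sum).toVector

  // Row-window: every input row yields one result row, computed over a range of
  // 1 preceding row, the current row, and 1 succeeding row (clipped at the ends).
  val rowWindowSums: Vector[Int] = values.indices.map { i =>
    values.slice(math.max(0, i - 1), math.min(values.length, i + 2)).sum
  }.toVector
}
```

The group-window variant reduces six input rows to two results, while the row-window variant keeps one result per input row.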

Since time-windowed aggregates will be the first operation that requires a definition of time, this FLIP also discusses how the Table API handles time characteristics, timestamps, and watermarks.


Public Interfaces

The feature will extend the Table API. We propose the following methods:

Group-Window Aggregates

Group-windows are evaluated once per group and are defined using the window(w: Window) method. The Window parameter defines the type and parameters of the window to compute. The window() method can be applied to a Table (yielding a non-keyed window, as with DataStream.windowAll) or to a GroupedTable (yielding a keyed window).

val res = tab
 .groupBy('a) // leave out the groupBy clause to define non-keyed / global windows
 .window(w: Window)
 .select('a, 'b.sum, 'w.start, 'w.end)
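
The intended result of such a query can be approximated in plain Scala. The following is a minimal sketch of the semantics only, not of the proposed implementation: Row, Result, and tumblingSum are hypothetical names, timestamps are assumed to be non-negative milliseconds, and the window end is treated as exclusive, which is an assumption this page does not state.

```scala
// Hypothetical sketch of keyed tumbling-window semantics; not Flink code.
object GroupWindowSketch {
  // Assumed row shape: key a, value b, and an event timestamp in milliseconds.
  final case class Row(a: String, b: Long, rowtime: Long)

  // One result per (key, window), mirroring a, b.sum, w.start, w.end.
  final case class Result(a: String, bSum: Long, start: Long, end: Long)

  def tumblingSum(rows: Seq[Row], sizeMs: Long): Seq[Result] =
    rows
      // A row belongs to the window whose start is its timestamp rounded down
      // to a multiple of the window size (assumes non-negative timestamps).
      .groupBy(r => (r.a, r.rowtime - r.rowtime % sizeMs))
      .map { case ((a, start), group) =>
        Result(a, group.map(_.b).sum, start, start + sizeMs)
      }
      .toSeq
      .sortBy(r => (r.a, r.start)) // deterministic order for inspection only
}
```

Dropping the key from the grouping tuple would correspond to the non-keyed case, where all rows of a window are aggregated together.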

 

The Window defines how rows are mapped to windows. Window is not a custom function that users can extend, but only an API object that is internally handled and translated into a corresponding DataStream operation. The initially supported window definitions are listed below. More window definitions, e.g., in order to incorporate the features of the Trigger DSL (FLIP-9), might be added later.

Tumble (Tumbling Windows)

 

  • Tumbling Event-time window:
      .window(Tumble over 10.minutes on 'rowtime as 'w)

  • Tumbling Processing-time window:
      .window(Tumble over 10.minutes on 'systemtime as 'w)

  • Tumbling Row-count window:
      .window(Tumble over 10.rows)

 

  • over (mandatory): Defines the length of the window, either by time or row count.

  • on (mandatory for time windows): For streaming tables, on defines the time mode ('rowtime and 'systemtime are logical system attributes). For batch tables, on defines the time attribute over which the query is evaluated.

  • allowLateness (optional, only valid for event-time windows): Defines when a window may be evaluated.

  • as (optional, only valid for time windows): Assigns an alias to the window that the following select() clause can refer to in order to access window properties such as the window start or end time.
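
One way to picture these clause rules is to encode them in types, so that an invalid combination does not compile. The sketch below is purely illustrative: every type and method in it is hypothetical and none of it is part of Flink or of this proposal. It uses plain method calls and strings instead of the infix notation and symbol literals shown above, it leaves out allowLateness, and it assumes that row-count windows accept no further clauses, matching the Tumble over 10.rows example.

```scala
// Hypothetical model of the clause rules; none of these types are Flink classes.
object TumbleSketch {
  final case class TimeLength(millis: Long)
  final case class RowLength(rows: Long)

  sealed trait WindowSpec
  final case class RowCountTumble(rows: Long) extends WindowSpec
  final case class TimeTumble(millis: Long, timeAttribute: String, alias: Option[String])
      extends WindowSpec {
    // `as` is optional and is only available on time windows.
    def as(name: String): TimeTumble = copy(alias = Some(name))
  }

  // Intermediate state: a time length was given but `on` is still missing,
  // so this value is not yet a usable WindowSpec.
  final class TimeTumbleWithoutOn(millis: Long) {
    def on(timeAttribute: String): TimeTumble = TimeTumble(millis, timeAttribute, None)
  }

  object Tumble {
    // `over` is mandatory: it is the only way to start a window definition.
    def over(length: TimeLength): TimeTumbleWithoutOn = new TimeTumbleWithoutOn(length.millis)
    def over(length: RowLength): RowCountTumble = RowCountTumble(length.rows)
  }
}
```

With this model, TumbleSketch.Tumble.over(TumbleSketch.TimeLength(600000L)).on("rowtime").as("w") yields a complete time window, whereas calling as on a row-count window is rejected by the compiler.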

 

Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
