Please note that this FLIP has been abandoned.




Motivation

Although the current Flink SQL Window Aggregation documentation[1] states that the legacy Group Window Aggregation syntax is deprecated, the new Window TVF Aggregation syntax does not yet cover all of the legacy syntax's features.

For the feature differences between Group Window Aggregation and Window TVF Aggregation, please refer to the Appendix. Compared to Group Window Aggregation, Window TVF Aggregation has several advantages, such as two-stage optimization and support for the standard GROUPING SETS syntax. However, it still lacks the following features:

  1. Support for SESSION Window TVF Aggregation
  2. Support for consuming CDC streams
  3. Support for HOP window size with non-integer step length
  4. Support for configurations such as early fire, late fire and allow lateness (which are internal experimental configurations in Group Window Aggregation and not public to users yet)
  5. Unification of the Window TVF Aggregation operators in the runtime at the implementation layer (in the long term, maintaining separate operators for Window TVF Aggregation and Group Window Aggregation is too expensive)

This FLIP aims to continue the unfinished work of FLIP-145[2]: fully enabling the capabilities of Window TVF Aggregation and officially deprecating the legacy Group Window Aggregation syntax, in preparation for removing the legacy syntax in Flink 2.0. Support for new features involved in FLIP-145, such as Window Join and Window TopN, will be followed up in separate tasks.

SQL Syntax

Support Session Window TVF

In the SQL API, we only need to add support for the Session Window TVF syntax. Although it is not yet implemented, FLIP-145[2] already defines the standard syntax for the Session Window TVF, which is as follows:

SESSION(data [PARTITION BY (keycols, ...)], DESCRIPTOR(timecol), gap)
  • data is a table parameter that can be any relation with a time attribute column

  • keycols is a column descriptor indicating which columns should be used to partition the data prior to sessionization

  • timecol is a column descriptor indicating which time attribute column of data should be mapped to session windows

  • gap is the maximum difference in timestamp for two events to be considered part of the same session

For more descriptions and examples of Session Window TVF syntax, please refer to FLIP-145.
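To make the parameters above concrete, the following is a minimal Python sketch of what sessionization computes. It is not Flink code: `session_tvf`, `_close`, the column names `k` and `t`, and the integer timestamps are hypothetical names chosen for illustration. It follows the definition of gap given above (rows whose timestamps differ by at most gap share a session) and assumes that a window ends gap after its last row.

```python
from collections import defaultdict


def session_tvf(rows, key_cols, time_col, gap):
    """Return every input row extended with window_start and window_end.

    Hypothetical helper for illustration only; it is not a Flink API.
    """
    # PARTITION BY: group rows by the values of the key columns.
    partitions = defaultdict(list)
    for row in rows:
        partitions[tuple(row[c] for c in key_cols)].append(row)

    result = []
    for part in partitions.values():
        part.sort(key=lambda r: r[time_col])
        session = []
        for row in part:
            # Rows whose timestamps differ by more than `gap` start a new session.
            if session and row[time_col] - session[-1][time_col] > gap:
                result.extend(_close(session, time_col, gap))
                session = []
            session.append(row)
        result.extend(_close(session, time_col, gap))
    return result


def _close(session, time_col, gap):
    """Attach the session bounds to each row of a finished session."""
    if not session:
        return []
    start = session[0][time_col]
    end = session[-1][time_col] + gap  # assumed: the window ends `gap` after the last row
    return [{**row, "window_start": start, "window_end": end} for row in session]


rows = [
    {"k": "a", "t": 0},
    {"k": "b", "t": 4},
    {"k": "a", "t": 3},
    {"k": "a", "t": 10},
]
for r in session_tvf(rows, ["k"], "t", gap=5):
    print(r)
```

With these inputs, key "a" yields two sessions (rows at 0 and 3, then the row at 10) and key "b" yields one, which shows that the result depends on both the partitioning and the gap.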

Public Interfaces

Once the two Window Aggregation syntaxes are aligned in features, the legacy Group Window Aggregation syntax will be deprecated.

Please note that this FLIP only marks the legacy Group Window Aggregation syntax as deprecated; it does not actually remove it.

Proposed Changes

Since no changes to the public API are involved, the implementation details are not discussed in depth here; only the implementation ideas are briefly described. Each section corresponds to one item listed in Motivation.

1. Support SESSION Window TVF Aggregation

Supporting this feature requires changes to the Table Parser, Table Planner and Table Runtime modules.

Table Parser

Based on CALCITE-4865[3], we can already use the Session Window TVF syntax as described in FLIP-145. However, in Calcite, a table function that contains PARTITION BY or ORDER BY is converted into a PTF (Polymorphic Table Function). A PTF has some key properties that need to be set, as follows (copied from the Calcite documentation[4]):

  1. ROW semantics or SET semantics
    1. Row semantics means that the result of the table function is decided on a row-by-row basis.
    2. Set semantics means that the outcome of the function depends on how the data is partitioned.
  2. Whether the table function can generate a result row even if the input table is empty
    1. If the table function can generate a result row on empty input, the table is said to be “keep when empty”.
    2. The alternative is called “prune when empty”, meaning that the result would be pruned out if the input table is empty.
  3. Whether the input table supports pass-through columns or not
    1. Pass-through columns is a mechanism enabling the table function to copy every column of an input row into columns of an output row.

For Session Window TVF in Flink, the following semantics should be set.

  1. SET semantics
  2. Prune when empty
  3. Supports pass-through columns
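The difference between row semantics and set semantics, and why SESSION needs the latter, can be illustrated with a small Python sketch. This is not Calcite or Flink code; `tumble_window` and `session_windows` are hypothetical helpers, timestamps are plain integers, and the rule that rows at most gap apart share a session is taken from the syntax description in this FLIP.

```python
def tumble_window(ts, size):
    """Row semantics: the window of a row depends on that row alone."""
    start = ts - ts % size
    return (start, start + size)


def session_windows(timestamps, gap):
    """Set semantics: the windows depend on every timestamp in the partition."""
    sessions = []  # each entry is [first_ts, last_ts]
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][1] <= gap:
            sessions[-1][1] = ts  # extend the current session
        else:
            sessions.append([ts, ts])  # open a new session
    # An empty partition yields no windows, i.e. "prune when empty".
    return [(first, last + gap) for first, last in sessions]


events = [("a", 0), ("b", 4), ("a", 8)]

# Partitioned by key: each key is sessionized on its own.
by_key = {}
for key, ts in events:
    by_key.setdefault(key, []).append(ts)
partitioned = {key: session_windows(ts_list, gap=5) for key, ts_list in by_key.items()}

# Same rows without partitioning: all rows fall into one session.
unpartitioned = session_windows([ts for _, ts in events], gap=5)

print(partitioned)
print(unpartitioned)
print(tumble_window(8, size=5))
```

The same three rows produce three sessions when partitioned by key and a single session when not partitioned, whereas the tumbling window of a row never changes with the partitioning.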

In addition, Calcite needs to maintain compatibility with the legacy Session Window TVF syntax shown below. Flink, however, has no such historical compatibility concerns, so we can simply disable the following legacy syntax.

SELECT * FROM TABLE(
  SESSION(
    DATA => TABLE my_table,
    TIMECOL => DESCRIPTOR(rowtime),
    KEY => DESCRIPTOR(order_id),
    SIZE => INTERVAL '10' MINUTE
  )
)

Table Planner & Table Runtime

Add handling of the Session Window TVF in related rules during the plan optimization phase.

  • ProjectWindowTableFunctionTransposeRule
  • PullUpWindowTableFunctionIntoWindowAggregateRule
  • ...

2. Support consuming CDC stream

The base capability of the WindowAggregate operator to consume CDC streams has already been implemented (AggCombiner#combine[5]). By modifying the logic for inferring ModifyKind/UpdateKind, inferring the upsert key, code generation, and so on, we can ensure that the WindowAggregate operator fulfills this feature. All that remains is to conduct correctness tests for various scenarios.
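As a rough illustration of what consuming a CDC stream means for a window aggregate, the Python sketch below accumulates insert and update-after rows and retracts update-before and delete rows. It is a simplified model and not the code of AggCombiner; the function `window_sum`, the tuple layout `(kind, ts, amount)`, the row-kind strings and the use of tumbling windows are all assumptions made for this example.

```python
from collections import defaultdict

ACCUMULATE = {"+I", "+U"}  # insert, update-after
RETRACT = {"-U", "-D"}     # update-before, delete


def window_sum(changelog, size):
    """Sum `amount` per tumbling window over rows of (kind, ts, amount)."""
    acc = defaultdict(lambda: [0, 0])  # window_start -> [sum, live row count]
    for kind, ts, amount in changelog:
        if kind in ACCUMULATE:
            sign = 1
        elif kind in RETRACT:
            sign = -1
        else:
            raise ValueError(f"unknown row kind: {kind}")
        start = ts - ts % size
        acc[start][0] += sign * amount
        acc[start][1] += sign
    # A window whose live row count is zero has no remaining input rows.
    return {start: total for start, (total, count) in sorted(acc.items()) if count > 0}


changelog = [
    ("+I", 1, 10),
    ("+I", 2, 5),
    ("-U", 1, 10),  # the row inserted at ts=1 is updated ...
    ("+U", 1, 7),   # ... and now carries amount 7
    ("+I", 12, 3),
    ("-D", 12, 3),  # the only row of the second window is deleted
]
print(window_sum(changelog, size=10))
```

Tracking a live row count next to the sum is one way to tell a window whose rows were all retracted apart from a window whose sum happens to be zero.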

3. Support HOP window size with non-integer step length

As in the GroupWindowAggregate operator, support for this feature in the WindowAggregate operator would be handled internally within the Table Runtime module.
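The Python sketch below shows one way to assign a timestamp to hopping windows. It reads "non-integer step length" as a window size that is not an integer multiple of the slide, which is an interpretation of the wording above. `hop_windows` is a hypothetical helper, not the operator's implementation, and it assumes integer timestamps and a window offset of zero.

```python
def hop_windows(ts, size, slide):
    """Return all [start, end) hopping windows that contain `ts`."""
    # Start of the latest window that begins at or before `ts`.
    start = ts - ts % slide
    windows = []
    # Walk backwards one slide at a time while the window still covers `ts`.
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return windows


# size = 5 is not a multiple of slide = 2.
print(hop_windows(6, size=5, slide=2))
print(hop_windows(7, size=5, slide=2))
# size = 4 is a multiple of slide = 2.
print(hop_windows(6, size=4, slide=2))
```

With size 5 and slide 2, a timestamp belongs to either three or two windows depending on where it falls, while with size 4 every timestamp belongs to exactly two. This uneven coverage is a plausible reason why an approach that cuts windows into equal slide-sized pieces does not directly cover the non-multiple case.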

4. Support configurations such as early fire, late fire and allow lateness (which are internal experimental configurations in Group Window Aggregation and not public to users yet)

By modifying part of the code in Table Runtime module, the WindowAggregate operator can implement this feature.
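To give a rough idea of how these three settings interact, the Python sketch below classifies a record by comparing the end of its window with the current watermark. It is modeled on the general event-time rule that a window fires once the watermark reaches its last timestamp and is discarded once the allowed lateness has also passed. Whether the WindowAggregate operator would apply exactly this rule is an assumption, and `classify` and its return values are hypothetical names.

```python
def classify(window_end, watermark, allowed_lateness):
    """Classify a record for the window [start, window_end) at arrival time."""
    max_ts = window_end - 1  # last timestamp that belongs to the window
    if watermark < max_ts:
        # The window has not fired yet; early fire could emit partial results here.
        return "on-time"
    if watermark < max_ts + allowed_lateness:
        # The window has fired but its state is kept; late fire could emit an update.
        return "late-but-accepted"
    # The state of the window has been cleaned up, so the record is ignored.
    return "dropped"


print(classify(window_end=10, watermark=5, allowed_lateness=3))
print(classify(window_end=10, watermark=9, allowed_lateness=3))
print(classify(window_end=10, watermark=12, allowed_lateness=3))
```

With an allowed lateness of zero the middle case disappears, so every record that arrives after its window has fired is dropped.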

5. Unify on the Window TVF Aggregation operators in the runtime at the implementation layer

During the plan optimization phase in the planner, the GroupWindowAggregate node produced by the legacy syntax will be rewritten into the WindowAggregate node that corresponds to the new syntax, so that both syntaxes share the same operators at runtime.

We will provide an experimental configuration named “table.optimizer.window-rewrite-enabled” that lets users fall back to the behavior of the previous version. In the long term, the GroupWindowAggregate node will be removed once the WindowAggregate node matches all of its capabilities. This removal is entirely internal to the code and will not be visible to users.

Compatibility, Deprecation, and Migration Plan

This proposal officially announces the deprecation of the legacy syntax for Group Window Aggregation. It should be noted that the GroupWindowAggregate node will be rewritten as the WindowAggregate node in the SQL plan, which may result in incompatibility. However, this will not affect the correctness of the data.

Test Plan

All new features introduced will be covered by UTs and ITs.

Additionally, the existing UTs and ITs for the GroupWindowAggregate node can also be used to test the operator rewriting.

Rejected Alternatives

None.

Appendix

All supported window types


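| Window type | Window TVF Aggregation | Group Window Aggregation |
|-------------|------------------------|--------------------------|
| TUMBLE      | Y                      | Y                        |
| HOP         | Y                      | Y                        |
| SESSION     | N                      | Y                        |
| CUMULATE    | Y                      | N                        |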

Support for consuming CDC streams


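| Window type | Window TVF Aggregation | Group Window Aggregation |
|-------------|------------------------|--------------------------|
| TUMBLE      | N                      | Y                        |
| HOP         | N                      | Y                        |
| SESSION     | N/A                    | Y                        |
| CUMULATE    | N                      | N/A                      |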

Other features


| Feature | Window TVF Aggregation | Group Window Aggregation |
|---------|------------------------|--------------------------|
| Support Grouping Set | Y | N |
| Support Local-Global Agg | Y | N |
| Support Partial-Final Agg | Y | N |
| Support Distinct Filter | Y | Y |
| Support conf about 'early fire', 'late fire' and 'allow lateness' | N | Y |
| Support HOP window size with non-integer step length | N | Y |

Reference

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/

[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function#FLIP145:SupportSQLwindowingtablevaluedfunction-SessionWindows

[3] https://issues.apache.org/jira/browse/CALCITE-4865

[4] https://calcite.apache.org/docs/reference.html#table-functions

[5] https://github.com/apache/flink/blob/c61c09e464073fae430cab2dd56bd608f9d275fd/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggCombiner.java#L96

[6] https://issues.apache.org/jira/browse/FLINK-25499