Please note that this FLIP has been abandoned.
Page properties | ||||||||
---|---|---|---|---|---|---|---|---|
Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Although the current Flink SQL Window Aggregation documentationdocumentation[1] indicates that the legacy Group Window Aggregation syntax has been deprecated, the new Window TVF Aggregation syntax has not fully covered all of the features of the legacy one.
For the difference in features between Group Window Aggregation and Window TVF Aggregation, please refer to the appendix chapter. Compared to Group Window AggergationAggregation, Window TVF Aggergation Aggregation has several advantages, such as two-stage optimization, support for standard GROUPING SET
syntax, and so on. However, it needs to supplement and enrich the following features.
...
This flip aims to continue the unfinished work in FLIP-145[2], which is to fully enable the capabilities of Window TVF Aggregation and officially deprecate the legacy syntax Group Window Aggregation, to prepare for the removal of the legacy one in Flink 2.0. Support for new features such as Window Join and Window TopN involved in Flip-145 will be followed up in separate tasks.
SQL Syntax
Support Session Window TVF
In the SQL API, we only need to add support for the syntax of Session Window TVF. Although it is not yet implemented, FLIP-145[2] has provided the standard syntax about Session Window TVF. The syntax is as follows:
...
For more descriptions and examples of Session Window TVF syntax, please refer to FLIP-145.
Public Interfaces
After aligning the features about these two syntax Window Aggregation, the legacy syntax Group Window Aggregation will be deprecated.
Please note that only deprecation about the legacy syntax Group Window Aggregation is marked in this flip, and it will not be removed actually.
Proposed Changes
Since no changes are involved in public api, the implementation details will not be deeply discussed here. But the implementation ideas will be briefly described. Each section corresponds to one item that needs to be supplemented or added in Motivation.
1. Support SESSION Window TVF Aggregation
To support this feature, the modify about modules Table Parser
, Table Planner
and Table Runtime
will firstly come to mind.
Table Parser
Based on Calcite-4865[3], we can already use Session Window TVF syntax as described in FLIP-145. However, in Calcite, a Table Function contains PARTITION BY
or ORDER BY
will be converted into a PTF (Polymorphic Table function). PTF has some key features that need to be set as follows. (copied from Calcite documentation[4])
- ROW semantics or SET semantics
- Row semantics means that the result of the table function depends on a row-by-row basis.
- Set semantics means that the outcome of the function depends on how the data is partitioned.
2. Whether the table function can generate a result row even if the input table is empty
- If the table function can generate a result row on empty input, the table is said to be “keep when empty”.
- The alternative is called “prune when empty”, meaning that the result would be pruned out if the input table is empty.
3. Whether the input table supports pass-through columns or not
- Pass-through columns is a mechanism enabling the table function to copy every column of an input row into columns of an output row.
...
Code Block |
---|
SELECT * FROM TABLE( SESSION( DATA => TABLE my_table, TIMECOL => DESCRIPTOR(rowtime), KEY => DESCRIPTOR(order_id), SIZE => INTERVAL '10' MINUTE ) ) |
Table Planner & Table Runtime
Add handling of the Session Window TVF in related rules during the plan optimization phase.
- ProjectWindowTableFunctionTransposeRule
- PullUpWindowTableFunctionIntoWindowAggregateRule
- ...
2. Support consuming CDC stream
The base capability of the WindowAggregate operator to support consuming CDC stream has been implemented(AggCombiner#combine[5]). By modifying the logic about inferring ModifyKind/Update Kind, inferring Upsert Key, code generation and etc, we can ensure that the WindowAggregate operator fulfills this feature. All that remains is to conduct correctness tests for various scenarios.
3. Support HOP window size with non-integer step length
Similar to the logic in operator GroupWindowAggregate, the implementation of the WindowAggregate operator to support this feature would be handled internally within the Table Runtime
module.
4. Support configurations such as early fire
, late fire
and allow lateness
(which are internal experimental configurations in Group Window Aggregation and not public to users yet.)
By modifying part of the code in Table Runtime
module, the WindowAggregate operator can implement this feature.
5. Unify into the Window TVF Aggregation operators in runtime at the implementation layer
During the plan optimization phase in the planner, the GroupWindowAggregate node, produced by the legacy syntax, will be unified with the WindowAggregate node, which corresponds to the new syntax, at runtime.
We will provide a configuration an experimental configuration named “table.optimizer.window-rewrite-enabled” to allow users to fall back to the behavior of the previous version. In the long term, the GroupWindowAggregate node will be removed once the WindowAggregate node can align with all the capabilities of the GroupWindowAggregate node. This part of the removal work should be entirely internal to the code, and does not need to be perceived by users.
Compatibility, Deprecation, and Migration Plan
This proposal officially announces the deprecation of the legacy syntax for Group Window Aggregation. It should be noted that the GroupWindowAggregate node will be rewritten as the WindowAggregate node in the SQL plan, which may result in incompatibility. However, this will not affect the correctness of the data.
Test Plan
All new features introduced will be covered by UTs and ITs.
Additionally, the existing UTs and ITs for the GroupWindowAggregate node can also be used to test the operator rewriting.
Rejected Alternatives
None.
Appendix
All supported window types
Window TVF Aggregation | Group Window Aggregation | |
TUMBLE | Y | Y |
HOP | Y | Y |
SESSION | N | Y |
CUMULATE | Y | N |
Whether supports consuming CDC stream
Window TVF Aggregation | Group Window Aggregation | |
TUMBLE | N | Y |
HOP | N | Y |
SESSION | N/A | Y |
CUMULATE | N | N/A |
Other features
Window TVF Aggregation | Group Window Aggregation | |
Support Grouping Set | Y | N |
Support Local-Globla Agg | Y | N |
Support Partial-Final Agg | Y | N |
Support Distinct Filter | Y | Y |
Support conf about 'early fire', 'late fire' and 'allow lateness' | N | Y |
Support HOP window size with non-integer step length | N | Y |
Reference
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/
...
[6] https://issues.apache.org/jira/browse/FLINK-25499