Please note that this JIRA has been abandoned.

Motivation

Although the current Flink SQL Window Aggregation documentation[1] indicates that the legacy Group Window Aggregation syntax has been deprecated, the new Window TVF Aggregation syntax does not yet cover all of the features of the legacy one. Support for CDC streams is an obvious example: the operator generated by the new Window TVF Aggregation syntax can only consume append-only streams, while the operator generated by the legacy Group Window Aggregation syntax can consume CDC streams.

The purpose of this FLIP is to continue the work of FLIP-145[2]. It aims to enhance the new Window TVF Aggregation to support CDC streams, thereby aligning its capabilities more closely with those of the old Group Window Aggregation and ultimately preparing for the formal removal of the old syntax in Flink 2.0. Support for consuming CDC streams in Window Join, Window TopN, etc. will be followed up in separate tasks.

Public Interfaces

There are no changes to the public API.

Until consuming CDC streams is supported, a Window TVF Aggregation over a CDC source fails with the following exception.

org.apache.flink.table.api.TableException: StreamPhysicalWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[...], fields=[...])

	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:405)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:328)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:366)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:355)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:354)
	...

With this feature, the StreamPhysicalWindowAggregate node generated by the Window TVF Aggregation syntax can consume CDC data, just like the StreamPhysicalGroupWindowAggregate node generated by the legacy Group Window Aggregation syntax, and its output should be consistent with that of Group Window Aggregation.
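
The change in planner behavior can be pictured with a small model. The Python sketch below is not Flink code: `PlanningError`, `check_input` and `plans_successfully` are hypothetical names chosen for illustration, and `ModifyKind` is only modeled after the planner's notion of insert, update and delete changes. It assumes nothing more than that a node declares the change kinds it accepts and that planning fails when its input produces any other kind.

```python
from enum import Enum, auto


class ModifyKind(Enum):
    INSERT = auto()
    UPDATE = auto()
    DELETE = auto()


class PlanningError(Exception):
    """Stand-in for the TableException quoted in this section."""


def check_input(node_name, accepted, produced, input_name):
    """Fail if the input produces a change kind the node cannot consume."""
    unsupported = produced - accepted
    if unsupported:
        # Keep the enum's declaration order so the message reads naturally.
        kinds = " and ".join(k.name.lower() for k in ModifyKind if k in unsupported)
        raise PlanningError(
            f"{node_name} doesn't support consuming {kinds} changes "
            f"which is produced by node {input_name}"
        )


APPEND_ONLY = frozenset({ModifyKind.INSERT})  # assumed behavior before this proposal
ALL_CHANGES = frozenset(ModifyKind)           # assumed behavior after this proposal

# A CDC source produces inserts, updates and deletes.
cdc_source = ALL_CHANGES


def plans_successfully(accepted):
    """Return True if a window aggregate accepting `accepted` can read the CDC source."""
    try:
        check_input("StreamPhysicalWindowAggregate", accepted, cdc_source, "TableSourceScan")
        return True
    except PlanningError:
        return False
```

In this model, `plans_successfully(APPEND_ONLY)` is `False` and `plans_successfully(ALL_CHANGES)` is `True`, which mirrors the before and after states described here.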

Proposed Changes

The basic capability of the WindowAggregate operator to consume CDC streams has already been implemented (AggCombiner#combine[3]). By modifying the logic for inferring ModifyKind/UpdateKind, inferring the upsert key, code generation, etc., we can ensure that the WindowAggregate operator fully supports this feature. After that, all that remains is to conduct correctness tests for various scenarios.

The parts that need to be modified are, briefly:

  1. Add a "retract" function when generating the aggregate handle function.
  2. Add a "count" column, if one does not already exist, to track the number of records in the window, and adjust the logic in WindowAggProcessor accordingly. If the count is 0 when the window fires, no data should be output.
  3. Add handling of the StreamPhysicalWindowAggregate class in FlinkRelMdUniqueKeys and FlinkRelMdUpsertKeys.
  4. Allow the StreamPhysicalWindowAggregate node to consume inputs of all modify kinds in FlinkChangelogModeInferenceProgram.
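
Items 1 and 2 can be sketched with a toy tumbling-window SUM over a changelog. This is a simplified Python model under stated assumptions, not the generated aggregate handler or WindowAggProcessor: `SumWithCount` and `aggregate_windows` are hypothetical names, rows are plain `(kind, timestamp, value)` tuples, and every window is fired once at the end of the input.

```python
from collections import defaultdict


class SumWithCount:
    """Accumulator for SUM plus the extra record count from item 2."""

    def __init__(self):
        self.total = 0
        self.count = 0  # number of live rows currently in the window

    def accumulate(self, value):
        self.total += value
        self.count += 1

    def retract(self, value):
        # Inverse of accumulate, applied for -U and -D rows (item 1).
        self.total -= value
        self.count -= 1


def aggregate_windows(changelog, window_size):
    """Fold (kind, ts, value) rows into tumbling windows, then fire them all."""
    windows = defaultdict(SumWithCount)
    for kind, ts, value in changelog:
        start = ts - ts % window_size  # start of the tumbling window containing ts
        acc = windows[start]
        if kind in ("+I", "+U"):
            acc.accumulate(value)
        elif kind in ("-U", "-D"):
            acc.retract(value)
        else:
            raise ValueError(f"unknown row kind: {kind}")
    # A window whose rows were all retracted emits nothing when fired.
    return {start: acc.total for start, acc in sorted(windows.items()) if acc.count > 0}


changelog = [
    ("+I", 1, 10),
    ("+I", 2, 5),
    ("-U", 2, 5),   # update: retract the old value ...
    ("+U", 2, 7),   # ... and accumulate the new one
    ("+I", 11, 3),
    ("-D", 11, 3),  # the only row of the second window is deleted
]
result = aggregate_windows(changelog, 10)
```

Here `result` is `{0: 17}`: the first window sums the live values 10 and 7, and the second window is suppressed because its count drops back to 0. Without the count, that window would emit a SUM of 0, which is indistinguishable from a window that really contains rows summing to 0.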

Compatibility, Deprecation, and Migration Plan

None

Test Plan

All new features introduced will be covered by UTs and ITs.

Rejected Alternatives

None.

Reference

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/

[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function#FLIP145:SupportSQLwindowingtablevaluedfunction-SessionWindows

...

[6] https://issues.apache.org/jira/browse/FLINK-25499

=========================================

Legacy Part

Just retained to provide sufficient context.

...



Motivation

Although the current Flink SQL Window Aggregation documentation[1] indicates that the legacy Group Window Aggregation syntax has been deprecated, the new Window TVF Aggregation syntax has not fully covered all of the features of the legacy one.

...

For more descriptions and examples of Session Window TVF syntax, please refer to FLIP-145.

Public Interfaces

Once the features of the two Window Aggregation syntaxes are aligned, the legacy Group Window Aggregation syntax will be deprecated.

Please note that this FLIP only marks the legacy Group Window Aggregation syntax as deprecated; it does not actually remove it.

Proposed Changes

Since no changes to the public API are involved, the implementation details are not discussed in depth here; only the implementation ideas are briefly described. Each section corresponds to one item in Motivation that needs to be supplemented or added.

...

We will provide an experimental configuration option named "table.optimizer.window-rewrite-enabled" to allow users to fall back to the behavior of the previous version. In the long term, the GroupWindowAggregate node will be removed once the WindowAggregate node matches all the capabilities of the GroupWindowAggregate node. This removal work should be entirely internal to the code and invisible to users.

Compatibility, Deprecation, and Migration Plan

This proposal officially announces the deprecation of the legacy syntax for Group Window Aggregation. It should be noted that the GroupWindowAggregate node will be rewritten as the WindowAggregate node in the SQL plan, which may result in incompatibility. However, this will not affect the correctness of the data.

Test Plan

All new features introduced will be covered by UTs and ITs.

Additionally, the existing UTs and ITs for the GroupWindowAggregate node can also be used to test the operator rewriting.

Rejected Alternatives

None.

Appendix

All supported window types

...


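Feature                                                           | Window TVF Aggregation | Group Window Aggregation
------------------------------------------------------------------+------------------------+-------------------------
Support Grouping Set                                              | Y                      | N
Support Local-Global Agg                                          | Y                      | N
Support Partial-Final Agg                                         | Y                      | N
Support Distinct Filter                                           | Y                      | Y
Support conf about 'early fire', 'late fire' and 'allow lateness' | N                      | Y
Support HOP window size with non-integer step length              | N                      | Y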

Reference

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/

...