This proposal is an alternative solution to KIP-450. The argument is that, instead of proposing a new sliding window construct as KIP-450 does, we would optimize the existing hopping window (a time window whose advance step is smaller than its length) for the case where the window length is much larger than the advance step.
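To make the cost of the current approach concrete, the following sketch counts how many overlapping windows a single record falls into, and therefore how many window aggregates are updated per record when every window is maintained separately. It is a self-contained illustration rather than library code: the names `WindowFanOut` and `windowStartsFor` are hypothetical, and windows are assumed to be aligned to timestamp 0 and to cover the half-open range [start, start + size).

```java
import java.util.ArrayList;
import java.util.List;

final class WindowFanOut {

    /** Start times of all windows [start, start + sizeMs) that contain timestampMs. */
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned window start whose window still covers the timestamp.
        long first = Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = first; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        long oneHour = 60 * oneMinute;
        // A one-hour window advancing every minute: one record touches 60 windows.
        System.out.println(windowStartsFor(oneHour, oneHour, oneMinute).size());
        // Advance equal to the window length: one record touches a single window.
        System.out.println(windowStartsFor(oneHour, oneHour, oneHour).size());
    }
}
```

With a window-slicing approach, the same record would instead update only the one sub-window it falls into, and the window result would be assembled from sub-window aggregates.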

...

Code Block
public interface Combinator<K, V, VA> extends Aggregator<K, V, VA> {

        VA combine(final VA agg1, final VA agg2);
}

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Combinator<? super K, VR, VR> combinator);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Combinator<? super K, VR, VR> combinator,
                             final Combinator<? super K, VR, VR> inverter);


Note that these overloads of `aggregate` exist in addition to the original one that takes only the `initializer` and `aggregator`. By choosing an overload, users indicate to the library whether it should use the optimized window-slicing approach or the current "eager-aggregate" approach.

We also allow two combinators: one "combines" the second partial aggregate agg2 with the first partial aggregate agg1, while the other "inverts" agg2, i.e. removes its contribution from the partial aggregate agg1. This lets the library exploit the invertibility, in addition to the associativity / commutativity, of the aggregation function (more details in http://www.vldb.org/pvldb/vol8/p702-tangwongsan.pdf).
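As an illustration of the two roles, here is a minimal sketch of a combine / invert pair for an average, kept as a (sum, count) partial aggregate. It deliberately does not use the proposed `Combinator` interface: both functions are modeled with the JDK's `BinaryOperator`, and `CombineInvertSketch` and `SumCount` are hypothetical names introduced only for this example.

```java
import java.util.function.BinaryOperator;

final class CombineInvertSketch {

    /** Partial aggregate for an average: running sum and record count. */
    static final class SumCount {
        final long sum;
        final long count;

        SumCount(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    // "Combine": merge the partial aggregate agg2 into agg1.
    static final BinaryOperator<SumCount> COMBINE =
        (agg1, agg2) -> new SumCount(agg1.sum + agg2.sum, agg1.count + agg2.count);

    // "Invert": remove the contribution of agg2 from agg1.
    static final BinaryOperator<SumCount> INVERT =
        (agg1, agg2) -> new SumCount(agg1.sum - agg2.sum, agg1.count - agg2.count);

    public static void main(String[] args) {
        SumCount a = new SumCount(10, 2);
        SumCount b = new SumCount(5, 1);
        SumCount merged = COMBINE.apply(a, b);       // sum 15, count 3
        SumCount restored = INVERT.apply(merged, b); // back to sum 10, count 2
        System.out.println(merged.sum + "/" + merged.count);
        System.out.println(restored.sum + "/" + restored.count);
    }
}
```

Not every aggregation has an inverse: for example, a maximum can be combined but a removed value cannot be "subtracted" from it, which is why the overload without the `inverter` is still needed.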


Implementation

Note that this would likely affect how we do suppression: today the aggregation and suppression are totally independent operators, but with this optimization we may want the aggregation operators to take suppression into account in order to reduce unnecessary reads on the sub-windows.

The algorithms based on balanced trees (see references) are quite general, whereas in practice we can assume that out-of-order data is not far from the latest window boundaries. Thus, we can simplify the academic algorithms as follows (this is just a wild thought, open for discussion):

  • If invertibility is enabled, cache the latest final aggregates from sub-windows and invert the partial aggregate from the earliest sub-window to expire when necessary; i.e. it is a simple two-layered tree.
  • If invertibility is not enabled, cache the latest final aggregate minus each sub-window that it includes, i.e. if the window has N sub-windows, we keep one partial aggregate value over N-1 sub-windows and one partial aggregate value over each individual sub-window, forming a three-layered tree.
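One possible reading of the two cases above is sketched below; it is meant as a discussion aid, not as the proposed implementation. The class `SlicedWindowSketch` and its methods are hypothetical. The sketch keeps one partial aggregate per closed sub-window plus one cached aggregate over the closed sub-windows, so each incoming record costs a single combine. It assumes in-order data, an associative combine with a neutral element, and, for the invertible path, that removing the oldest sub-window via the inverter is valid (e.g. the function is commutative).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.BinaryOperator;

/** Hypothetical sketch: a window made of n fixed-size sub-windows. */
final class SlicedWindowSketch<VA> {
    private final int n;                      // sub-windows per window
    private final VA identity;                // neutral element of combine
    private final BinaryOperator<VA> combine;
    private final BinaryOperator<VA> invert;  // null if the function is not invertible
    private final Deque<VA> closed = new ArrayDeque<>(); // at most n-1 closed sub-windows
    private VA closedTotal;                   // cached aggregate over all entries in `closed`
    private VA open;                          // aggregate of the newest, still-open sub-window

    SlicedWindowSketch(int n, VA identity, BinaryOperator<VA> combine, BinaryOperator<VA> invert) {
        this.n = n;
        this.identity = identity;
        this.combine = combine;
        this.invert = invert;
        this.closedTotal = identity;
        this.open = identity;
    }

    /** Folds a value into the open sub-window and returns the current window result. */
    VA add(VA value) {
        open = combine.apply(open, value);
        return combine.apply(closedTotal, open); // no reads of older sub-windows
    }

    /** Closes the open sub-window and expires the oldest one once the window is full. */
    void advance() {
        closed.addLast(open);
        closedTotal = combine.apply(closedTotal, open);
        open = identity;
        if (closed.size() > n - 1) {
            VA expired = closed.removeFirst();
            if (invert != null) {
                // Invertible: remove the expired sub-window in one step.
                closedTotal = invert.apply(closedTotal, expired);
            } else {
                // Not invertible: rebuild the cache from the remaining sub-windows.
                closedTotal = identity;
                for (VA slice : closed) {
                    closedTotal = combine.apply(closedTotal, slice);
                }
            }
        }
    }

    public static void main(String[] args) {
        SlicedWindowSketch<Integer> sum =
            new SlicedWindowSketch<>(3, 0, Integer::sum, (a, b) -> a - b);
        for (int v = 1; v <= 4; v++) {
            System.out.println(sum.add(v)); // window result after each record
            sum.advance();
        }
    }
}
```

In the non-invertible case the rebuild happens only once per advance step rather than once per record, which is where the saving over the eager approach would come from when the window length is much larger than the advance step.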

References:

FLINK-7001 (ASF JIRA): https://issues.apache.org/jira/browse/FLINK-7001

https://openproceedings.org/2019/conf/edbt/EDBT19_paper_171.pdf

http://www.vldb.org/pvldb/vol8/p702-tangwongsan.pdf (slides: http://hirzels.com/martin/papers/vldb15-chprop-agg-talk.pdf)

http://www.vldb.org/pvldb/vol12/p1167-tangwongsan.pdf