Code Block
public interface Combinator<K, V, VA> {

    // Merges two partial aggregates of the same type into one.
    VA combine(final VA agg1, final VA agg2);
}

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Combinator<? super K, VR, VR> combinator);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Combinator<? super K, VR, VR> combinator,
                             final Combinator<? super K, VR, VR> inverter);


Note that the overloaded `aggregate` methods are offered in addition to the original one, which takes only the `initializer` and `aggregator`. By choosing an overload, users indicate to the library whether it should use the optimized window-slicing approach or the current "eager-aggregate" approach.

We also allow two combinators: one "combines" the second partial aggregate agg2 into the first partial aggregate agg1, while the other "inverts" agg2, that is, removes it from the partial aggregate agg1. This lets the library exploit the invertibility of the aggregation function, in addition to its associativity and commutativity (more details in http://www.vldb.org/pvldb/vol8/p702-tangwongsan.pdf).
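To illustrate how a combinator and an inverter could work together, here is a minimal sketch of a sliding-window sum maintained over per-slice partial aggregates. It is not the library's implementation: the `Combinator` interface is redeclared locally as a stand-in for the proposed one, and the class `SliceCombineSketch`, the helper `SlidingSum`, and the fixed `slicesPerWindow` parameter are hypothetical names introduced only for this example.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class SliceCombineSketch {

    // Local stand-in for the proposed interface (assumption: not a released API).
    public interface Combinator<K, V, VA> {
        VA combine(VA agg1, VA agg2);
    }

    // Hypothetical helper: keeps a running window aggregate over the
    // most recent `slicesPerWindow` slice aggregates.
    public static final class SlidingSum<K> {
        private final Combinator<K, Long, Long> combinator;
        private final Combinator<K, Long, Long> inverter;
        private final int slicesPerWindow;
        private final Deque<Long> slices = new ArrayDeque<>();
        private long windowAgg = 0L; // identity value for a sum

        public SlidingSum(Combinator<K, Long, Long> combinator,
                          Combinator<K, Long, Long> inverter,
                          int slicesPerWindow) {
            this.combinator = combinator;
            this.inverter = inverter;
            this.slicesPerWindow = slicesPerWindow;
        }

        // Adds the partial aggregate of a newly closed slice and returns
        // the aggregate of the current window.
        public long addSlice(long sliceAgg) {
            slices.addLast(sliceAgg);
            // "Combine" the new slice into the running window aggregate.
            windowAgg = combinator.combine(windowAgg, sliceAgg);
            if (slices.size() > slicesPerWindow) {
                // "Invert" the expired slice instead of recomputing the
                // window from all remaining slices.
                windowAgg = inverter.combine(windowAgg, slices.removeFirst());
            }
            return windowAgg;
        }
    }

    public static void main(String[] args) {
        SlidingSum<String> window =
            new SlidingSum<>(Long::sum, (agg1, agg2) -> agg1 - agg2, 2);
        System.out.println(window.addSlice(3L)); // window holds [3]
        System.out.println(window.addSlice(4L)); // window holds [3, 4]
        System.out.println(window.addSlice(5L)); // window holds [4, 5]
    }
}
```

With an inverter, advancing the window costs one combine and one invert per slice. Without one (for example for `max`), the window aggregate would have to be rebuilt by combining the remaining slices, which is the trade-off the second overload is meant to expose.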


References:

FLINK-7001 (ASF JIRA)

https://openproceedings.org/2019/conf/edbt/EDBT19_paper_171.pdf

http://www.vldb.org/pvldb/vol8/p702-tangwongsan.pdf

http://www.vldb.org/pvldb/vol12/p1167-tangwongsan.pdf