This proposal is an alternative solution to KIP-450. The argument is that instead of proposing a new sliding window construct, we would just try to optimize the tumbling window where the window length is >> the advance step.

...

With a tumbling window length of M and an advance step of N (N << M), each window aggregation update involves the following: for each of the windows this record falls into, we would issue an update (get, aggregate, and then put); and on average a record falls into a total of M / N windows.

The total cost per record:


Code Block
Read:    M / N
Write:   M / N
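To make this cost concrete, here is a small sketch (not part of the proposal; the window length M = 60 and advance step N = 10 are made-up values) that enumerates the windows a single record falls into; the eager approach performs one read and one write for each of them:

```java
import java.util.ArrayList;
import java.util.List;

public class EagerWindowCost {
    // All windows [start, start + windowSize) whose start is a multiple of the
    // advance step and which contain the given timestamp.
    static List<Long> windowStartsFor(long timestamp, long windowSize, long advance) {
        List<Long> starts = new ArrayList<>();
        long firstStart = (timestamp / advance) * advance - windowSize + advance;
        for (long start = Math.max(0, firstStart); start <= timestamp; start += advance) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Hypothetical sizes: window length M = 60, advance step N = 10.
        List<Long> starts = windowStartsFor(123L, 60L, 10L);
        // The record at t = 123 falls into M / N = 6 windows; the eager approach
        // does one read and one write for each of them.
        System.out.println(starts);                        // [70, 80, 90, 100, 110, 120]
        System.out.println("reads/writes: " + starts.size());
    }
}
```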

...

Instead, we can aggregate per overlapping advance step (let's call it a sub-window), and then compute the window result by further aggregating the values of all the sub-windows that the window covers. More specifically, an update touches only one sub-window: a single read plus a single write. Emitting the updated results is more expensive, however: since a total of M / N windows are affected by the update, we need to read all the neighboring sub-windows in addition to the sub-window that was updated. This is a very common window aggregation technique (see references below).


For the total of M / N overlapping windows to be emitted, we would need 2 * (M / N - 1) reads of the neighboring sub-windows, plus one read of the sub-window that gets updated, so the total cost of it:

Code Block
Read:    2 * (M / N - 1) + 1 = 2 * M / N - 1
Write:   1
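As an illustration (a hedged sketch, not the proposed implementation; an in-memory map stands in for the state store), the sub-window approach writes only the record's own sub-window and re-aggregates all covered sub-windows on emission:

```java
import java.util.HashMap;
import java.util.Map;

public class SubWindowAggregate {
    final long windowSize;   // M
    final long advance;      // N
    // Sub-window start -> partial aggregate (a sum here); stands in for the state store.
    final Map<Long, Long> subWindows = new HashMap<>();

    SubWindowAggregate(long windowSize, long advance) {
        this.windowSize = windowSize;
        this.advance = advance;
    }

    // One update: a single read-modify-write on the record's own sub-window.
    void update(long timestamp, long value) {
        long subStart = (timestamp / advance) * advance;
        subWindows.merge(subStart, value, Long::sum);
    }

    // Emitting one window re-aggregates the M / N sub-windows it covers.
    long windowResult(long windowStart) {
        long result = 0;
        for (long s = windowStart; s < windowStart + windowSize; s += advance) {
            result += subWindows.getOrDefault(s, 0L);    // one read per sub-window
        }
        return result;
    }

    public static void main(String[] args) {
        SubWindowAggregate agg = new SubWindowAggregate(60, 10);
        agg.update(123, 5);   // single write, to sub-window [120, 130)
        agg.update(115, 7);   // single write, to sub-window [110, 120)
        System.out.println(agg.windowResult(70));  // reads 6 sub-windows -> 12
    }
}
```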

...

So it seems we are trading more reads for fewer writes, which is counter-intuitive for a write-optimized state store like RocksDB. But there are a few things we should keep in mind:

  1. We do not necessarily need to emit the result on each update when suppression is enabled; when we suppress the emission, we only pay one write and one read. As long as we can suppress more than one emission that requires reading M / N sub-windows, this approach would be preferred.
  2. We can further optimize our implementation by buffering the partial aggregations of the latest sub-windows, reducing repeated fetches from the underlying state store: this is similar to the window-slicing / pre-aggregation techniques.
  3. If the distribution of records falling into the sub-windows is sparse (i.e. a given window would only have records in a very small number of its sub-windows), then the underlying store's get calls can return empty results more efficiently (e.g. via RocksDB's bloom filter).
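Point 2 could look roughly like the following (a sketch under assumed names; the HashMap stands in for the RocksDB store): an in-memory buffer of the latest sub-window aggregates so that repeated emissions do not re-read the store:

```java
import java.util.HashMap;
import java.util.Map;

public class CachedSubWindowReader {
    final Map<Long, Long> store = new HashMap<>();  // stands in for the RocksDB store
    final Map<Long, Long> buffer = new HashMap<>(); // buffered partial aggregates
    long storeReads = 0;

    // Read a sub-window aggregate, hitting the underlying store only on a buffer miss.
    long get(long subWindowStart) {
        Long buffered = buffer.get(subWindowStart);
        if (buffered != null) {
            return buffered;
        }
        storeReads++;
        long value = store.getOrDefault(subWindowStart, 0L);
        buffer.put(subWindowStart, value);
        return value;
    }

    public static void main(String[] args) {
        CachedSubWindowReader reader = new CachedSubWindowReader();
        reader.store.put(120L, 5L);
        reader.get(120L);
        reader.get(120L);                      // served from the buffer
        System.out.println(reader.storeReads); // 1
    }
}
```

A real implementation would also need to update or invalidate the buffer on writes; this sketch only shows the read path.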

...

Code Block
public interface Combinator<K, V, VA> {

        VA combine(final VA agg1, final VA agg2);
}

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Combinator<? super K, VR, VR> combinator);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Combinator<? super K, VR, VR> combinator,
                             final Combinator<? super K, VR, VR> inverter);


Note that with the overloaded `aggregate` variants in addition to the original one that only takes the `initializer` and `aggregator`, users can indicate to the library whether it should use the optimized window-slicing based approach v.s. the current "eager-aggregate" approach.

Also we allow two combinators: one that combines the second aggregate agg2 with the first partial aggregate agg1, and another that inverts agg2 from the partial aggregate agg1. This enables the library to further exploit the invertibility, in addition to the associativity / commutativity, of the aggregation function (more details in http://www.vldb.org/pvldb/vol8/p702-tangwongsan.pdf).
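For example, with a sum aggregation the two combinators are simply addition and subtraction. A hedged sketch (the `Combinator` shape below is simplified from the proposed API, dropping the key type parameter) of how invertibility turns a window slide into O(1) work:

```java
public class SlidingSum {
    // Simplified from the proposed Combinator: the key type parameter is dropped here.
    interface Combinator<VA> {
        VA combine(VA agg1, VA agg2);
    }

    static final Combinator<Long> COMBINE = (a, b) -> a + b; // fold a new sub-window in
    static final Combinator<Long> INVERT  = (a, b) -> a - b; // remove an expired sub-window

    // Slide the window by one sub-window in O(1): no interior sub-window is re-read.
    static long slide(long windowAgg, long expiredSubAgg, long newSubAgg) {
        return COMBINE.combine(INVERT.combine(windowAgg, expiredSubAgg), newSubAgg);
    }

    public static void main(String[] args) {
        long windowAgg = 3 + 5 + 2;                 // aggregate over sub-windows {3, 5, 2}
        System.out.println(slide(windowAgg, 3, 7)); // window is now {5, 2, 7} -> 14
    }
}
```

Without the inverter, sliding the window would require re-combining all remaining sub-windows, which is exactly the 2 * M / N - 1 read cost computed above.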


Implementation

Note this would likely have an impact on how we do suppression today: today the aggregation and suppression are totally independent operators, but with this optimization we may consider the suppression effects while implementing the aggregation operators to reduce unnecessary reads on the sub-windows.

The general algorithms based on balanced trees (see references) handle arbitrary out-of-order data, whereas in practice we can assume that out-of-order records do not arrive far behind the latest window boundaries. Thus, we can actually simplify the academic algorithm such that (the following is just a wild thought, open for discussion):

  • If invertibility is enabled, cache the latest final aggregates from sub-windows and invert the partial aggregate from the earliest sub-window to expire when necessary; i.e. it is a simple two-layered tree.
  • If invertibility is not enabled, cache the latest final aggregate minus each sub-window that it includes, i.e. if the window has N sub-windows, we keep one partial aggregate value over N-1 sub-windows and one partial aggregate value over each individual sub-window, forming a three-layered tree.

References:

FLINK-7001 (ASF JIRA)

https://openproceedings.org/2019/conf/edbt/EDBT19_paper_171.pdf

http://www.vldb.org/pvldb/vol8/p702-tangwongsan.pdf (slides: http://hirzels.com/martin/papers/vldb15-chprop-agg-talk.pdf)

http://www.vldb.org/pvldb/vol12/p1167-tangwongsan.pdf