
This proposal is an alternative solution to KIP-450: instead of proposing a new sliding window construct, we would just optimize the hopping window whose window length is much larger than its advance step (length >> advance step).


Motivation

With a hopping window length of N and an advance step of M << N, each window aggregation update involves the following: for each of the N / M windows this record falls into, issue an update (get, aggregate, and then put).

The total cost of it:


 Read:   N / M
Write:   N / M
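To make the naive cost concrete, here is a minimal Java sketch of the per-window update path for a count aggregation. The class `NaiveHoppingWindowCount` and the `HashMap` standing in for the window store are hypothetical; the sketch assumes non-negative timestamps and that N is a multiple of M, and it is not the actual Kafka Streams processor.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the naive hopping-window update path for a count.
// Assumes non-negative timestamps and that the window size N is a multiple
// of the advance step M. A HashMap stands in for the window state store.
class NaiveHoppingWindowCount {
    final long sizeMs;     // window length N
    final long advanceMs;  // advance step M
    final Map<Long, Long> store = new HashMap<>(); // window start -> count
    long reads = 0;
    long writes = 0;

    NaiveHoppingWindowCount(long sizeMs, long advanceMs) {
        this.sizeMs = sizeMs;
        this.advanceMs = advanceMs;
    }

    // Processes one record: get, aggregate, and put for every window
    // [start, start + N) that contains ts, i.e. N / M windows in steady state.
    void process(long ts) {
        long lastStart = ts - ts % advanceMs;
        long firstStart = Math.max(0, lastStart - sizeMs + advanceMs);
        for (long start = firstStart; start <= lastStart; start += advanceMs) {
            long current = store.getOrDefault(start, 0L); // one read per window
            reads++;
            store.put(start, current + 1);                // one write per window
            writes++;
        }
    }

    public static void main(String[] args) {
        NaiveHoppingWindowCount agg = new NaiveHoppingWindowCount(60_000, 1_000);
        agg.process(120_500);
        System.out.println("reads=" + agg.reads + " writes=" + agg.writes);
    }
}
```

With N = 60 s and M = 1 s, a single record at 120.5 s falls into 60 windows, so the sketch counts 60 reads and 60 writes.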


Instead, we can maintain one aggregate per advance-step interval (let's call it a sub-window), and then return each window's value by aggregating over all the sub-windows that the window covers. More specifically, each record triggers an update on only one sub-window, and the value of each affected window is then returned by further aggregating the values of its sub-windows. This is a very common window aggregation technique (see references below). The update is a single read plus a single write, but the further aggregation involves all N / M windows that contain the updated sub-window. Together these windows span 2 * (N / M) - 1 sub-windows, so we need to read the 2 * (N / M - 1) neighboring sub-windows:


The total cost of it:

 Read:   2 * (N / M - 1) + 1 = 2 * N / M - 1
Write:   1
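The sub-window scheme can be sketched the same way. This hypothetical `SubWindowHoppingCount` keeps one count per sub-window of length M in a `HashMap` standing in for the state store, updates only the record's sub-window, then reads the neighboring sub-windows and combines them in memory (here by plain addition) into a result for every affected window. As before, it assumes non-negative timestamps and that N is a multiple of M.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of sub-window aggregation for a count. Assumes
// non-negative timestamps and that the window size N is a multiple of the
// advance step M. A HashMap stands in for the sub-window state store.
class SubWindowHoppingCount {
    final long sizeMs;     // window length N
    final long advanceMs;  // advance step M (= sub-window length)
    final Map<Long, Long> subWindows = new HashMap<>(); // sub-window start -> count
    long reads = 0;
    long writes = 0;

    SubWindowHoppingCount(long sizeMs, long advanceMs) {
        this.sizeMs = sizeMs;
        this.advanceMs = advanceMs;
    }

    private long read(long subStart) {
        reads++;
        return subWindows.getOrDefault(subStart, 0L);
    }

    // Processes one record and returns window start -> count for every
    // window that contains the record's timestamp.
    Map<Long, Long> process(long ts) {
        long k = sizeMs / advanceMs;          // N / M sub-windows per window
        long subStart = ts - ts % advanceMs;  // the one sub-window to update

        // Update: a single read plus a single write.
        long updated = read(subStart) + 1;
        subWindows.put(subStart, updated);
        writes++;

        // Fetch the 2 * (k - 1) neighboring sub-windows spanned by the
        // k affected windows (skipping starts before time zero).
        Map<Long, Long> fetched = new HashMap<>();
        fetched.put(subStart, updated);
        long lo = Math.max(0, subStart - (k - 1) * advanceMs);
        long hi = subStart + (k - 1) * advanceMs;
        for (long s = lo; s <= hi; s += advanceMs) {
            if (s != subStart) {
                fetched.put(s, read(s));
            }
        }

        // Combine in memory: each window sums its k sub-windows.
        Map<Long, Long> results = new HashMap<>();
        for (long start = lo; start <= subStart; start += advanceMs) {
            long total = 0;
            for (long s = start; s < start + sizeMs; s += advanceMs) {
                total += fetched.getOrDefault(s, 0L);
            }
            results.put(start, total);
        }
        return results;
    }

    public static void main(String[] args) {
        SubWindowHoppingCount agg = new SubWindowHoppingCount(60_000, 1_000);
        Map<Long, Long> results = agg.process(120_500);
        System.out.println("windows=" + results.size() + " reads=" + agg.reads + " writes=" + agg.writes);
    }
}
```

For the same N = 60 s and M = 1 s, one record produces results for 60 windows at a cost of 1 write and 2 * 60 - 1 = 119 reads, matching the formula above.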


We assume the dominant cost is state store I/O, and that the CPU cost of the further aggregation is negligible.


So it seems we are trading more reads for fewer writes, which is counterintuitive for a write-optimized state store like RocksDB. But there are a few things we should keep in mind:

  1. We can further optimize our implementation by buffering the partial sub-window aggregations in memory, so that repeated fetches of the latest sub-windows are served from the buffer instead of the underlying state store. This is similar to the window-slicing / pre-aggregation techniques.
  2. If records are sparsely distributed across sub-windows (i.e. a given window has records in only very few of its sub-windows), then the underlying store can serve most get calls for empty sub-windows cheaply (e.g. via RocksDB's Bloom filters).
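Point 1 can be sketched as an in-memory read-through buffer in front of the sub-window store. The hypothetical `CachedSubWindowStore` below uses a `LinkedHashMap` in access order as an LRU cache, writes through to a `HashMap` standing in for RocksDB, and only replays the read pattern of the sub-window scheme (the combine step is omitted). The cache capacity and LRU eviction are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: an LRU read-through buffer in front of the sub-window
// store. Puts are written through immediately for simplicity; a real
// implementation might buffer and flush them in batches.
class CachedSubWindowStore {
    final Map<Long, Long> store = new HashMap<>(); // stand-in for RocksDB
    final Map<Long, Long> cache;
    long storeReads = 0;

    CachedSubWindowStore(int capacity) {
        // accessOrder = true makes iteration order least-recently-used first.
        this.cache = new LinkedHashMap<Long, Long>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, Long> eldest) {
                return size() > capacity;
            }
        };
    }

    long get(long subStart) {
        Long v = cache.get(subStart);
        if (v == null) {               // miss: fall back to the store
            storeReads++;
            v = store.getOrDefault(subStart, 0L);
            cache.put(subStart, v);
        }
        return v;
    }

    void put(long subStart, long value) {
        store.put(subStart, value);
        cache.put(subStart, value);
    }

    // Same access pattern as the sub-window scheme: update one sub-window,
    // then fetch the 2 * k - 1 sub-windows the affected windows span.
    void process(long ts, long advanceMs, long k) {
        long subStart = ts - ts % advanceMs;
        put(subStart, get(subStart) + 1);
        long lo = Math.max(0, subStart - (k - 1) * advanceMs);
        for (long s = lo; s <= subStart + (k - 1) * advanceMs; s += advanceMs) {
            get(s);
        }
    }

    public static void main(String[] args) {
        CachedSubWindowStore s = new CachedSubWindowStore(256);
        s.process(120_500, 1_000, 60);
        long afterFirst = s.storeReads;
        s.process(120_700, 1_000, 60);
        System.out.println(afterFirst + " -> " + s.storeReads);
    }
}
```

The first record in a sub-window still costs 119 store reads, but a second record landing in the same sub-window is served entirely from the buffer, so the store read count stays at 119.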


Proposal

We can refactor the implementation of WindowAggregateProcessor / SessionAggregateProcessor such that aggregate values are organized per sub-window, and each process function call then aggregates on the fly across all the sub-windows of each affected window. However, the aggregation of sub-windows cannot rely on the existing user-provided "Aggregator" API, since the aggregate result type and the aggregated value type may not be the same: an Aggregator folds a single value into an aggregate, whereas combining sub-windows requires merging two aggregates.


API Changes

As pointed out above, we need a further "combine" function in addition to the existing aggregate function:

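import org.apache.kafka.streams.kstream.Aggregator;

public interface CombineAggregator<K, V, VA> extends Aggregator<K, V, VA> {

        // Merges two partial aggregates (e.g. those of two sub-windows) into one.
        VA combine(final VA agg1, final VA agg2);
}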
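As an illustration of why combine is needed, here is a hypothetical average aggregator whose aggregate type (a sum and a count) differs from its value type (Integer). To keep the sketch compilable without Kafka on the classpath, it declares a local `Aggregator` interface with the same `apply(key, value, aggregate)` shape as `org.apache.kafka.streams.kstream.Aggregator`; `SumCount` and `AverageAggregator` are illustrative names, not existing classes.

```java
// Local stand-in mirroring org.apache.kafka.streams.kstream.Aggregator,
// so this sketch compiles without Kafka on the classpath.
interface Aggregator<K, V, VA> {
    VA apply(K key, V value, VA aggregate);
}

// The interface proposed above.
interface CombineAggregator<K, V, VA> extends Aggregator<K, V, VA> {
    VA combine(VA agg1, VA agg2);
}

// Aggregate type (sum, count) differs from the value type (Integer),
// so two sub-window results cannot be merged with apply() alone.
final class SumCount {
    final long sum;
    final long count;

    SumCount(long sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    double average() {
        return count == 0 ? 0.0 : (double) sum / count;
    }
}

// Hypothetical example: average of integer values per key.
class AverageAggregator implements CombineAggregator<String, Integer, SumCount> {
    @Override
    public SumCount apply(String key, Integer value, SumCount aggregate) {
        return new SumCount(aggregate.sum + value, aggregate.count + 1);
    }

    @Override
    public SumCount combine(SumCount agg1, SumCount agg2) {
        return new SumCount(agg1.sum + agg2.sum, agg1.count + agg2.count);
    }

    public static void main(String[] args) {
        AverageAggregator agg = new AverageAggregator();
        // Two sub-windows aggregated independently with apply() ...
        SumCount sub1 = agg.apply("k", 4, agg.apply("k", 2, new SumCount(0, 0)));
        SumCount sub2 = agg.apply("k", 9, new SumCount(0, 0));
        // ... then merged into the window result with combine().
        SumCount window = agg.combine(sub1, sub2);
        System.out.println(window.average());
    }
}
```

Each sub-window is built with apply, and the window result comes from merging sub-window aggregates with combine; here the merge is element-wise addition, which is associative, so the sub-windows can be combined in any grouping. The example prints 5.0 (sum 15 over 3 values).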


References:

https://openproceedings.org/2019/conf/edbt/EDBT19_paper_171.pdf

http://www.vldb.org/pvldb/vol12/p1167-tangwongsan.pdf