...

  • Both time bounds are inclusive
  • At most one record is forwarded when new data arrives
  • Out-of-order data that still falls within the current window is simply added to the running aggregate
  • Out-of-order data that arrives outside the current window is dropped
  • The window size effectively serves as both the grace period and the retention period (see the sketch after this list)
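
To make these semantics concrete, below is a minimal, purely illustrative sketch of the accept/drop decision for a window size of 10. The class and field names are hypothetical, not the actual processor code:

```java
// Illustrative only: hypothetical names, not actual Kafka Streams processor code.
public class SlidingWindowSemanticsSketch {
    static final long WINDOW_SIZE = 10L; // also serves as the grace/retention period

    private long observedStreamTime = Long.MIN_VALUE;

    void process(final long recordTime) {
        observedStreamTime = Math.max(observedStreamTime, recordTime);
        final long lowerBound = observedStreamTime - WINDOW_SIZE;

        if (recordTime < lowerBound) {
            // out-of-order record outside the current window: dropped
            return;
        }
        // recordTime lies in [lowerBound, observedStreamTime], both bounds
        // inclusive: fold the value into the running aggregate and forward
        // at most one updated result downstream
    }
}
```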


Initial Simple Design

As an initial, POC implementation, we can process sliding window aggregates by storing, for each record, the current aggregate of the window that begins at its timestamp. When a new record arrives, we can compute the new total aggregate for its window by aggregating its value with the first aggregate value stored within that window. We would then traverse the rest of the window to fold the new value into the aggregate of each window that starts at/before the new record's timestamp.
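
A rough sketch of that update path is shown below. The fetch/put calls mirror the existing WindowStore interface, but the class structure and field names here are assumptions for illustration, not the actual KStreamWindowAggregateProcessor code:

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

// Illustrative sketch of the naive design: the store holds, per window start,
// the aggregate of the window beginning at that timestamp.
class NaiveSlidingAggregator<K, V, Agg> {
    private final WindowStore<K, Agg> store;
    private final Aggregator<K, V, Agg> aggregator;
    private final Initializer<Agg> initializer;
    private final long windowSize;

    NaiveSlidingAggregator(final WindowStore<K, Agg> store,
                           final Aggregator<K, V, Agg> aggregator,
                           final Initializer<Agg> initializer,
                           final long windowSize) {
        this.store = store;
        this.aggregator = aggregator;
        this.initializer = initializer;
        this.windowSize = windowSize;
    }

    Agg process(final K key, final V value, final long recordTime) {
        Agg newTotal = null;

        // every stored window starting in [recordTime - windowSize, recordTime]
        // (bounds inclusive) also contains the new record
        try (final WindowStoreIterator<Agg> iter =
                 store.fetch(key, recordTime - windowSize, recordTime)) {
            while (iter.hasNext()) {
                final KeyValue<Long, Agg> entry = iter.next();
                final Agg updated = aggregator.apply(key, value, entry.value);
                if (newTotal == null) {
                    // the earliest stored aggregate already covers all other
                    // records in the new record's window, so one aggregation
                    // yields the new total to forward
                    newTotal = updated;
                }
                // O(N) aggregations and O(N) writes in the worst case
                store.put(key, updated, entry.key);
            }
        }

        // the window beginning at the new record's own timestamp
        final Agg single = aggregator.apply(key, value, initializer.apply());
        store.put(key, single, recordTime);
        return newTotal != null ? newTotal : single;
    }
}
```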

Pros: The main advantage of this design is that it requires minimal storage. It would also require minimal code changes, as it only needs to store (key, Agg) pairs and is therefore compatible with the current KStreamWindowAggregateProcessor.

This is not particularly efficient, but I believe it is a good starting point to be improved upon later. "Minimal code changes" are not actually the goal here; rather, the hope is to focus on establishing the semantics, behavior, and API before moving on to a more complicated design.

Cons: Not particularly efficient: for N records in the window, it requires O(N) aggregations as well as O(N) writes to the underlying store.

POC PR can be found here

Improved O(sqrt(N)) Design

As mentioned above, the initial design is not ideal. The two issues with the simple implementation are that it takes linear time and requires linear writes to the underlying store. One possibility to improve on this design is as follows:

Break the window up into separate buckets, with each bucket storing its collection of values as well as its "running aggregate". By "running aggregate" I mean not the aggregate of the bucket alone, but the total aggregate from the end of that bucket to the end of the window. When a new record arrives, the new total is computed by aggregating three things: the new value, all the values in the first bucket, and the running aggregate of the first bucket. The running aggregates of the other buckets are updated as well, where applicable (a sketch follows below).
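
A minimal in-memory sketch of this bookkeeping is below. The Bucket class and aggregator signature are hypothetical (none of these names exist in Kafka Streams), and bucket creation/expiry as the window slides is omitted for brevity:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BiFunction;

// Illustrative only: shows the O(N/M + M) update path of the bucketed design.
class BucketedWindow<V, Agg> {

    static class Bucket<W, A> {
        final List<W> values = new ArrayList<>(); // raw values in this bucket
        A runningAgg; // aggregate of everything AFTER this bucket, to window end

        Bucket(final A initial) {
            this.runningAgg = initial;
        }
    }

    private final Deque<Bucket<V, Agg>> buckets = new ArrayDeque<>(); // oldest first
    private final BiFunction<V, Agg, Agg> aggregator;

    BucketedWindow(final BiFunction<V, Agg, Agg> aggregator, final Agg initial) {
        this.aggregator = aggregator;
        buckets.addLast(new Bucket<>(initial));
    }

    // Returns the aggregate over the full window after adding the new value.
    Agg add(final V value) {
        // new total = new value + all values in the first (oldest) bucket
        // + the first bucket's running aggregate, which covers every later
        // bucket and does not yet include the new value
        final Bucket<V, Agg> first = buckets.peekFirst();
        Agg total = aggregator.apply(value, first.runningAgg);
        for (final V v : first.values) { // O(N/M) values per bucket
            total = aggregator.apply(v, total);
        }

        // the new record lands in the newest bucket
        buckets.peekLast().values.add(value);

        // the new value now falls inside the running-aggregate span of every
        // bucket except the newest: at most M - 1 updates (and store writes)
        for (final Bucket<V, Agg> b : buckets) {
            if (b != buckets.peekLast()) {
                b.runningAgg = aggregator.apply(value, b.runningAgg);
            }
        }
        return total;
    }
}
```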

The values would be stored in the usual window store (which can be either persistent or in-memory), and the "running aggregates" could also be either persistent or in-memory. If an in-memory data structure is used, upon restore the values can be used to recompute the aggregates. If persistent, the aggregates could be held either in a separate RocksDB instance or in the same persistent window store as the values by using some special prefix to distinguish actual values from running aggregates.
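
If the shared-store option is chosen, one possible keying scheme, sketched below with hypothetical prefix constants, would reserve a leading byte to separate the two kinds of entries:

```java
// Illustrative only: a reserved one-byte prefix distinguishes running-aggregate
// entries from plain value entries kept in the same persistent window store.
class PrefixedKeys {
    static final byte VALUE_PREFIX = 0x00;       // hypothetical
    static final byte RUNNING_AGG_PREFIX = 0x01; // hypothetical

    static byte[] prefixed(final byte prefix, final byte[] serializedKey) {
        final byte[] out = new byte[serializedKey.length + 1];
        out[0] = prefix;
        System.arraycopy(serializedKey, 0, out, 1, serializedKey.length);
        return out;
    }
}
```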

Pros: If we have N records in the window and M buckets, the time complexity of each incoming record is O(N/M + M), and there are at most M writes to the underlying store (see the short derivation below for why this is minimized at M = sqrt(N)). Some further analysis is required to determine the optimal value of M in practice, but this definitely improves on the naive design; there is also some tradeoff between purely optimizing the number of aggregations and minimizing the number of writes to the store.
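
For intuition on the optimal bucket count, minimizing the per-record cost with respect to M gives:

    f(M) = N/M + M
    f'(M) = 1 - N/M^2 = 0  ⇒  M = sqrt(N)
    f(sqrt(N)) = 2*sqrt(N) = O(sqrt(N))

This is the source of the O(sqrt(N)) in this section's title; in practice the relative costs of aggregations versus store writes may shift the best choice of M.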

Cons: Additional space complexity (O(N + M) storage is needed). If the running aggregates are held in-memory, restore time is increased; if held in the same RocksDB instance as the window store, we increase the size of, and number of writes to, that store, with performance implications; if held in a separate RocksDB instance, there is additional memory overhead (relative to reusing the window store's RocksDB).

Compatibility, Deprecation, and Migration Plan

...