Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The DSL currently supports windowed aggregations for only two types of time-based windows: hopping and tumbling. A third kind, the sliding window, is defined but used only in join operations. Users who need sliding window semantics can approximate them with hopping windows whose advance time is 1 ms, but this workaround only superficially resembles the sliding window model: aggregates will be output for every defined hopping window, of which there will likely be a large number (specifically, the size of the window in milliseconds). The semantics for out-of-order data also differ, as sliding windows are inclusive at both ends (i.e., both the start and end time bounds).
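To make the cost of the hopping-window workaround concrete, the sketch below counts how many hopping windows contain a single record. It is a standalone illustration, not code from the DSL: the class and method names are hypothetical, and it assumes hopping windows are aligned to multiples of the advance and are half-open, `[start, start + size)`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only.
final class HoppingWindowCount {

    /** Start times of every aligned hopping window [start, start + sizeMs) containing the timestamp. */
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window can still contain the timestamp.
        long first = Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = first; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 5-second window advancing by 1 ms: one record lands in 5000 windows.
        System.out.println(windowStartsFor(10_000L, 5_000L, 1L).size());     // prints 5000
        // Tumbling case (advance == size): one record lands in exactly one window.
        System.out.println(windowStartsFor(10_000L, 5_000L, 5_000L).size()); // prints 1
    }
}
```

With a 1 ms advance, every record updates (and eventually emits) as many aggregates as the window is long in milliseconds, which is the overhead this proposal aims to avoid.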
Public Interfaces
Rather than adapt the existing TimeWindows interface (which provides semantics for tumbling and hopping windows), I propose to add a separate SlidingWindows class. This will resemble a stripped-down version of the TimeWindows class, and have only one public API (plus several methods it needs to override from the abstract base class Windows<W>):
public final class SlidingWindows extends Windows<TimeWindow> {

    public static SlidingWindows of(final Duration size);

    @Override
    public Map<Long, TimeWindow> windowsFor(final long timestamp);

    @Override
    public long gracePeriodMs() { return 0; }

    @Override
    public long size() { return sizeMs; }

    @Override
    public boolean equals(final Object o);

    @Override
    public int hashCode();

    @Override
    public String toString();
}
This would effectively be used in the same manner as TimeWindows. For example, a counting aggregation with sliding windows would look something like this:
final KTable<Windowed<String>, Long> counts = source
    .groupBy((key, value) -> value)
    .windowedBy(SlidingWindows.of(Duration.ofSeconds(5)))
    .count();
Proposed Changes
The semantics of the sliding window based aggregations are as follows:
- Only one window is defined for each distinct set of records falling within a single window.
- Since the windows represent records that fall within "windowSize" of each other, both time bounds are inclusive
- The window is defined "forwards" in time, such that window start time < window end time
- A record is output for a window when it can no longer be updated, i.e. when its endTime < streamTime - gracePeriod
- Each distinct timestamp seen in input effectively triggers one output. In other words, each new record results in exactly one output UNLESS we have already seen a record with the same timestamp (in which case there would be no new distinct set of records, and thus no new window)
- Out-of-order data that still falls within the grace period will be kept (but will never trigger any output, as it does not advance stream time)
- Out-of-order data that arrives outside the grace period is dropped
* Note that processing a new record may trigger an arbitrary number of outputs, since advancing stream time can push any number of older windows past the grace period. Semantically, however, 1 (distinct) record ↔ 1 output
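The stream-time rules above can be sketched as a small standalone class. This is only an illustration under stated assumptions: the class and method names are hypothetical, stream time is taken to be the highest record timestamp seen so far, timestamps are non-negative, and a record is treated as "outside the grace period" when its own timestamp is older than streamTime - gracePeriod (the text does not spell out that last check).

```java
// Hypothetical sketch of the finality rules; not part of the proposed API.
final class WindowFinalitySketch {
    private final long gracePeriodMs;
    // Highest record timestamp observed so far; -1 means no record yet.
    private long streamTimeMs = -1L;

    WindowFinalitySketch(long gracePeriodMs) {
        this.gracePeriodMs = gracePeriodMs;
    }

    /** Observes a record timestamp; returns true only if it advanced stream time. */
    boolean observe(long timestampMs) {
        if (timestampMs > streamTimeMs) {
            streamTimeMs = timestampMs;
            return true;
        }
        return false; // out-of-order records never advance stream time
    }

    /** A window can no longer be updated once endTime < streamTime - gracePeriod. */
    boolean isFinal(long windowEndMs) {
        return windowEndMs < streamTimeMs - gracePeriodMs;
    }

    /** Assumption: a record is dropped when its timestamp is older than streamTime - gracePeriod. */
    boolean isLate(long timestampMs) {
        return timestampMs < streamTimeMs - gracePeriodMs;
    }

    public static void main(String[] args) {
        WindowFinalitySketch sketch = new WindowFinalitySketch(1_000L);
        sketch.observe(10_000L);
        System.out.println(sketch.isFinal(8_999L)); // prints true
        System.out.println(sketch.isFinal(9_000L)); // prints false
    }
}
```

Because only `observe` calls that return true move stream time forward, an out-of-order record can update windows that are still open but can never by itself cause a window to become final.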
We can leverage the existing window stores with a slightly different processing path. As with other types of windowed aggregations, the underlying store will be used to hold the aggregate of each defined window. Rather than putting a new window into the store every X ms (as is done for TimeWindows), a new window will be inserted into the store only upon seeing a record.
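A minimal sketch of this record-driven processing path, using a counting aggregation and an in-memory TreeMap in place of a real window store, might look as follows. Several things here are assumptions made for illustration and are not specified by the proposal: the class and method names, the choice to key each window by its start time with inclusive bounds `[t, t + size]` for a record at timestamp `t`, and the restriction to records arriving in non-decreasing timestamp order (initialising a window created by an out-of-order record is not covered).

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for a window store; illustration only.
final class RecordDrivenWindowStore {
    private final long sizeMs;
    // window start -> running count for the assumed window [start, start + sizeMs]
    private final TreeMap<Long, Long> counts = new TreeMap<>();

    RecordDrivenWindowStore(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    /** Processes one record; assumes timestamps arrive in non-decreasing order. */
    void process(long timestampMs) {
        // A window is created only when a new distinct timestamp is seen.
        counts.putIfAbsent(timestampMs, 0L);
        // Update every stored window whose inclusive bounds cover the record.
        for (Map.Entry<Long, Long> window
                : counts.subMap(timestampMs - sizeMs, true, timestampMs, true).entrySet()) {
            window.setValue(window.getValue() + 1);
        }
    }

    int windowCount() {
        return counts.size();
    }

    long countFor(long windowStartMs) {
        return counts.getOrDefault(windowStartMs, 0L);
    }
}
```

For example, with a 5000 ms window and records at 1000, 3000, 3000 and 7000, only three windows are stored (one per distinct timestamp), whereas the hopping-window workaround would have materialised thousands.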
Compatibility, Deprecation, and Migration Plan
N/A
Rejected Alternatives
In general, many alternatives had to be rejected because the current aggregator can only apply a value to an aggregate; it cannot be used to combine two aggregates. For example, it might seem better to store the actual aggregate of each bucket rather than the "running aggregate". Unfortunately, we would have no way to combine the buckets.
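The limitation can be illustrated with a running average, where the aggregate type differs from the value type. The sketch below is hypothetical and standalone: `ValueAggregator` is a simplified stand-in for the kind of aggregator described above, and `merge` is an extra function the user would have to supply separately, since it cannot be derived from the aggregator alone.

```java
// Hypothetical illustration of "apply a value" versus "combine two aggregates".
final class AggregateCombineSketch {

    /** Simplified stand-in for an aggregator: folds one value into an aggregate. */
    interface ValueAggregator<V, A> {
        A apply(V value, A aggregate);
    }

    /** Running-average state. */
    static final class Avg {
        final double sum;
        final long count;

        Avg(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        double mean() {
            return count == 0 ? 0.0 : sum / count;
        }
    }

    /** Adds a single value to an average; this is all an aggregator can express. */
    static final ValueAggregator<Double, Avg> ADD =
        (value, agg) -> new Avg(agg.sum + value, agg.count + 1);

    static Avg aggregate(double... values) {
        Avg agg = new Avg(0.0, 0);
        for (double value : values) {
            agg = ADD.apply(value, agg);
        }
        return agg;
    }

    /** Combining two buckets needs this separate function, which ADD does not provide. */
    static Avg merge(Avg left, Avg right) {
        return new Avg(left.sum + right.sum, left.count + right.count);
    }
}
```

Note that `ADD.apply(bucketB, bucketA)` does not even type-check when both arguments are `Avg` values, which is why per-bucket aggregates cannot be combined using the aggregator alone.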