...

Rather than adapt the existing TimeWindows class (which provides semantics for tumbling and hopping windows), I propose to add a separate SlidingWindows class. This will resemble a stripped-down version of the TimeWindows class and have only one new public API method, the static factory of(Duration), plus several methods it needs to override from the abstract base class Windows<W>:

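```java
import java.time.Duration;
import java.util.Map;

import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.TimeWindow;

// Proposed API: signatures only, most method bodies are omitted in this proposal.
public final class SlidingWindows extends Windows<TimeWindow> {

    // Window size in milliseconds; it also acts as the retention period.
    private final long sizeMs;

    // The only new public API method: creates sliding windows of the given size.
    public static SlidingWindows of(final Duration size);

    @Override
    public Map<Long, TimeWindow> windowsFor(final long timestamp);

    // Sliding windows have a grace period of zero.
    @Override
    public long gracePeriodMs() { return 0; }

    @Override
    public long size() { return sizeMs; }

    @Override
    public boolean equals(final Object o);

    @Override
    public int hashCode();

    @Override
    public String toString();
}
```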

It would be used in the same manner as TimeWindows. For example, to do a counting aggregation with sliding windows, you would have something like:

...

  • Both time bounds are inclusive
  • At most one record is forwarded when new data arrives
  • Out-of-order data that still falls within the current window is simply added to the running aggregate
  • Out-of-order data that arrives outside the current window is dropped
  • The window size is effectively the retention period for the records, and the grace period is zero
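These semantics can be checked against a brute-force reference model. The sketch below is a hypothetical plain-Java class (SlidingCountModel is not part of Kafka Streams) that assumes a count aggregation, keeps raw timestamps in memory, and treats the window ending at the largest timestamp seen so far (stream time) as the "current window". It forwards at most one result per record and recounts on every update, so it illustrates behaviour only, not an efficient implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical reference model of the sliding-window count semantics above.
// Not Kafka Streams code: raw timestamps are kept in memory and recounted on each update.
class SlidingCountModel {
    private final long sizeMs;                 // window size, also the retention period
    private final List<Long> timestamps = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE;  // largest timestamp seen so far

    SlidingCountModel(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    /** Processes one record; returns the forwarded count, or empty if the record is dropped. */
    public OptionalLong process(long timestamp) {
        if (streamTime != Long.MIN_VALUE && timestamp < streamTime - sizeMs) {
            // Outside the current window and the grace period is zero: drop it.
            return OptionalLong.empty();
        }
        streamTime = Math.max(streamTime, timestamp);
        timestamps.add(timestamp);
        long start = streamTime - sizeMs;
        // Evict records that have fallen out of the retention period.
        timestamps.removeIf(ts -> ts < start);
        // Both bounds are inclusive: every retained ts satisfies start <= ts <= streamTime.
        return OptionalLong.of(timestamps.size());
    }
}
```

With a window size of 10, records at 5, 12, 16, 8, 3 and 6 forward counts 1, 2, 2, 3, nothing, and 4: the late record at 8 still falls within [6, 16] and is counted, the one at 3 is dropped, and the one at 6 is kept because the lower bound is inclusive.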


Simple Design

As an initial proof-of-concept (POC) implementation, we can process sliding window aggregates by storing, for each record, the current aggregate of a window that begins at its timestamp. When a new record arrives, we can then compute the new total aggregate for the window ending at that record's timestamp by combining its value with the first stored aggregate that falls within the window. We would then have to traverse the rest of the window to update the aggregate of each stored window that starts at or before the new record's timestamp.
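A minimal sketch of this POC design follows, assuming a sum aggregation, an in-memory TreeMap in place of a state store, and records that arrive in timestamp order (out-of-order handling is omitted). The class SlidingSumPoc and its method names are hypothetical. Each map entry holds the running aggregate of the window [start, start + size] keyed by its start timestamp.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the POC design, not Kafka Streams code.
// Assumes records arrive in timestamp order; out-of-order records are not handled.
class SlidingSumPoc {
    private final long sizeMs;
    // Window start timestamp -> aggregate of records seen so far in [start, start + sizeMs].
    private final TreeMap<Long, Long> aggByWindowStart = new TreeMap<>();

    SlidingSumPoc(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    /** Adds a record and returns the aggregate of the window [timestamp - sizeMs, timestamp]. */
    long process(long timestamp, long value) {
        long windowStart = timestamp - sizeMs;
        // Windows starting before windowStart can no longer contain new records: expire them.
        aggByWindowStart.headMap(windowStart, false).clear();
        // New total = new value combined with the first stored aggregate inside the window.
        Map.Entry<Long, Long> first = aggByWindowStart.ceilingEntry(windowStart);
        long total = (first == null ? 0L : first.getValue()) + value;
        // Traverse every stored window that starts at or before the new timestamp
        // (and still contains it) and add the new value to its running aggregate.
        aggByWindowStart.subMap(windowStart, true, timestamp, true)
                .replaceAll((start, agg) -> agg + value);
        // Store the window that begins at this record's timestamp; if an earlier record
        // had the same timestamp, that entry was already updated above.
        aggByWindowStart.putIfAbsent(timestamp, value);
        return total;
    }
}
```

With a window size of 10, the records (1, 5), (4, 3), (11, 2) and (12, 1) return 5, 8, 10 and 6. Each record costs one traversal over the stored windows it falls into, which is the per-record work the design above describes.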

...