
Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The DSL currently supports windowed aggregations for only two types of time-based window: hopping and tumbling. A third kind of window, the sliding window, is already defined but is used only in join operations. Users who need sliding-window semantics can approximate them with hopping windows whose advance time is 1 ms, but this workaround only superficially resembles the sliding window model: an aggregate is output for every hopping window a record falls into, and each record falls into as many windows as the window size in milliseconds (5,000 windows for a 5-second window). The semantics for out-of-order data also differ, because sliding windows are inclusive at both ends (i.e. at both the start and end time bounds), whereas hopping windows exclude their end time.
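To make the window count concrete, here is a standalone sketch (no Kafka dependency) that lists every hopping window a single record falls into. It assumes window starts are aligned to multiples of the advance and that each window covers the half-open interval [start, start + size), modeled on how `TimeWindows` assigns hopping windows; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class HoppingWindowCount {
    // Start times of every hopping window [start, start + sizeMs) that contains timestamp.
    // Window starts are multiples of advanceMs and are never negative.
    static List<Long> windowStartsFor(long timestamp, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long start = (Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (; start <= timestamp; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 5-second windows advancing by 1 ms: how many windows does one record land in?
        System.out.println(windowStartsFor(10_000L, 5_000L, 1L).size());
        // Tumbling 5-second windows (advance == size) for comparison.
        System.out.println(windowStartsFor(10_000L, 5_000L, 5_000L).size());
    }
}
```

With an advance of 1 ms, the number of windows per record equals the window size in milliseconds (5,000 here), which is the output blow-up described above; with the advance equal to the size (tumbling windows) it drops to one.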

Public Interfaces

Rather than adapt the existing TimeWindows class (which provides the semantics for tumbling and hopping windows), I propose adding a separate SlidingWindows class. It will resemble a stripped-down version of TimeWindows, with only one public static factory method, plus the methods it needs to override from the abstract base class Windows<W>:

public final class SlidingWindows extends Windows<TimeWindow> {

    private final long sizeMs;

    // Method bodies omitted except for the trivial accessors

    public static SlidingWindows of(final Duration size);

    @Override
    public Map<Long, TimeWindow> windowsFor(final long timestamp);

    @Override
    public long gracePeriodMs() { return 0; }

    @Override
    public long size() { return sizeMs; }

    @Override
    public boolean equals(final Object o);

    @Override
    public int hashCode();

    @Override
    public String toString();
}

It would be used in the same way as TimeWindows. For example, a counting aggregation over sliding windows would look something like this:

// Re-key each record by its value, then count records per key in 5-second sliding windows
final KTable<Windowed<String>, Long> counts = source
            .groupBy((key, value) -> value)
            .windowedBy(SlidingWindows.of(Duration.ofSeconds(5)))
            .count();
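The shape of the proposed class can be tried out without Kafka on the classpath using the hypothetical stand-in below. It is a sketch, not the proposed implementation: `SlidingWindowsSketch` and its nested `Window` class are illustrative names, rejecting non-positive sizes is an assumption borrowed from `TimeWindows.of`, and `windowsFor` assumes the given timestamp is the current stream time, returning the single inclusive window that ends there.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;

class SlidingWindowsSketch {
    // Minimal stand-in for a time window; both bounds are treated as inclusive.
    static final class Window {
        final long startMs;
        final long endMs;

        Window(long startMs, long endMs) {
            this.startMs = startMs;
            this.endMs = endMs;
        }
    }

    private final long sizeMs;

    private SlidingWindowsSketch(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    // Factory mirroring the proposed SlidingWindows.of(Duration).
    static SlidingWindowsSketch of(Duration size) {
        long ms = size.toMillis();
        if (ms <= 0) {
            throw new IllegalArgumentException("Window size must be larger than zero");
        }
        return new SlidingWindowsSketch(ms);
    }

    // Exactly one window, keyed by its start time: it ends at the given (stream) time and
    // starts sizeMs earlier, clamped at 0 so the start is never negative.
    Map<Long, Window> windowsFor(long timestamp) {
        long start = Math.max(0, timestamp - sizeMs);
        return Collections.singletonMap(start, new Window(start, timestamp));
    }

    long size() {
        return sizeMs;
    }

    long gracePeriodMs() {
        return 0;
    }
}
```

Unlike hopping windows, `windowsFor` here always returns a single-entry map, which is the key difference in output volume.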


Proposed Changes 

---------WIP---------

The semantics of the sliding window based aggregations are as follows:

Exactly one window is defined for each record processed: it ends at the current stream time and begins "window size" milliseconds earlier. The window thus "slides" forward in time, advancing whenever newly arriving data advances the stream time. Some notes:

  • Both time bounds are inclusive
  • The "current" time is taken as the window end time, so the window start time is earlier than the window end time
  • Each record effectively triggers two outputs: one when it "enters" the window, and one when it "leaves" out the other side
  • Out-of-order data that still falls within the current window triggers an update for every window that contains it
  • Out-of-order data that arrives outside the grace period is dropped
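The rules above can be sketched as a small, standalone count aggregation over plain timestamps. This is a hypothetical illustration, not the proposed Kafka implementation: the class name, keeping every record in a `TreeMap`, and treating the grace period as extra slack beyond the window size are all assumptions, and the sketch leaves out the separate "leave" output from the third bullet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class SlidingCountSketch {
    // One emitted update: the inclusive window [start, end] and its current record count.
    static final class Result {
        final long start;
        final long end;
        final long count;

        Result(long start, long end, long count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "] = " + count;
        }
    }

    private final long sizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE;                     // no records seen yet
    private final TreeMap<Long, Long> records = new TreeMap<>(); // timestamp -> record count
    private final TreeMap<Long, Long> windows = new TreeMap<>(); // window end -> current count

    SlidingCountSketch(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    List<Result> process(long ts) {
        List<Result> out = new ArrayList<>();
        // Too far behind stream time (beyond window size plus grace): drop the record.
        if (streamTime != Long.MIN_VALUE && ts < streamTime - sizeMs - graceMs) {
            return out;
        }
        records.merge(ts, 1L, Long::sum);

        if (ts >= streamTime) {
            // In-order record: stream time advances and defines this record's single window.
            streamTime = ts;
            long start = streamTime - sizeMs;
            long count = 0;
            for (long c : records.subMap(start, true, streamTime, true).values()) {
                count += c;
            }
            windows.put(streamTime, count);
            out.add(new Result(start, streamTime, count));
        } else {
            // Out-of-order record: update every emitted window [end - size, end] containing ts,
            // i.e. every window whose end lies in [ts, ts + size].
            List<Long> ends = new ArrayList<>(windows.subMap(ts, true, ts + sizeMs, true).keySet());
            for (long end : ends) {
                long updated = windows.get(end) + 1;
                windows.put(end, updated);
                out.add(new Result(end - sizeMs, end, updated));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        SlidingCountSketch s = new SlidingCountSketch(5_000L, 0L);
        for (long ts : new long[] {10_000L, 13_000L, 12_000L, 9_000L, 4_000L, 18_000L}) {
            System.out.println(ts + " -> " + s.process(ts));
        }
    }
}
```

In the sample run, 12,000 arrives late but inside the current window, so only the window ending at 13,000 is updated; 9,000 falls inside both emitted windows and updates both; 4,000 is behind the current window with zero grace and is dropped.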



Compatibility, Deprecation, and Migration Plan

N/A

Rejected Alternatives

In general, many alternatives had to be rejected because the current aggregator can only apply a value to an aggregate; it cannot combine two aggregates. For example, it might seem better to store the actual aggregate of each bucket rather than a "running aggregate", but unfortunately there would be no way to combine the bucket aggregates into a result for the whole window.
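A small sketch of the limitation: Kafka Streams' `Aggregator` has the shape `VA apply(K key, V value, VA aggregate)`, so it can fold one value into an aggregate but offers no way to merge two aggregates. The standalone code below drops the key for brevity and uses a hypothetical `Combiner` interface to show what per-bucket storage would additionally require; all names are illustrative.

```java
import java.util.List;

class AggregateCombineSketch {
    // Same shape as a "fold one value into an aggregate" function (key omitted).
    interface Aggregator<V, A> {
        A apply(V value, A aggregate);
    }

    // Hypothetical combiner for two partial aggregates: the missing capability.
    interface Combiner<A> {
        A apply(A left, A right);
    }

    // Aggregate {count, sum}: its type differs from the value type, as aggregates often do.
    static final Aggregator<Long, long[]> COUNT_AND_SUM =
        (value, agg) -> new long[] {agg[0] + 1, agg[1] + value};

    static final Combiner<long[]> MERGE =
        (a, b) -> new long[] {a[0] + b[0], a[1] + b[1]};

    // Aggregate one bucket's raw values starting from an empty {0, 0} aggregate.
    static long[] fold(List<Long> values) {
        long[] agg = {0L, 0L};
        for (long v : values) {
            agg = COUNT_AND_SUM.apply(v, agg);
        }
        return agg;
    }

    public static void main(String[] args) {
        long[] bucketA = fold(List.of(1L, 2L));
        long[] bucketB = fold(List.of(3L, 4L));
        // COUNT_AND_SUM.apply(bucketB, bucketA) would not compile: a long[] is not a Long value.
        long[] window = MERGE.apply(bucketA, bucketB);
        System.out.println(window[0] + " records, sum " + window[1]); // 4 records, sum 10
    }
}
```

Without something like `MERGE`, the only way to get the window total is to re-fold the raw values, which is why per-bucket aggregates were rejected.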

