Status

Current state: Under Discussion

Discussion thread: here 

JIRA: KAFKA-5636

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Kafka Streams implements session windows, tumbling windows, and hopping windows as windowed aggregation methods. Hopping windows with a small advance time can approximate a sliding window, but they perform poorly: they produce many overlapping and often redundant windows, each of which requires an expensive aggregation. Because a sliding window implementation performs a calculation only for each distinct window, it would be a more efficient way to do these types of aggregations. Additionally, sliding windows are inclusive on both the start and end time, which produces a different set of windows than hopping windows, which are inclusive only on the start time.

This KIP proposes adding sliding windows to give users an efficient way to do sliding aggregation. 

Public Interfaces

Add org.apache.kafka.streams.kstream.SlidingWindows and its APIs

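package org.apache.kafka.streams.kstream;

import java.time.Duration;
import org.apache.kafka.streams.kstream.internals.TimeWindow;

public final class SlidingWindows extends Windows<TimeWindow> {
    /** The size of the windows in milliseconds. */
    public final long sizeMs;

    /** The grace period in milliseconds; -1 means the default grace period is used. */
    private final long graceMs;

    /** Base constructor, accepts both the size and the grace period of the window. */
    private SlidingWindows(final long sizeMs, final long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    /** Constructor with only a size; the grace period is set to the default. */
    private SlidingWindows(final long sizeMs) {
        this(sizeMs, -1);
    }

    /**
     * Return a SlidingWindows of the size given by the user.
     *
     * @param size The size of the window, > 0
     * @return a new window definition with default maintain duration of 1 day, size as given
     * @throws IllegalArgumentException if the window size is negative or zero
     */
    public static SlidingWindows of(final Duration size) {
        final long sizeMs = toMillis(size, "size");
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("Window size must be larger than zero.");
        }
        return new SlidingWindows(sizeMs);
    }

    /**
     * Reject out-of-order events that arrive more than {@code afterWindowEnd}
     * after the end of its window.
     *
     * @param afterWindowEnd The grace period to admit out-of-order events to a window.
     * @return updated builder window with same size and new grace
     * @throws IllegalArgumentException if {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
     */
    public SlidingWindows grace(final Duration afterWindowEnd) {
        final long afterWindowEndMs = toMillis(afterWindowEnd, "afterWindowEnd");
        if (afterWindowEndMs < 0) {
            throw new IllegalArgumentException("Grace period must not be negative.");
        }
        // The fields are final, so return a new definition with the same size and the new grace.
        return new SlidingWindows(sizeMs, afterWindowEndMs);
    }

    /** Converts a Duration to milliseconds, rejecting values that overflow a long. */
    private static long toMillis(final Duration duration, final String name) {
        try {
            return duration.toMillis();
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(name + " cannot be represented as long milliseconds", e);
        }
    }

    // Overrides of the abstract methods inherited from Windows are omitted from this interface listing.
}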


Proposed Changes

This KIP creates a new class of Windows, SlidingWindows, to give users more aggregation flexibility and features. Building on the TimeWindow implementation, these sliding windows have inclusive start and end points, and each window is unique, meaning that each distinct set of records appears in only one window. This is in contrast with hopping windows, which can mimic sliding windows with an advance of 1ms but produce many overlapping windows, as shown below. With hopping windows, an aggregation is computed for every one of these windows, whereas sliding windows compute an aggregation only for each unique window, significantly reducing the amount of work required.

Figure 1: Windows with the SlidingWindows implementation: window size 10ms, 9 windows for 5 records.







Figure 2: Hopping windows with a 1ms advance: window size 10ms, 17 windows for 5 records.
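To make the overlap concrete, the following standalone Java sketch counts the non-empty hopping windows (size 10ms, advance 1ms) for a few hypothetical record timestamps and compares that with the number of distinct record sets among them. It assumes hopping windows are half-open, [start, start + size), with start times aligned to multiples of the advance. The class and method names are hypothetical and not part of the proposed API, and the timestamps are not the data behind Figures 1 and 2.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public final class HoppingWindowCount {

    /** Start times of every hopping window [start, start + sizeMs) that contains at least one record. */
    static Set<Long> nonEmptyWindowStarts(final long[] timestamps, final long sizeMs, final long advanceMs) {
        final Set<Long> starts = new TreeSet<>();
        for (final long ts : timestamps) {
            // Earliest window start, aligned to a multiple of advanceMs, whose window still covers ts.
            final long first = Math.max(0, ts - sizeMs + advanceMs) / advanceMs * advanceMs;
            for (long start = first; start <= ts; start += advanceMs) {
                starts.add(start);
            }
        }
        return starts;
    }

    /** Number of distinct record sets among those windows, i.e. how many aggregates actually differ. */
    static int distinctRecordSets(final long[] timestamps, final long sizeMs, final long advanceMs) {
        final Set<List<Long>> contents = new HashSet<>();
        for (final long start : nonEmptyWindowStarts(timestamps, sizeMs, advanceMs)) {
            final List<Long> inWindow = new ArrayList<>();
            for (final long ts : timestamps) {
                if (ts >= start && ts < start + sizeMs) {
                    inWindow.add(ts);
                }
            }
            contents.add(inWindow);
        }
        return contents.size();
    }

    public static void main(final String[] args) {
        // Hypothetical timestamps in ms; not the data behind Figures 1 and 2.
        final long[] timestamps = {100, 105, 108};
        System.out.println("non-empty hopping windows: " + nonEmptyWindowStarts(timestamps, 10, 1).size());
        System.out.println("distinct record sets: " + distinctRecordSets(timestamps, 10, 1));
    }
}
```

For the timestamps 100, 105 and 108 this reports 18 non-empty hopping windows but only 5 distinct record sets, so 13 of the hopping windows repeat an aggregate over a record set that another window already covers.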


With sliding windows, a new window is defined when a record enters the window on the right, or 1ms after a record leaves the window on the left. In this example, a new window is formed with C as its end point when C enters the window area, creating a window with the unique set {A, B, C}. Then, as the window slides along, a new window is defined 1ms after record A leaves the window, creating a window with the unique set {B, C}. The next window in this example would be defined 1ms after B leaves the window, creating a window with just {C}. There are therefore two ways to define a distinct window under these sliding window semantics: when a record enters the window from the right (Figure 3), and when a record leaves the window on the left (Figure 4). When a record leaves the window, the next window starts 1ms after that record's timestamp.



Figure 3: New window defined after C enters


Figure 4: New window defined after A exits
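The window-definition rule above can be sketched in standalone Java under a few stated assumptions: a window of size sizeMs ending at time t covers [t - sizeMs, t], inclusive on both ends; each record yields one candidate window ending at its timestamp (the record entering on the right) and one starting 1ms after it (the record leaving on the left); empty windows are skipped, and windows with identical bounds, such as those produced by two records with the same timestamp, are emitted once. The sketch works on an in-memory array rather than a state store, the names are hypothetical, and it is not the proposed Kafka Streams implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class SlidingWindowSketch {

    /** A window covering [start, end], inclusive on both ends, and the record timestamps it holds. */
    static final class Window {
        final long start;
        final long end;
        final List<Long> records;

        Window(final long start, final long end, final List<Long> records) {
            this.start = start;
            this.end = end;
            this.records = records;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "] " + records;
        }
    }

    /** Returns the distinct, non-empty sliding windows for the given record timestamps, ordered by start. */
    static List<Window> distinctWindows(final long[] timestamps, final long sizeMs) {
        final long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        // Keyed on (start, end) so a window with the same bounds is only emitted once.
        final Map<List<Long>, Window> windows = new LinkedHashMap<>();
        for (final long ts : sorted) {
            // Record enters on the right: the window ends at the record's timestamp.
            addIfNonEmpty(windows, sorted, ts - sizeMs, ts);
            // Record leaves on the left: the next window starts 1ms after its timestamp.
            addIfNonEmpty(windows, sorted, ts + 1, ts + 1 + sizeMs);
        }
        final List<Window> result = new ArrayList<>(windows.values());
        result.sort(Comparator.comparingLong((Window w) -> w.start).thenComparingLong(w -> w.end));
        return result;
    }

    /** Adds the window [start, end] if it holds at least one record and its bounds are new. */
    private static void addIfNonEmpty(final Map<List<Long>, Window> windows, final long[] sorted,
                                      final long start, final long end) {
        final List<Long> inWindow = new ArrayList<>();
        for (final long ts : sorted) {
            if (ts >= start && ts <= end) {
                inWindow.add(ts);
            }
        }
        if (!inWindow.isEmpty()) {
            windows.putIfAbsent(Arrays.asList(start, end), new Window(start, end, inWindow));
        }
    }

    public static void main(final String[] args) {
        // Hypothetical timestamps in ms; not the data from Figures 1 to 4.
        final long[] timestamps = {100, 105, 108};
        for (final Window w : distinctWindows(timestamps, 10)) {
            System.out.println(w);
        }
    }
}
```

For the timestamps 100, 105 and 108 with a 10ms size, this yields five windows: [90, 100], [95, 105], [98, 108], [101, 111] and [106, 116]. The window starting 1ms after 108 would be empty, so it is not created.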



Compatibility, Deprecation, and Migration Plan

This is a new feature and should be compatible with all existing features.


Rejected Alternatives

Other Types of Sliding Windows:

There are semantic differences between some implementations of sliding windows. We chose to make both ends of the window inclusive and to define windows based on unique sets of events. There are alternative implementations where either or both ends are exclusive. There are also different ways to decide the start and end points of a window. We chose to define a new window when a new record enters the window on the right, or 1ms after a record leaves the window on the left, because this is intuitive given that the window "slides." An alternative implementation could define a window when a new record enters on the left, and 1ms before a record leaves on the right.

Other Implementation Options:

Instead of implementing a new SlidingWindows class, it may have been possible to modify TimeWindows, which currently implements hopping and tumbling windows, to include a sliding window option. To better distinguish between sliding and hopping windows, and for user ease and simplicity, this option was not chosen.
