...

Currently, Kafka Streams implements session windows, tumbling windows, and hopping windows as windowed aggregation methods. Hopping windows with a small advance time can approximate a sliding window, but this approach performs poorly: it produces many overlapping and often redundant windows, each of which requires an expensive aggregation. Because sliding windows only perform calculations for each distinct window, a dedicated sliding window implementation would be a more efficient way to do these types of aggregations. Additionally, sliding windows are inclusive on both the start and end time, creating a different set of windows than hopping windows, which are inclusive only on the start time.
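The inclusivity difference can be shown with a minimal standalone sketch: a record whose timestamp equals the window end belongs to a sliding window as proposed here, but not to a hopping window, whose end is exclusive. `WindowBoundsSketch` and its methods are hypothetical helpers, not Kafka APIs, and the window bounds (90ms to 100ms) are illustrative.

```java
final class WindowBoundsSketch {

    // Hopping (and tumbling) windows: start inclusive, end exclusive, i.e. [start, end).
    static boolean inHoppingWindow(final long ts, final long startMs, final long endMs) {
        return ts >= startMs && ts < endMs;
    }

    // Sliding windows as proposed: start and end both inclusive, i.e. [start, end].
    static boolean inSlidingWindow(final long ts, final long startMs, final long endMs) {
        return ts >= startMs && ts <= endMs;
    }

    public static void main(final String[] args) {
        // A record whose timestamp equals the window end (100ms), for a window starting at 90ms.
        System.out.println("hopping: " + inHoppingWindow(100, 90, 100));
        System.out.println("sliding: " + inSlidingWindow(100, 90, 100));
    }
}
```

Both helpers agree on every timestamp except the one exactly at the window end.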

This KIP proposes adding sliding windows to give users an efficient way to do sliding aggregation. 

Public Interfaces


Add kafka.streams.kstream.SlidingWindows and its APIs:

```java
import java.time.Duration;

import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.TimeWindow;

public final class SlidingWindows extends Windows<TimeWindow> {
    /** The size of the windows in milliseconds. */
    public final long sizeMs;

    /** The grace period in milliseconds; -1 means the default grace period applies. */
    private final long graceMs;

    /** Base constructor, accepts both size and grace to create a window definition. */
    private SlidingWindows(final long sizeMs, final long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    /** Constructor with just a size; sets grace to the default (-1). */
    private SlidingWindows(final long sizeMs) {
        this(sizeMs, -1);
    }

    /**
     * Return a SlidingWindows definition with the window size given by the user, in milliseconds.
     *
     * @param size The size of the window, must be larger than 0
     * @return a new window definition with default maintain duration of 1 day, size as given
     * @throws IllegalArgumentException if the window size is negative or zero
     */
    public static SlidingWindows of(final Duration size) throws IllegalArgumentException {
        final long sizeMs = toMillis(size, "size");
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("Window size must be larger than zero.");
        }
        return new SlidingWindows(sizeMs);
    }

    /**
     * Reject out-of-order events that arrive more than {@code afterWindowEnd}
     * after the end of its window.
     *
     * @param afterWindowEnd The grace period to admit out-of-order events to a window.
     * @return updated builder window with same size and new grace
     * @throws IllegalArgumentException if {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
     */
    public SlidingWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
        final long afterWindowEndMs = toMillis(afterWindowEnd, "afterWindowEnd");
        if (afterWindowEndMs < 0) {
            throw new IllegalArgumentException("Grace period must not be negative.");
        }
        return new SlidingWindows(sizeMs, afterWindowEndMs);
    }

    // Converts a Duration to milliseconds, reporting overflow as an IllegalArgumentException.
    private static long toMillis(final Duration duration, final String name) {
        try {
            return duration.toMillis();
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(name + " can't be represented as long milliseconds", e);
        }
    }

    // The abstract methods inherited from Windows<TimeWindow> (windowsFor, size and
    // gracePeriodMs) are not listed in this proposal and are omitted here.
}
```
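To make the semantics of `grace(afterWindowEnd)` concrete, here is a minimal standalone sketch with no Kafka dependency. It assumes Kafka Streams' usual notion of stream time (the largest record timestamp observed so far) and assumes a sliding window [start, end] closes once stream time exceeds end + grace. `GracePeriodSketch` and `admitted` are hypothetical names, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.List;

final class GracePeriodSketch {

    /**
     * Returns the timestamps, in arrival order, admitted to the window
     * [windowStartMs, windowEndMs] (inclusive on both ends) with the given grace.
     * Assumption: the window closes once stream time exceeds windowEndMs + graceMs.
     */
    static List<Long> admitted(final List<Long> arrivals, final long windowStartMs,
                               final long windowEndMs, final long graceMs) {
        long streamTimeMs = Long.MIN_VALUE;
        final List<Long> result = new ArrayList<>();
        for (final long ts : arrivals) {
            // Stream time only moves forward; out-of-order records do not lower it.
            streamTimeMs = Math.max(streamTimeMs, ts);
            final boolean inWindow = ts >= windowStartMs && ts <= windowEndMs;
            final boolean windowClosed = streamTimeMs > windowEndMs + graceMs;
            if (inWindow && !windowClosed) {
                result.add(ts);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        // Window [90, 100] with a 5ms grace period; 99 arrives after stream time reached 106.
        System.out.println(admitted(List.of(95L, 104L, 98L, 106L, 99L), 90, 100, 5));
    }
}
```

Here the late record 98 is still admitted because stream time (104) is within the grace period, while 99 is rejected because stream time has already passed 100 + 5.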


Proposed Changes

This KIP creates a new class of Windows, SlidingWindows, to give users more aggregation flexibility. Building off the TimeWindow implementation, sliding windows have an inclusive start and end point, and each window is unique, meaning that each distinct set of records appears in only one window. This is in contrast with hopping windows, which can mimic sliding windows with an advance of 1ms but produce many overlapping windows, as shown below. Hopping windows therefore require an aggregation calculation for every one of those windows, whereas sliding windows only aggregate each unique window, which significantly reduces the amount of work required.

Windows with the SlidingWindows implementation: window size 10ms, 9 windows for 5 records.

Windows with hopping windows and a 1ms advance: window size 10ms, 17 windows for 5 records.
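The cost of mimicking sliding windows with hopping windows can be counted with a minimal standalone sketch. It assumes hopping windows are [start, start + sizeMs) with start a multiple of advanceMs, and uses hypothetical timestamps (A=100, B=103, C=108) rather than the records in the figure. `HoppingWindowCount` and `countWindows` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class HoppingWindowCount {

    /**
     * Returns {number of non-empty hopping windows, number of distinct record sets}.
     * Assumption: each window is [start, start + sizeMs) and start is a multiple of advanceMs.
     */
    static int[] countWindows(final Map<String, Long> records, final long sizeMs, final long advanceMs) {
        final long minTs = Collections.min(records.values());
        final long maxTs = Collections.max(records.values());
        // An aligned start at or before the earliest window that can contain minTs.
        final long firstStart = Math.floorDiv(minTs - sizeMs + 1, advanceMs) * advanceMs;

        int nonEmptyWindows = 0;
        final Set<List<String>> distinctSets = new HashSet<>();
        for (long start = firstStart; start <= maxTs; start += advanceMs) {
            final List<String> contents = new ArrayList<>();
            for (final Map.Entry<String, Long> r : records.entrySet()) {
                if (r.getValue() >= start && r.getValue() < start + sizeMs) {
                    contents.add(r.getKey());
                }
            }
            // Every non-empty window costs an aggregation, even if its record set repeats.
            if (!contents.isEmpty()) {
                nonEmptyWindows++;
                distinctSets.add(contents);
            }
        }
        return new int[] {nonEmptyWindows, distinctSets.size()};
    }

    public static void main(final String[] args) {
        // Hypothetical record timestamps; 10ms windows advancing by 1ms.
        final Map<String, Long> records = new TreeMap<>(Map.of("A", 100L, "B", 103L, "C", 108L));
        final int[] counts = countWindows(records, 10, 1);
        System.out.println(counts[0] + " windows, " + counts[1] + " distinct record sets");
    }
}
```

With a 10ms size and a 1ms advance, each record falls into 10 windows, so these three records produce 18 non-empty windows but only 5 distinct record sets.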



With sliding windows, a new window is defined when a record enters the window on the right, or 1ms after a record leaves the window on the left. In this example, when C enters, a window with C as its end point is formed, containing the unique set {A, B, C}. As the window slides along, a new window is defined 1ms after A leaves, containing the unique set {B, C}. The next window in this example starts 1ms after B leaves and contains just {C}.
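The window-definition rule above can be sketched as follows. This is a minimal standalone illustration, not the proposed KStreamWindowAggregate change: it assumes a window of size sizeMs spans [end - sizeMs, end] inclusive, generates one candidate window ending at each record and one starting 1ms after each record, and keeps only the first window for each distinct, non-empty set of records. The timestamps (A=100, B=103, C=108) and the names `SlidingWindowSketch` and `distinctWindows` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SlidingWindowSketch {

    /** A window [start, end], inclusive on both ends. */
    static final class Window {
        final long start;
        final long end;

        Window(final long start, final long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    /** Maps each distinct, non-empty record set to the earliest window producing it. */
    static Map<List<String>, Window> distinctWindows(final Map<String, Long> records, final long sizeMs) {
        final List<Window> candidates = new ArrayList<>();
        for (final long ts : records.values()) {
            // A record entering on the right: a window that ends at its timestamp.
            candidates.add(new Window(ts - sizeMs, ts));
            // The record leaving on the left: a window that starts 1ms after it.
            candidates.add(new Window(ts + 1, ts + 1 + sizeMs));
        }
        candidates.sort(Comparator.comparingLong((Window w) -> w.start).thenComparingLong(w -> w.end));

        final Map<List<String>, Window> distinct = new LinkedHashMap<>();
        for (final Window w : candidates) {
            final List<String> contents = new ArrayList<>();
            for (final Map.Entry<String, Long> r : records.entrySet()) {
                if (r.getValue() >= w.start && r.getValue() <= w.end) {
                    contents.add(r.getKey());
                }
            }
            // Skip empty windows and windows whose record set was already seen.
            if (!contents.isEmpty()) {
                distinct.putIfAbsent(contents, w);
            }
        }
        return distinct;
    }

    public static void main(final String[] args) {
        // Hypothetical timestamps for records A, B and C; window size 10ms.
        final Map<String, Long> records = new TreeMap<>(Map.of("A", 100L, "B", 103L, "C", 108L));
        distinctWindows(records, 10).forEach((contents, w) -> System.out.println(w + " " + contents));
    }
}
```

With these timestamps the sketch yields five windows, containing {A}, {A, B}, {A, B, C}, {B, C} and {C}; the candidate window starting 1ms after C is empty and is dropped. Recomputing each window's contents from scratch keeps the sketch short; a real aggregation would update aggregates incrementally.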

Alter KStreamWindowAggregate to properly handle SlidingWindows.

Compatibility, Deprecation, and Migration Plan

This KIP adds a new feature and should be compatible with all existing features.

Rejected Alternatives

Other Types of Sliding Windows:

Implementations of sliding windows differ in their semantics. We chose to make both ends of the window inclusive and to define windows based on unique sets of events. Alternative implementations make either or both ends exclusive. There are also different ways to decide the start and end points of a window. We chose to define a new window when a new record enters on the right, or 1ms after a record leaves the window on the left, because this matches the intuition of a window "sliding" forward in time. An alternative implementation could define a window when a new record enters on the left and 1ms before a record leaves on the right.

Other Implementation Options:

Instead of implementing a new SlidingWindows class that, like TimeWindows, extends Windows<TimeWindow>, it may have been possible to change TimeWindows, which currently implements hopping and tumbling windows, to include a sliding windows option. This option was not chosen in order to better distinguish between sliding and hopping windows, and for user ease and simplicity.