Status

Current state: Under Discussion

Discussion thread: here 

JIRA: KAFKA-5636

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Kafka Streams implements session windows, tumbling windows, and hopping windows as windowed aggregation methods. Hopping windows with a small advance time can approximate a sliding window, but this approach performs poorly because it produces many overlapping and often redundant windows, each of which requires its own expensive aggregation. Because sliding windows compute an aggregation only once for each distinct window, a dedicated sliding window implementation would be a more efficient way to perform these aggregations. Additionally, sliding windows are inclusive on both the start and end time, which produces a different set of windows than hopping windows, which are inclusive only on the start time.

This KIP proposes adding sliding windows to give users an efficient way to do sliding aggregation. 

Public Interfaces

Add org.apache.kafka.streams.kstream.SlidingWindows and APIs

import java.time.Duration;
import org.apache.kafka.streams.kstream.internals.TimeWindow;

public final class SlidingWindows extends Windows<TimeWindow> {

    /**
     * Return a SlidingWindows definition with the given window size.
     *
     * @param size The size of the window, must be > 0
     * @return a new window definition with default maintain duration of 1 day, size as given
     * @throws IllegalArgumentException if the window size is negative or zero
     */
    public static SlidingWindows of(final Duration size) throws IllegalArgumentException {}

    /**
     * Reject out-of-order events that arrive more than {@code afterWindowEnd}
     * after the end of their window.
     *
     * @param afterWindowEnd The grace period to admit out-of-order events to a window.
     * @return an updated window definition with the same size and the new grace period
     * @throws IllegalArgumentException if {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
     */
    public SlidingWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {}
}
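The Javadoc above specifies two validation rules: the size must be strictly positive, and the grace period must be non-negative and representable as long milliseconds. The following is a minimal, self-contained sketch of those checks only. `SlidingWindowsSketch`, `sizeMs()`, and `graceMs()` are hypothetical names; this is not the Kafka Streams class or its implementation.

```java
import java.time.Duration;

// Hypothetical stand-in for the proposed SlidingWindows that models only the
// argument validation described in the Javadoc. Not the Kafka Streams class.
final class SlidingWindowsSketch {
    private final long sizeMs;
    private final long graceMs; // -1 means "grace not set yet" (grace is required)

    private SlidingWindowsSketch(final long sizeMs, final long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    // Converts a Duration to millis, turning overflow into IllegalArgumentException.
    private static long toMillisOrThrow(final Duration d, final String name) {
        try {
            return d.toMillis();
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(name + " cannot be represented as long milliseconds", e);
        }
    }

    // Mirrors of(size): the window size must be > 0.
    static SlidingWindowsSketch of(final Duration size) {
        final long ms = toMillisOrThrow(size, "size");
        if (ms <= 0) {
            throw new IllegalArgumentException("Window size must be > 0");
        }
        return new SlidingWindowsSketch(ms, -1L);
    }

    // Mirrors grace(afterWindowEnd): a negative grace period is rejected.
    SlidingWindowsSketch grace(final Duration afterWindowEnd) {
        final long ms = toMillisOrThrow(afterWindowEnd, "afterWindowEnd");
        if (ms < 0) {
            throw new IllegalArgumentException("Grace period must not be negative");
        }
        return new SlidingWindowsSketch(sizeMs, ms);
    }

    long sizeMs() { return sizeMs; }

    long graceMs() { return graceMs; }
}
```

Returning a new instance from `grace` keeps the definition immutable, which matches how the existing window definitions are built fluently.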


Proposed Changes


This KIP creates a new class of Windows, SlidingWindows, to give users more aggregation flexibility and features. Sliding windows have an inclusive start and end point, and each window is unique, meaning that each distinct set of records appears in only one window. This contrasts with hopping windows, which can mimic sliding windows with an advance of 1ms but produce many overlapping windows, as shown below. Each of those windows requires its own, often expensive, aggregation, whereas sliding windows compute an aggregation only for each unique window, which significantly reduces the amount of work required.

Figure 1: Windows with SlidingWindow Implementation: Window size 10ms, 7 windows for 4 records.


Figure 2: Windows with HoppingWindow with a 1ms advance: Window size 10ms, 10 windows for 4 records.
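To make the comparison concrete, the following sketch counts non-empty windows under both schemes for a list of timestamps. It assumes epoch-aligned hopping windows [start, start + size) as in TimeWindows, and, for sliding windows, the two windows [t - size, t] and [t + 1, t + 1 + size] per record with both ends inclusive, counting windows with identical contents once. `WindowCounts` and its methods are hypothetical, and the sketch does not reproduce the exact records in the figures.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper comparing how many windows each scheme produces.
final class WindowCounts {
    // Hopping windows [start, start + sizeMs) with start a multiple of advanceMs.
    // Counts every window that holds at least one record.
    static int hoppingWindowCount(final long[] timestamps, final long sizeMs, final long advanceMs) {
        final Set<Long> starts = new HashSet<>();
        for (final long t : timestamps) {
            // Smallest aligned start s with s > t - sizeMs (so t < s + sizeMs), not below 0.
            final long first = Math.max(0L, Math.floorDiv(t - sizeMs, advanceMs) * advanceMs + advanceMs);
            for (long s = first; s <= t; s += advanceMs) {
                starts.add(s);
            }
        }
        return starts.size();
    }

    // Sliding windows (assumed bounds): each record at t defines [t - sizeMs, t] and
    // [t + 1, t + 1 + sizeMs], both inclusive. Identical record sets count once.
    static int slidingWindowCount(final long[] timestamps, final long sizeMs) {
        final Set<List<Long>> distinctContents = new HashSet<>();
        for (final long t : timestamps) {
            addIfNonEmpty(distinctContents, timestamps, t - sizeMs, t);
            addIfNonEmpty(distinctContents, timestamps, t + 1, t + 1 + sizeMs);
        }
        return distinctContents.size();
    }

    // Collects the timestamps inside [start, end] and records the set if non-empty.
    private static void addIfNonEmpty(final Set<List<Long>> out, final long[] timestamps,
                                      final long start, final long end) {
        final List<Long> inside = new ArrayList<>();
        for (final long ts : timestamps) {
            if (ts >= start && ts <= end) {
                inside.add(ts);
            }
        }
        if (!inside.isEmpty()) {
            out.add(inside);
        }
    }
}
```

For records at 100ms and 104ms with a 10ms size, hopping windows with a 1ms advance yield 14 non-empty windows, while the sliding definition yields 3 distinct ones ({100}, {104}, and {100, 104}).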


Usage

Sliding window aggregation (note: unlike TimeWindows and SessionWindows, which fall back to a 24-hour default, SlidingWindows will require the grace period to be set explicitly):

stream.groupByKey().windowedBy(SlidingWindows.of(twentySeconds).grace(fiftySeconds)).count().toStream();


Processing Windows

To process a new record, there are three major steps.

  1. Create the two new windows defined by this record
    1. Aggregate existing records that fall within these windows with the current record's value
  2. Find existing windows that the new record falls within
    1. Add the new record's value to these windows' aggregations
  3. Send the updated and newly created windows to the window store.
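The three steps can be sketched for a single key as follows. This is a hypothetical in-memory illustration, not the Kafka Streams processor: a `TreeMap` stands in for the window store, sum is used as the aggregation, a window starting at `s` is assumed to cover [s, s + size] with both ends inclusive, and new windows are aggregated by rescanning raw records rather than by reusing existing window aggregates as the KIP describes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-key sketch of the three processing steps, using sum as the aggregation.
final class SlidingSumSketch {
    private final long sizeMs;
    private final List<long[]> records = new ArrayList<>(); // each entry: {timestamp, value}
    final TreeMap<Long, Long> windows = new TreeMap<>();     // stand-in store: window start -> sum

    SlidingSumSketch(final long sizeMs) {
        this.sizeMs = sizeMs;
    }

    void process(final long timestamp, final long value) {
        final Map<Long, Long> updates = new TreeMap<>();

        // Step 2: existing windows containing the record start in [timestamp - sizeMs, timestamp].
        for (final Map.Entry<Long, Long> w : windows.subMap(timestamp - sizeMs, true, timestamp, true).entrySet()) {
            updates.put(w.getKey(), w.getValue() + value);
        }

        records.add(new long[] {timestamp, value});

        // Step 1: the two windows defined by this record, created only if absent.
        createIfAbsent(updates, timestamp - sizeMs); // ends at the record, so it contains it
        createIfAbsent(updates, timestamp + 1);      // starts 1ms later, so it excludes it

        // Step 3: write the updated and newly created windows to the store.
        windows.putAll(updates);
    }

    private void createIfAbsent(final Map<Long, Long> updates, final long start) {
        if (windows.containsKey(start)) {
            return; // already stored; step 2 updated it if it contains the record
        }
        long sum = 0;
        for (final long[] r : records) {
            if (r[0] >= start && r[0] <= start + sizeMs) {
                sum += r[1];
            }
        }
        updates.put(start, sum); // kept even if empty so later records can be added to it
    }
}
```

With a 10ms size, processing (100ms, 1) and then (104ms, 2) leaves windows starting at 90 (sum 1), 94 (sum 3), 101 (sum 2), and 105 (sum 0). In this sketch the final windows do not depend on arrival order, since it only creates missing windows and updates the existing ones that contain the record.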

With sliding windows, a new record creates two new windows: one that ends at the record's timestamp, and one that starts 1ms after the record's timestamp. The first window contains the new record, while the second does not. This is shown with record b in Figure 3. These two windows were chosen so that, across all records, every distinct combination of records that fits within the window size is captured by some window. This makes aggregating new windows straightforward, because any aggregation a new window needs will already have been calculated in an existing window. When a new window overlaps with existing records (record a in Figure 3), those records are aggregated and added to the new window's value.

Figure 3: New windows of size 10ms defined for b

Any existing windows that the new record falls within are updated so that their aggregation includes the new record's value. Finding these windows requires a scan through the WindowStore, similar to the scan used when aggregating SessionWindows, followed by updating their values.

Out-of-order Records

Out-of-order records are processed the same way as in-order records, as long as they arrive within the grace period. As noted above, the grace period must be set explicitly when defining SlidingWindows. A late record creates two new windows, one ending at the record's timestamp and one starting 1ms after it. These windows are populated with the existing records that fall into them, using previously calculated aggregations. Additionally, any existing windows that the late record falls within are updated with a new aggregation that includes it.
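The grace check for late records can be sketched as below. The exact drop condition is an assumption, not taken from this KIP: stream time is taken to be the largest timestamp observed so far, and a record is accepted while `timestamp + grace >= streamTime`, that is, while the window ending at that record has not yet closed. `GraceCheck` is a hypothetical name.

```java
// Hypothetical sketch of the grace-period check for out-of-order records.
// Assumption: stream time = largest timestamp seen so far, and a record is kept
// while the window ending at its timestamp is still open.
final class GraceCheck {
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE;

    GraceCheck(final long graceMs) {
        this.graceMs = graceMs;
    }

    // Returns true if the record should be processed, false if it is too late.
    boolean accept(final long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        return timestamp + graceMs >= streamTime;
    }
}
```

For example, with a 50ms grace period and stream time at 200ms, a record at 150ms is still accepted, while a record at 149ms is dropped.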

Compatibility, Deprecation, and Migration Plan

Because this KIP only adds a new feature, it should be compatible with all existing features.

Rejected Alternatives

Other Types of Sliding Windows:

  • Creating one new window that starts at the record's timestamp, and a second window that ends 1ms before the record's timestamp. Defining the two new windows this way is less intuitive and fits less naturally with looking back over a segment of time.
  • Making either or both ends exclusive. Traditionally, time windows are inclusive, and since hopping and tumbling windows are exclusive on the end point, inclusivity is a distinguishing feature of sliding windows. Changing exclusivity is just a matter of adding 1ms.
  • Having a single window anchored to, and ending at, the current time. This can be achieved by setting the retention time equal to the window size plus the grace period, and allowing only this type of sliding window would limit historical views and therefore reduce flexibility for other use cases.
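The claim that changing exclusivity is a 1ms adjustment holds because timestamps are whole milliseconds: an inclusive window [start, end] covers exactly the same timestamps as the half-open window [start, end + 1). A minimal check follows; `WindowBounds` and its methods are hypothetical helpers.

```java
// Hypothetical helpers comparing inclusive and half-open window membership
// for integer millisecond timestamps.
final class WindowBounds {
    // Sliding-window style: both ends inclusive.
    static boolean inInclusive(final long t, final long start, final long end) {
        return start <= t && t <= end;
    }

    // Hopping/tumbling-window style: start inclusive, end exclusive.
    static boolean inHalfOpen(final long t, final long start, final long endExclusive) {
        return start <= t && t < endExclusive;
    }

    // True if [start, end] and [start, end + 1) agree on every timestamp in [from, to].
    static boolean equivalentOn(final long start, final long end, final long from, final long to) {
        for (long t = from; t <= to; t++) {
            if (inInclusive(t, start, end) != inHalfOpen(t, start, end + 1)) {
                return false;
            }
        }
        return true;
    }
}
```

The equivalence depends on integer timestamps; with fractional times the shift would have to match the timestamp resolution instead.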

Additional Features:

  • Allowing users to specify the granularity of window definitions (something other than 1ms). This is an option for the future, but allowing a granularity of 1 second or 1 minute results in windows that are very similar to tumbling windows, straying from the purpose of sliding windows.
  • Including a 'subtraction' feature as a complement to aggregation. While this might be a nice-to-have feature in the future, it is out of scope for this KIP and isn't necessary to make sliding windows work well.

Other Implementation Options:

  • Making SlidingWindows an option within TimeWindows. This would add more requirements for users and blur the lines between hopping and sliding windows, while potentially making optimization harder.
  • Making SlidingWindows a stand-alone class similar to SessionWindows. Because of the similarity between SlidingWindows and TimeWindows, leveraging the current TimeWindows implementation is preferable to creating all-new processors that use very similar or identical code.

