...

Add kafka.streams.kstream.SlidingWindows and APIs

SlidingWindows

```java
package org.apache.kafka.streams.kstream;

import java.time.Duration;
import org.apache.kafka.streams.kstream.internals.TimeWindow;

public final class SlidingWindows extends Windows<TimeWindow> {

    /**
     * Return a window definition with the given window size and the given window grace period.
     * Records that arrive after the grace period has passed will be ignored.
     *
     * @param size  the size of the window
     * @param grace the grace period for admitting out-of-order records into a window
     * @return a new window definition
     * @throws IllegalArgumentException if the specified window size or grace period is zero or negative,
     *                                  or can't be represented as {@code long milliseconds}
     */
    public static SlidingWindows of(final Duration size, final Duration grace) throws IllegalArgumentException {
        // ...
    }

    // other members omitted
}
```

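The argument checks described in the Javadoc of SlidingWindows.of() can be sketched in plain Java. The class SlidingWindowSpec below is a hypothetical stand-in, not the Kafka Streams class: it only shows one way to reject zero, negative, and overflowing durations, assuming that a duration which "can't be represented as long milliseconds" surfaces as the ArithmeticException thrown by Duration.toMillis().

```java
import java.time.Duration;

// Hypothetical stand-in for SlidingWindows (not the Kafka Streams class): it only
// mirrors the argument checks documented for SlidingWindows.of(size, grace).
final class SlidingWindowSpec {
    private final long sizeMs;
    private final long graceMs;

    private SlidingWindowSpec(final long sizeMs, final long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    static SlidingWindowSpec of(final Duration size, final Duration grace) {
        final long sizeMs = toMillis(size, "size");
        final long graceMs = toMillis(grace, "grace");
        // The Javadoc rejects zero or negative values for both parameters.
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("size must be positive, got " + sizeMs + " ms");
        }
        if (graceMs <= 0) {
            throw new IllegalArgumentException("grace must be positive, got " + graceMs + " ms");
        }
        return new SlidingWindowSpec(sizeMs, graceMs);
    }

    // Converts to whole milliseconds, turning long overflow into IllegalArgumentException.
    private static long toMillis(final Duration d, final String name) {
        if (d == null) {
            throw new IllegalArgumentException(name + " must not be null");
        }
        try {
            return d.toMillis(); // truncates sub-millisecond parts; throws ArithmeticException on overflow
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(name + " cannot be represented as long milliseconds", e);
        }
    }

    long sizeMs() { return sizeMs; }

    long graceMs() { return graceMs; }
}
```

One design choice worth noting in this sketch: a positive but sub-millisecond duration truncates to 0 ms and is therefore rejected as a zero size or grace period.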

Add an additional windowedBy method to KGroupedStream.java

```java
/**
 * Create a new {@link TimeWindowedKStream} instance that can be used to perform sliding windowed aggregations.
 *
 * @param windows the specification of the aggregation {@link SlidingWindows}
 * @return an instance of {@link TimeWindowedKStream}
 */
TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows);
```

Proposed Changes

Create a new final class of Windows, SlidingWindows, to add aggregation flexibility and features for users. Sliding windows will have inclusive start and end points, and each window will be unique, meaning that each distinct set of records will appear in only one window. This is in contrast with hopping windows, which can mimic sliding windows with an advance of 1ms but produce many overlapping windows, as shown below. Each of those overlapping windows requires its own aggregation, even when it holds exactly the same records as its neighbors, whereas sliding windows aggregate only once per unique window, significantly reducing the amount of work required.
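To make the cost difference concrete, the following brute-force sketch counts how many hopping windows with a 1ms advance are non-empty for a few timestamps, and how many distinct record sets those windows actually hold. WindowCountComparison and its methods are hypothetical helpers, not Kafka Streams APIs. Both counts assume half-open hopping-window bounds [start, start + size), so the distinct-set count is an idealized figure for "one aggregation per unique window", not the exact windows SlidingWindows would create.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helpers comparing 1 ms-advance hopping windows with "one window per distinct record set".
final class WindowCountComparison {

    // Number of non-empty hopping windows [start, start + sizeMs) with a 1 ms advance.
    static int nonEmptyHoppingWindows(final long[] timestamps, final long sizeMs) {
        return countWindows(timestamps, sizeMs, false);
    }

    // Number of distinct record sets held by those same windows.
    static int distinctRecordSets(final long[] timestamps, final long sizeMs) {
        return countWindows(timestamps, sizeMs, true);
    }

    private static int countWindows(final long[] timestamps, final long sizeMs, final boolean distinctOnly) {
        if (timestamps.length == 0) {
            return 0;
        }
        final long[] ts = timestamps.clone();
        Arrays.sort(ts);
        final Set<List<Integer>> distinctSets = new HashSet<>();
        int nonEmpty = 0;
        // Every window start that could cover at least one record.
        for (long start = ts[0] - sizeMs + 1; start <= ts[ts.length - 1]; start++) {
            int first = -1;
            int last = -1;
            for (int i = 0; i < ts.length; i++) {
                if (ts[i] >= start && ts[i] < start + sizeMs) {
                    if (first < 0) {
                        first = i;
                    }
                    last = i;
                }
            }
            if (first >= 0) {
                nonEmpty++;
                // Timestamps are sorted, so a window's records form the index range [first, last].
                distinctSets.add(List.of(first, last));
            }
        }
        return distinctOnly ? distinctSets.size() : nonEmpty;
    }
}
```

For timestamps 10, 12 and 20 with a 5ms window, 12 hopping windows are non-empty, yet they hold only 4 distinct record sets, so 8 of those aggregations repeat work already done.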

...

  1. Create the two new windows defined by this record, one ending at the record's timestamp and one starting 1ms after it (assuming these windows do not yet exist)
    1. Aggregate existing records that fall within these windows with the current record's value (or note that these windows are empty and ignore them)
  2. Find existing windows that the new record falls within
    1. Add the new record's value to these windows' aggregations
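The steps above can be sketched with a simple sum aggregation. This is a hypothetical, in-memory illustration (SlidingSumSketch is not a Kafka Streams class) that assumes a window is identified by its start and covers [start, start + size] inclusive, so the window ending at a record's timestamp starts at timestamp - size. For simplicity it aggregates newly created windows by rescanning stored records rather than reusing previously calculated aggregations, and it implements only the steps listed here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the per-record steps using a sum aggregation.
// A window is keyed by its start and covers [start, start + sizeMs] inclusive.
final class SlidingSumSketch {
    private final long sizeMs;
    private final TreeMap<Long, Long> windows = new TreeMap<>(); // window start -> running sum
    private final List<long[]> records = new ArrayList<>();       // {timestamp, value} processed so far

    SlidingSumSketch(final long sizeMs) {
        this.sizeMs = sizeMs;
    }

    void process(final long timestamp, final long value) {
        // Step 1: create the two windows defined by this record, if they do not exist yet.
        final long leftStart = timestamp - sizeMs; // window ending at the record's timestamp
        final long rightStart = timestamp + 1;     // window starting 1 ms after it
        if (!windows.containsKey(leftStart)) {
            // Seeded with earlier records; the current value is added in step 2.
            windows.put(leftStart, sumExisting(leftStart));
        }
        if (!windows.containsKey(rightStart) && hasExisting(rightStart)) {
            // Kept only if already-processed records fall into it; an empty right window is ignored.
            windows.put(rightStart, sumExisting(rightStart));
        }
        // Step 2: add the value to every window containing the timestamp,
        // i.e. every start in [timestamp - sizeMs, timestamp] (never the right window).
        for (final Map.Entry<Long, Long> w : windows.subMap(leftStart, true, timestamp, true).entrySet()) {
            w.setValue(w.getValue() + value);
        }
        records.add(new long[] {timestamp, value});
    }

    private long sumExisting(final long start) {
        long sum = 0;
        for (final long[] r : records) {
            if (r[0] >= start && r[0] <= start + sizeMs) {
                sum += r[1];
            }
        }
        return sum;
    }

    private boolean hasExisting(final long start) {
        for (final long[] r : records) {
            if (r[0] >= start && r[0] <= start + sizeMs) {
                return true;
            }
        }
        return false;
    }

    // Snapshot of window start -> aggregate.
    Map<Long, Long> windows() {
        return new TreeMap<>(windows);
    }
}
```

With a 5ms size, processing records (10, 1) and (12, 2) gives windows starting at 5 and 7 with sums 1 and 3. A late record (11, 4) then creates windows starting at 6 and 12 and updates the window starting at 7, leaving sums 1, 5, 7 and 2, each matching that window's inclusive contents.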

...

Records that arrive out of order will be processed the same way as in-order records, as long as they fall within the grace period. A late record creates two new windows: one ending at that record's timestamp and one starting 1ms after that record's timestamp. These windows are populated with the existing records that fall into them, using previously calculated aggregations. If either of these windows is empty, or would contain only the current record's value, this is taken into account when the window is created. Additionally, every existing window that the late record falls within is updated with a new aggregation that includes the late record.
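The grace-period check for out-of-order records can be sketched as a small admission filter. The exact close rule is not spelled out in this section, so the sketch assumes one: stream time is the highest timestamp seen so far, and an out-of-order record is processed only while the window ending at its own timestamp is still within the grace period, i.e. streamTime - timestamp <= grace. GraceFilter is a hypothetical name, not a Kafka Streams class.

```java
// Hypothetical admission filter for out-of-order records under an assumed close rule.
final class GraceFilter {
    private final long graceMs;
    private long streamTimeMs = Long.MIN_VALUE; // highest timestamp observed so far

    GraceFilter(final long graceMs) {
        if (graceMs < 0) {
            throw new IllegalArgumentException("grace must not be negative");
        }
        this.graceMs = graceMs;
    }

    // Returns true if the record should be processed, false if it should be dropped.
    boolean admit(final long timestampMs) {
        if (timestampMs >= streamTimeMs) {
            streamTimeMs = timestampMs; // in-order record advances stream time
            return true;
        }
        // Out-of-order record (assumed rule): accepted while the window ending
        // at its timestamp has not yet passed the grace period.
        return streamTimeMs - timestampMs <= graceMs;
    }
}
```

With a 5ms grace period and stream time at 100, records at 97 and 95 are admitted while a record at 94 is dropped; dropped and out-of-order records never move stream time.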

...

  • Making SlidingWindows an option within TimeWindows. This would add more requirements for users and blur the line between hopping and sliding windows, while potentially making optimization harder.
  • Making SlidingWindows a stand-alone class similar to SessionWindows, rather than having it extend Windows<Window> or Windows<TimeWindow>. Because of the similarity between SlidingWindows and TimeWindows, leveraging the current TimeWindows implementation is preferable to creating all-new processors that use very similar or identical code. To keep the code clean and allow for future-proofing, this was not chosen.
  • Setting the default grace period to 0ms. This was deemed too confusing for users who are used to a 24-hour default and would then have to remember which window types have which grace period default.

...