...

Code Block
import java.time.Duration;

public final class SlidingWindows {

    /**
     * Return a window definition with the window size based on the given maximum time difference
     * between records in the same window and the given window grace period.
     * Records that arrive after the grace period has passed will be ignored.
     *
     * @param timeDifference the maximum time difference between two records in the same window
     * @param grace the grace period to admit out-of-order events to a window
     * @return a new window definition
     * @throws IllegalArgumentException if the specified time difference or grace is zero or negative
     *         or can't be represented as {@code long milliseconds}
     */
    public static SlidingWindows withTimeDifferenceAndGrace(final Duration timeDifference,
                                                            final Duration grace) throws IllegalArgumentException {
        // body omitted: only the public signature is proposed here
    }
}

...

Figure 1: Windows with the SlidingWindows implementation: time difference 10ms, 7 windows for 4 records.

...

Figure 2: Windows with HoppingWindows with a 1ms advance: time difference 10ms, 10 windows for 4 records.

...

Code Block
stream
	.groupByKey()
	.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofSeconds(20), ofSeconds(30)))
	.count() // an aggregation is required before the windowed result can be turned back into a stream
	.toStream();

...

Code Block
stream
	.groupByKey()
	.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofSeconds(20), ofSeconds(30)))
	.count(Materialized.as("count-metric"))
	.suppress(Suppressed.untilWindowClose(BufferConfig.unbounded()))
	.filter((windowedKey, count) -> count < 4)
	.toStream();

...

When users want to query results through Interactive Queries (IQ), they need to know the window's start time. For most users this will be the target time minus the time difference, since SlidingWindows are defined backwards from a record's timestamp. For example, if there was an incident at 9:15am and the time difference is 10 minutes, they are looking for the window that starts at 9:05. If they don't know the exact time, they can use a range query and iterate over the results to find the one they want. After KIP-617, users will be able to iterate backwards through a range query, making it simpler to access the latest window.
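A minimal sketch of the start-time arithmetic above, in plain Java. It assumes a hypothetical helper named `windowStartFor` and an arbitrary date for the 9:15am incident. Neither comes from Kafka Streams.

```java
import java.time.Duration;
import java.time.Instant;

public class WindowStartExample {
    // Hypothetical helper: sliding windows are defined backwards from a record's timestamp,
    // so the window ending at targetTime starts at targetTime - timeDifference.
    static Instant windowStartFor(Instant targetTime, Duration timeDifference) {
        return targetTime.minus(timeDifference);
    }

    public static void main(String[] args) {
        Instant incident = Instant.parse("2020-06-01T09:15:00Z"); // arbitrary date for the 9:15am incident
        Instant start = windowStartFor(incident, Duration.ofMinutes(10));
        System.out.println(start); // 2020-06-01T09:05:00Z
    }
}
```

The computed instant is the start time to use when querying the window store.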

Code Block
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

// textLines (KStream<String, String>), stringSerde (Serde<String>) and streams (KafkaStreams)
// are assumed to be defined elsewhere in the application.

// Define the processing topology (here: WordCount)
KGroupedStream<String, String> groupedByWord = textLines
  .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
  .groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde));

// Create a window state store named "CountsWindowStore" that contains the word counts
// for sliding windows with a time difference of 1 minute and a grace period of 2 minutes
groupedByWord.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofMinutes(1), Duration.ofMinutes(2)))
  .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("CountsWindowStore"));

// Get the window store named "CountsWindowStore"
ReadOnlyWindowStore<String, Long> windowStore =
    streams.store("CountsWindowStore", QueryableStoreTypes.windowStore());

// Query all windows for the key "world" that are available in this application instance
Instant timeFrom = Instant.ofEpochMilli(0); // beginning of time = oldest available
Instant timeTo = Instant.now(); // now (in processing-time)

// Find the most recent window using currently available queries:
// iterate forward over every window and keep the last one seen.
// try-with-resources closes the iterator to release resources.
KeyValue<Long, Long> latest = null;
try (WindowStoreIterator<Long> iterator = windowStore.fetch("world", timeFrom, timeTo)) {
  while (iterator.hasNext()) {
    latest = iterator.next();
  }
}
if (latest != null) {
  System.out.println("Count of 'world' in the last window @ " + latest.key + " is " + latest.value);
}

// Find the most recent window using KIP-617's reverse iterator:
// the first entry returned by backwardFetch is already the latest window.
try (WindowStoreIterator<Long> iterator = windowStore.backwardFetch("world", timeFrom, timeTo)) {
  if (iterator.hasNext()) {
    KeyValue<Long, Long> next = iterator.next();
    System.out.println("Count of 'world' in the last window @ " + next.key + " is " + next.value);
  }
}

...

With SlidingWindows, a new record creates two new windows: one that ends at the record's timestamp, and one that starts 1ms after the record's timestamp. The first window contains the new record, while the second does not. This is shown with record b in Figure 3. These windows were chosen so that every combination of records that fall within the time difference of one another is represented by a window. This makes aggregation easy when creating new windows, because any aggregation a new window needs will already have been calculated in an existing window. For a new window that overlaps with existing records (record a in Figure 3), those records are aggregated and added to the new window's value.

Figure 3: New windows with a time difference of 10ms defined for record b
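The two windows created for a new record can be sketched in plain Java. This is a simplified illustration and not the Kafka Streams implementation. `Window` is a hypothetical class with both ends inclusive, timestamps are plain milliseconds, and record b's timestamp of 15ms is arbitrary.

```java
public class SlidingWindowBounds {
    // Hypothetical window type; start and end are both inclusive, in milliseconds.
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        boolean contains(long timestamp) {
            return start <= timestamp && timestamp <= end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    // Window that ends at the record's timestamp; it contains the new record.
    static Window windowEndingAt(long timestamp, long timeDifferenceMs) {
        return new Window(timestamp - timeDifferenceMs, timestamp);
    }

    // Window that starts 1ms after the record's timestamp; it does not contain the new record.
    static Window windowStartingAfter(long timestamp, long timeDifferenceMs) {
        return new Window(timestamp + 1, timestamp + 1 + timeDifferenceMs);
    }

    public static void main(String[] args) {
        long b = 15;                // arbitrary timestamp for record b
        long timeDifferenceMs = 10; // the 10ms time difference used in Figure 3
        Window first = windowEndingAt(b, timeDifferenceMs);
        Window second = windowStartingAfter(b, timeDifferenceMs);
        System.out.println(first + " contains b: " + first.contains(b));   // [5, 15] contains b: true
        System.out.println(second + " contains b: " + second.contains(b)); // [16, 26] contains b: false
    }
}
```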

Any existing windows that the new record falls within are updated so that their aggregation includes the new record's value. Finding these windows requires a scan through the WindowStore, similar to the scan required when aggregating SessionWindows.
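The update step can be sketched with an in-memory `TreeMap` standing in for the WindowStore. This is an illustrative assumption, not how Kafka Streams stores windows. The map is keyed by window start time, holds a count, and treats every window as spanning `[start, start + timeDifference]` inclusive. `addToExistingWindows` is a hypothetical helper. The new record's own windows would be created separately, as described above.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class UpdateExistingWindows {
    // Hypothetical stand-in for the WindowStore: window start (ms) -> running aggregate (a count here).
    // Each window is assumed to span [start, start + timeDifference], both ends inclusive.
    static void addToExistingWindows(NavigableMap<Long, Long> windows, long timestamp,
                                     long value, long timeDifferenceMs) {
        // A window contains the record exactly when start <= timestamp <= start + timeDifference,
        // i.e. when its start lies in [timestamp - timeDifference, timestamp].
        // Only that key range is scanned, not every window in the store.
        for (Map.Entry<Long, Long> window
                : windows.subMap(timestamp - timeDifferenceMs, true, timestamp, true).entrySet()) {
            window.setValue(window.getValue() + value);
        }
    }

    public static void main(String[] args) {
        NavigableMap<Long, Long> windows = new TreeMap<>();
        windows.put(0L, 1L);  // window [0, 10]
        windows.put(6L, 2L);  // window [6, 16]
        windows.put(20L, 1L); // window [20, 30]
        addToExistingWindows(windows, 12L, 1L, 10L); // new record at 12ms
        System.out.println(windows); // {0=1, 6=3, 20=1}
    }
}
```

Only the window starting at 6ms covers the record at 12ms, so it is the only one whose aggregate changes.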

...

Instead of using the 24-hour default grace period of TimeWindows and SessionWindows, SlidingWindows will require the user to set the grace period. The 24-hour default is often too long, and was chosen when retention time and grace period were the same. The grace period will be an additional parameter of the factory method, so users know that it is required: .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(twentySeconds, fiftySeconds)). If the grace period isn't properly initialized, the withTimeDifferenceAndGrace method will throw an error.
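One way to make the grace period mandatory and validated is shown in the sketch below. `SlidingWindowsSketch` is a hypothetical stand-in that only mirrors the proposed `withTimeDifferenceAndGrace` signature. Its checks follow the Javadoc earlier on this page: an `IllegalArgumentException` is thrown for a zero or negative duration, or for one that can't be represented as `long` milliseconds. It is not the Kafka Streams implementation.

```java
import java.time.Duration;

public final class SlidingWindowsSketch {
    final long timeDifferenceMs;
    final long graceMs;

    private SlidingWindowsSketch(long timeDifferenceMs, long graceMs) {
        this.timeDifferenceMs = timeDifferenceMs;
        this.graceMs = graceMs;
    }

    // The grace period is a required argument: there is no overload without it,
    // so no 24-hour default can apply silently.
    public static SlidingWindowsSketch withTimeDifferenceAndGrace(Duration timeDifference, Duration grace) {
        return new SlidingWindowsSketch(
            toPositiveMillis(timeDifference, "timeDifference"),
            toPositiveMillis(grace, "grace"));
    }

    // Rejects null, zero, negative, or out-of-range durations with IllegalArgumentException.
    private static long toPositiveMillis(Duration duration, String name) {
        if (duration == null) {
            throw new IllegalArgumentException(name + " must not be null");
        }
        final long millis;
        try {
            millis = duration.toMillis(); // throws ArithmeticException on long overflow
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException(name + " can't be represented as long milliseconds", e);
        }
        if (millis <= 0) {
            throw new IllegalArgumentException(name + " must be positive but was " + millis + "ms");
        }
        return millis;
    }

    public static void main(String[] args) {
        SlidingWindowsSketch windows =
            withTimeDifferenceAndGrace(Duration.ofSeconds(20), Duration.ofSeconds(50));
        System.out.println(windows.timeDifferenceMs + "ms / " + windows.graceMs + "ms"); // 20000ms / 50000ms
        try {
            withTimeDifferenceAndGrace(Duration.ofSeconds(20), Duration.ZERO);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // grace must be positive but was 0ms
        }
    }
}
```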

Out-of-order Records

...

  • Creating a new window that begins at the record's timestamp, and a second window that ends 1ms before the record's timestamp. This way of defining the two new windows is less intuitive and less compatible with looking back over a segment of time.
  • Making either or both ends exclusive. Traditionally, time windows are inclusive, and since hopping and tumbling windows are exclusive at the end, inclusive ends are a distinguishing feature of sliding windows. Changing exclusivity is just a matter of adding 1ms.
  • Having a single window anchored at, and ending at, the current time. This can be achieved by setting the retention time equal to the time difference plus the grace period, and allowing only this type of sliding window would limit historical views and therefore flexibility for use cases.
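The arithmetic behind the last two alternatives is small enough to show directly. The method names below are hypothetical. The retention rule is the one stated in the list: time difference plus grace period. The 20s/30s values are borrowed from the earlier `windowedBy` example.

```java
import java.time.Duration;

public class WindowBoundsArithmetic {
    // Converting an inclusive end bound to an exclusive one is just adding 1ms.
    static long inclusiveToExclusiveEnd(long inclusiveEndMs) {
        return inclusiveEndMs + 1;
    }

    // Retention that keeps only the window ending at the current time:
    // the time difference plus the grace period.
    static Duration retentionForLatestWindowOnly(Duration timeDifference, Duration grace) {
        return timeDifference.plus(grace);
    }

    public static void main(String[] args) {
        System.out.println(inclusiveToExclusiveEnd(15)); // 16
        System.out.println(retentionForLatestWindowOnly(Duration.ofSeconds(20), Duration.ofSeconds(30))); // PT50S
    }
}
```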

...