...

Figure 2: Windows with HoppingWindow with a 1ms advance: Window size 10ms, 10 windows for 4 records.

Usage

Typical Use

Sliding window aggregation to create windows of 20 seconds with a grace period of 30 seconds.

(Note: grace will be required in SlidingWindows instead of implementing the 24 hour default seen in TimeWindows and SessionWindows):

Code Block
stream
	.groupByKey()
	.windowedBy(SlidingWindows.of(ofSeconds(20)).grace(ofSeconds(30)))
	.count() // an aggregation is needed before toStream(); count() is only an example
	.toStream()


Using Suppress to Limit Results

Send an alert if the window has fewer than 4 records, using suppression to limit all but the final results of the window. Original example here, suppression details here.

Code Block
stream
	.groupByKey()
	.windowedBy(SlidingWindows.of(ofSeconds(20)).grace(ofSeconds(30)))
	.count(Materialized.as("count-metric"))
	.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
	.filter((windowedKey, count) -> count < 4)
	.toStream()


Querying IQ

When users want to query results from IQ, they need to know the start time of the window to get results. For most users it will be the time they're looking at minus windowSize, since windows are defined backwards in SlidingWindows. For example, if there's an incident at 9:15am and they have a window size of 10 minutes, they're looking for the window with a start time of 9:05am. If they don't have the exact time, they can use a range query and traverse the results to find the window they're looking for. After KIP-617, users will be able to traverse backwards through their range query, making it simpler to access the latest window.
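As a small worked example of the start-time arithmetic described above, the sketch below subtracts the window size from the time of interest. The class and method names are hypothetical and are not part of Kafka Streams; it only illustrates the calculation a user would do before issuing a fetch.

```java
import java.time.Duration;
import java.time.LocalTime;

// Hypothetical helper, for illustration only (not a Kafka Streams class).
class WindowStartExample {
    // Start time of the sliding window that ends at the given time.
    static LocalTime windowStart(LocalTime windowEnd, Duration windowSize) {
        return windowEnd.minus(windowSize);
    }

    public static void main(String[] args) {
        LocalTime incident = LocalTime.of(9, 15);
        // Window size of 10 minutes, as in the example above.
        System.out.println(windowStart(incident, Duration.ofMinutes(10))); // prints 09:05
    }
}
```

In a real application the same subtraction would be done on the Instant or epoch-millisecond timestamp passed to the window store query.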

Code Block
// Define the processing topology (here: WordCount)
KGroupedStream<String, String> groupedByWord = textLines
  .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
  .groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde));

// Create a window state store named "CountsWindowStore" that contains the word counts for every minute, grace period of 2 minutes
groupedByWord.windowedBy(SlidingWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(2)))
  .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("CountsWindowStore"));

// Get the window store named "CountsWindowStore"
ReadOnlyWindowStore<String, Long> windowStore =
    streams.store("CountsWindowStore", QueryableStoreTypes.windowStore());

// Find the most recent window using currently available queries

// Fetch values for the key "world" for all of the windows available in this application instance.
// To get the latest window, we can pull all existing windows and iterate through to the last one
Instant timeFrom = Instant.ofEpochMilli(0); // beginning of time = oldest available
Instant timeTo = Instant.now(); // now (in processing-time)
WindowStoreIterator<Long> iterator = windowStore.fetch("world", timeFrom, timeTo);
KeyValue<Long, Long> last = null; // declared outside the loop so it is still in scope afterwards
while (iterator.hasNext()) {
  last = iterator.next();
}
if (last != null) {
  // the key of each entry is the window's start timestamp
  System.out.println("Count of 'world' in the last window @ " + last.key + " is " + last.value);
}

// close the iterator to release resources
iterator.close();

// Find the most recent window using KIP-617's reverse iterator

// timeFrom and timeTo are reused from the forward query above
WindowStoreIterator<Long> reverseIterator = windowStore.backwardFetch("world", timeFrom, timeTo);
if (reverseIterator.hasNext()) {
  // the first entry of a backward fetch is the latest window
  KeyValue<Long, Long> latest = reverseIterator.next();
  System.out.println("Count of 'world' in the last window @ " + latest.key + " is " + latest.value);
}
reverseIterator.close();

Processing Windows

To process a new record, there are three major steps.

...

Any already created windows that the new record falls within will be updated to have an aggregation that includes the new record's value. This requires scanning the WindowStore to find these windows, similar to the SessionWindow scan required for aggregating SessionWindows, and updating their values.
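The scan described above can be sketched with an in-memory sorted map standing in for the WindowStore. This is a minimal illustration, not the Kafka Streams implementation: the class name is hypothetical, the aggregation is a simple count, and windows are assumed to cover [start, start + windowSize] inclusive on both ends.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the WindowStore scan; not the Kafka Streams implementation.
class ExistingWindowUpdateExample {
    // Adds one record at `timestamp` to the count of every existing window that contains it.
    // Keys are window start times in ms; returns the number of windows updated.
    static int updateExistingWindows(TreeMap<Long, Long> countsByWindowStart,
                                     long timestamp, long windowSizeMs) {
        int updated = 0;
        // Assumption: a window contains the record when start <= timestamp <= start + size,
        // which is the same as timestamp - size <= start <= timestamp.
        for (Map.Entry<Long, Long> window :
                countsByWindowStart.subMap(timestamp - windowSizeMs, true, timestamp, true).entrySet()) {
            window.setValue(window.getValue() + 1);
            updated++;
        }
        return updated;
    }
}
```

With windows starting at 0, 5 and 20, a window size of 10 and a new record at timestamp 12, only the window starting at 5 is updated. The range lookup on the sorted map plays the role of the range scan over the store.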

Grace Period

Instead of using the 24 hour default grace period seen with TimeWindows and SessionWindows, SlidingWindows will require the grace period to be set by the user. The default grace period of 24 hours is often too long and was chosen when retention time and grace period were the same. To keep formatting consistent with other types of windows, grace period won't be an additional parameter in the #of method, but will still look like it does in other use cases: .windowedBy(SlidingWindows.of(twentySeconds).grace(fiftySeconds)). If grace period isn't properly initialized, an error will be thrown through the process method.
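One way to picture a required grace period is a window definition that stays incomplete until grace is set and fails when it is used without one. The sketch below is an assumption for illustration only: SlidingWindowSpec and requireGrace are hypothetical names, and the exception type is chosen here rather than taken from the proposal.

```java
import java.time.Duration;

// Hypothetical window definition, for illustration only (not the Kafka Streams SlidingWindows class).
class SlidingWindowSpec {
    private final Duration size;
    private final Duration grace; // null until grace(...) has been called

    private SlidingWindowSpec(Duration size, Duration grace) {
        this.size = size;
        this.grace = grace;
    }

    static SlidingWindowSpec of(Duration size) {
        return new SlidingWindowSpec(size, null);
    }

    SlidingWindowSpec grace(Duration grace) {
        return new SlidingWindowSpec(size, grace);
    }

    // Called when processing starts; fails if the user never set a grace period.
    Duration requireGrace() {
        if (grace == null) {
            throw new IllegalStateException("grace period must be set for sliding windows");
        }
        return grace;
    }
}
```

Here SlidingWindowSpec.of(Duration.ofSeconds(20)).grace(Duration.ofSeconds(30)).requireGrace() returns the 30 second grace period, while calling requireGrace() without grace(...) throws.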

Out-of-order Records

Records that come out of order will be processed the same way as in-order records, as long as they fall within the grace period. As noted above, the grace period is required to create the windows. Two new windows will be created by the late record, one ending at that record's timestamp, and one starting at 1ms past that record's timestamp. These windows will be updated with existing records that fall into them by using previously calculated aggregations. Additionally, any existing windows that the new record falls within will be updated with a new aggregation that includes this late record.
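The two windows defined by a record can be written down as simple bounds. The sketch below is a minimal illustration with a hypothetical class name; it assumes each window spans exactly windowSize milliseconds from its start and does not model the grace period check or the aggregation.

```java
// Hypothetical helper, for illustration only (not part of Kafka Streams).
class NewWindowsExample {
    // Returns {start, end} bounds in ms of the two windows defined by a record at `timestamp`.
    static long[][] windowsFor(long timestamp, long windowSizeMs) {
        return new long[][] {
            {timestamp - windowSizeMs, timestamp},         // window ending at the record's timestamp
            {timestamp + 1, timestamp + 1 + windowSizeMs}  // window starting 1ms past the record's timestamp
        };
    }
}
```

For a record at timestamp 15 with a 10ms window size, this gives the bounds 5 to 15 and 16 to 26. An out-of-order record yields the same two windows as an in-order one; only the set of existing records that fall into them differs.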

...