Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

When users want to query results from IQ, they need to know the start time of the window to get results. For most users it will be windowSize+the time they're looking at, since windows are defined backwards in SlidingWindows. For example, if there's an incident at 9:15am and they have a window size of 10 minutes, they're looking for a window with the start time of 10:15. If they don't have the exact time, they can use a range query and traverse through the results to get the one they're looking for. After KIP-617, users will be able to traverse backwards through their range query, making accessing the latest window simpler. 

Code Block
// Define the processing topology (here: WordCount)
KGroupedStream<String, String> groupedByWord = textLines
  .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
  .groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde));

// Create a window state store named "CountsWindowStore" that contains the word counts for every minute, grace period of 2 minutes
groupedByWord.windowedBy(SlidingWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(2)))
  .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>as("CountsWindowStore"));

// Get the window store named "CountsWindowStore"
ReadOnlyWindowStore<String, Long> windowStore =
    streams.store("CountsWindowStore", QueryableStoreTypes.windowStore());

// Fetch values for the key "world" for all of the windows available in this application instance.
// To get the latest window, we can pull all existing windows and iterate through to the last one
Instant timeFrom = Instant.ofEpochMilli(0); // beginning of time = oldest available
Instant timeTo = Instant.now(); // now (in processing-time)
WindowStoreIterator<Long> iterator = windowStore.fetch("world", timeFrom, timeTo);
while (iterator.hasNext()) {
  KeyValue<Long, Long> next = iterator.next();
  long windowTimestamp = next.key;
}
 System.out.println("Count of 'world' in the last window @ " + windowTimestamp + " is " + next.value);

// close the iterator to release resources
iterator.close()


...

With Sliding Windows, a new record creates two new windows: one that ends at the record's timestamp, and one that starts 1ms after the record's timestamp. The first window will contain the new record, while the second will not. This is shown with record b in figure 3. These windows were chosen to ensure that for all the records, every combination of possible records given the window size will be created. This allows for easy aggregation when creating new windows, as any aggregation needed in a new window will have already been calculated in an existing window, explanation here. For a new window that overlaps with existing records (record a in figure 3), these records will be aggregated and added to the new window's value.

...

Records that come out of order will be processed the same way as in-order records, as long as they fall within the grace period. As noted above, the grace period is required to create the windows. Two new windows will be created by the late record, one ending at that record's timestamp, and one starting at 1ms past that record's timestamp. These windows will be updated with existing records that fall into them by using aggregations previously calculated. Additionally, any existing windows that the new record falls within will be updated with a new aggregation that includes this late record

Emitting Results

As with hopping windows, sliding windows will emit partial results as windows are updated. This means that any window that gets a new aggregate or an updated aggregate will emit. This feature can be suppressed through suppression, which allows users to ignore all results except for the final one, only emitting after the grace period has passed. In the future, suppress can be updated to emit results for windows that are still within the grace period but whose start and end points are both earlier than stream time

Compatibility, Deprecation, and Migration Plan

...