Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

When users want to query results from IQ, they need to know the start time of the window to get results. For most users it will be windowSize+the time they're looking at, since windows are defined backwards in SlidingWindows. For example, if there's an incident at 9:15am and they have a window size of 10 minutes, they're looking for a window with the start time of 10:15. If they don't have the exact time, they can use a range query and traverse through the results to get the one they're looking for.

Code Block
// Define the processing topology (here: WordCount)
KGroupedStream<String, String> groupedByWord = textLines
  .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
  .groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde));

// Create a window state store named "CountsWindowStore" that contains the word counts for every minute, grace period of 2 minutes
groupedByWord.windowedBy(SlidingWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(2)))
  .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>as("CountsWindowStore"));

// Get the window store named "CountsWindowStore"
ReadOnlyWindowStore<String, Long> windowStore =
    streams.store("CountsWindowStore", QueryableStoreTypes.windowStore());

// Fetch values for the key "world" for all of the windows available in this application instance.
// To get the latest window, we can pull all existing windows and iterate through to the last one
Instant timeFrom = Instant.ofEpochMilli(0); // beginning of time = oldest available
Instant timeTo = Instant.now(); // now (in processing-time)
WindowStoreIterator<Long> iterator = windowStore.fetch("world", timeFrom, timeTo);
while (iterator.hasNext()) {
  KeyValue<Long, Long> next = iterator.next();
  long windowTimestamp = next.key;
}
 System.out.println("Count of 'world' in the last window @ " + windowTimestamp + " is " + next.value);

// close the iterator to release resources
iterator.close()



Processing Windows

To process a new record, there are three major steps.

...