...
When users want to query results from IQ, they need to know the start time of the window to get results. For most users it will be windowSize+the time they're looking at, since windows are defined backwards in SlidingWindows. For example, if there's an incident at 9:15am and they have a window size of 10 minutes, they're looking for a window with the start time of 10:15. If they don't have the exact time, they can use a range query and traverse through the results to get the one they're looking for.
Code Block |
---|
// Define the processing topology (here: WordCount)
KGroupedStream<String, String> groupedByWord = textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde));
// Create a window state store named "CountsWindowStore" that contains the word counts for every minute, grace period of 2 minutes
groupedByWord.windowedBy(SlidingWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(2)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>as("CountsWindowStore"));
// Get the window store named "CountsWindowStore"
ReadOnlyWindowStore<String, Long> windowStore =
streams.store("CountsWindowStore", QueryableStoreTypes.windowStore());
// Fetch values for the key "world" for all of the windows available in this application instance.
// To get the latest window, we can pull all existing windows and iterate through to the last one
Instant timeFrom = Instant.ofEpochMilli(0); // beginning of time = oldest available
Instant timeTo = Instant.now(); // now (in processing-time)
WindowStoreIterator<Long> iterator = windowStore.fetch("world", timeFrom, timeTo);
while (iterator.hasNext()) {
KeyValue<Long, Long> next = iterator.next();
long windowTimestamp = next.key;
}
System.out.println("Count of 'world' in the last window @ " + windowTimestamp + " is " + next.value);
// close the iterator to release resources
iterator.close()
|
Processing Windows
To process a new record, there are three major steps.
...