...

Code Block
stream
	.groupByKey()
	.windowedBy(SlidingWindows.withSizeAndGrace(ofSeconds(20), ofSeconds(30)))
	.toStream()

...

Code Block
stream
	.groupByKey()
	.windowedBy(SlidingWindows.withSizeAndGrace(ofSeconds(20), ofSeconds(30)))
	.count(Materialized.as("count-metric"))
	.suppress(Suppressed.untilWindowClose(BufferConfig.unbounded()))
	.filter((windowedKey, count) -> count < 4)
	.toStream()

...

Code Block
// Define the processing topology (here: WordCount)
KGroupedStream<String, String> groupedByWord = textLines
  .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
  .groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde));

// Create a window state store named "CountsWindowStore" that contains the word counts for every minute, grace period of 2 minutes
groupedByWord.windowedBy(SlidingWindows.withSizeAndGrace(Duration.ofMinutes(1), Duration.ofMinutes(2)))
  .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("CountsWindowStore"));

// Get the window store named "CountsWindowStore"
ReadOnlyWindowStore<String, Long> windowStore =
    streams.store("CountsWindowStore", QueryableStoreTypes.windowStore());

//Find the most recent window using currently available queries

// Fetch values for the key "world" for all of the windows available in this application instance.
Instant timeFrom = Instant.ofEpochMilli(0); // beginning of time = oldest available
Instant timeTo = Instant.now(); // now (in processing-time)
WindowStoreIterator<Long> iterator = windowStore.fetch("world", timeFrom, timeTo);
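// The forward iterator returns windows oldest-first, so the last entry seen is the most recent window
long windowTimestamp = -1L;
Long count = null;
while (iterator.hasNext()) {
  KeyValue<Long, Long> next = iterator.next();
  windowTimestamp = next.key;
  count = next.value;
}
System.out.println("Count of 'world' in the last window @ " + windowTimestamp + " is " + count);

// close the iterator to release resources
iterator.close();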

// Find the most recent window using KIP-617's reverse iterator

// backwardFetch returns windows newest-first, so the first entry is the most recent window
WindowStoreIterator<Long> reverseIterator = windowStore.backwardFetch("world", timeFrom, timeTo);
if (reverseIterator.hasNext()) {
  KeyValue<Long, Long> latest = reverseIterator.next();
  long latestWindowTimestamp = latest.key;
  System.out.println("Count of 'world' in the last window @ " + latestWindowTimestamp + " is " + latest.value);
}
reverseIterator.close();

...

Instead of adopting the 24-hour default grace period used by TimeWindows and SessionWindows, SlidingWindows will require the user to set the grace period. The default grace period of 24 hours is often too long and was chosen when retention time and grace period were the same. Grace period will be a required parameter of the withSizeAndGrace method so that users know it must be set: .windowedBy(SlidingWindows.withSizeAndGrace(twentySeconds, fiftySeconds)). If the grace period isn't properly initialized, the withSizeAndGrace method will throw an error.
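The required-grace behavior described above could look roughly like the following. This is a minimal sketch, not the Kafka Streams source: the class name SlidingWindowSpec, the accessors, the choice of IllegalArgumentException, and the rule that the size must be positive are all assumptions made for illustration.

```java
import java.time.Duration;

/** Hypothetical stand-in for a window definition; NOT the Kafka Streams class. */
final class SlidingWindowSpec {
    private final long sizeMs;
    private final long graceMs;

    private SlidingWindowSpec(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    // The only factory: there is no overload without a grace period,
    // so callers cannot fall back to an implicit default.
    static SlidingWindowSpec withSizeAndGrace(Duration size, Duration grace) {
        // Assumption: a missing or non-positive size is rejected
        if (size == null || size.isNegative() || size.isZero()) {
            throw new IllegalArgumentException("Window size must be positive.");
        }
        // Assumption: a missing or negative grace period is rejected
        if (grace == null || grace.isNegative()) {
            throw new IllegalArgumentException("Grace period must be set and must not be negative.");
        }
        return new SlidingWindowSpec(size.toMillis(), grace.toMillis());
    }

    long sizeMs() { return sizeMs; }

    long graceMs() { return graceMs; }

    public static void main(String[] args) {
        SlidingWindowSpec spec = withSizeAndGrace(Duration.ofSeconds(20), Duration.ofSeconds(50));
        System.out.println(spec.sizeMs() + " ms size, " + spec.graceMs() + " ms grace");
        try {
            withSizeAndGrace(Duration.ofSeconds(20), null);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Exposing a single factory that takes both arguments means a forgotten grace period shows up as a compile error or an immediate exception rather than as a silent 24-hour default.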

Out-of-order Records

Records that arrive out of order will be processed the same way as in-order records, as long as they fall within the grace period. An out-of-order record will create two new windows: one ending at the record's timestamp and one starting 1 ms after the record's timestamp. These windows will be populated with the existing records that fall into them, using previously calculated aggregations. If either of these windows is empty, or empty aside from the current record's value, this will be taken into account. Additionally, any existing window that the new record falls within will be updated with a new aggregation that includes the out-of-order record.
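To make the two windows concrete, the bounds they would have for a given record can be sketched as below. The helper SlidingWindowBounds is hypothetical and not part of Kafka Streams; it also assumes that both window bounds are inclusive and that the window size is the difference between end and start, which the text above does not spell out.

```java
import java.time.Duration;

/** Hypothetical helper (not a Kafka Streams API) that computes the two windows a record defines. */
final class SlidingWindowBounds {

    private SlidingWindowBounds() {}

    // Window that ends at the record's timestamp.
    // Returns {start, end} in epoch milliseconds; assumes end - start == size.
    static long[] endingAt(long recordTimestampMs, Duration size) {
        return new long[] {recordTimestampMs - size.toMillis(), recordTimestampMs};
    }

    // Window that starts 1 ms after the record's timestamp.
    // Returns {start, end} in epoch milliseconds; assumes end - start == size.
    static long[] startingAfter(long recordTimestampMs, Duration size) {
        long start = recordTimestampMs + 1;
        return new long[] {start, start + size.toMillis()};
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(20);
        long recordTs = 35_000L; // example timestamp of an out-of-order record

        long[] left = endingAt(recordTs, size);
        long[] right = startingAfter(recordTs, size);

        System.out.println("[" + left[0] + ", " + left[1] + "]");   // prints [15000, 35000]
        System.out.println("[" + right[0] + ", " + right[1] + "]"); // prints [35001, 55001]
    }
}
```

The first window contains the new record itself, while the second contains only records that come after it, which is why the second window can be empty.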

...