...

Code Block
import java.time.Duration;

public final class SlidingWindows extends Windows<TimeWindow> {

    /**
     * Return a window definition with the given window size and given window grace period.
     * Records that come after the set grace period will be ignored.
     *
     * @param size  The size of the window
     * @param grace The grace period to admit out-of-order events to a window.
     * @return a new window definition
     * @throws IllegalArgumentException if the specified window size or grace is zero or negative or can't be represented as {@code long milliseconds}
     */
    public static SlidingWindows of(final Duration size, final Duration grace) throws IllegalArgumentException {}
}

Proposed Changes

This KIP proposes a new class of Windows, SlidingWindows, to give users more flexible aggregations. Sliding windows have an inclusive start and end point, and each window is unique: each distinct set of records appears in exactly one window. Hopping windows can mimic sliding windows by using an advance of 1ms, but this produces many overlapping windows, as shown below, and an expensive aggregation is computed for every one of them. With sliding windows, aggregations are computed only for each unique window, which significantly reduces the work required.
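
To make the difference in work concrete, here is a minimal sketch in plain Java (no Kafka dependency) that counts windows for a handful of record timestamps. The class `WindowCountSketch` and both of its methods are hypothetical names introduced for illustration, and the enumeration of distinct sliding windows is only a brute-force way of counting them under the assumption that a window covers the inclusive range [start, start + size]; it is not the algorithm this KIP proposes to implement.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration only; not part of Kafka Streams.
final class WindowCountSketch {

    // Counts the hopping windows [start, start + size) that contain the timestamp,
    // where window starts are non-negative multiples of the advance.
    static long hoppingWindowsContaining(long timestamp, long sizeMs, long advanceMs) {
        // Earliest aligned start whose window still covers the timestamp.
        long firstStart = Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs * advanceMs;
        long count = 0;
        for (long start = firstStart; start <= timestamp; start += advanceMs) {
            count++;
        }
        return count;
    }

    // Enumerates the distinct non-empty record sets seen by an inclusive window
    // [start, start + size] as it slides over the timestamps (assumed semantics).
    static Set<List<Long>> distinctSlidingWindows(List<Long> timestamps, long sizeMs) {
        // The contents only change when a record enters (start == ts - size)
        // or drops out (start == ts + 1), so only those starts need checking.
        TreeSet<Long> candidateStarts = new TreeSet<>();
        for (long ts : timestamps) {
            candidateStarts.add(ts - sizeMs);
            candidateStarts.add(ts + 1);
        }
        Set<List<Long>> distinct = new LinkedHashSet<>();
        for (long start : candidateStarts) {
            List<Long> contents = new ArrayList<>();
            for (long ts : timestamps) {
                if (ts >= start && ts <= start + sizeMs) {
                    contents.add(ts);
                }
            }
            if (!contents.isEmpty()) {
                distinct.add(contents);
            }
        }
        return distinct;
    }

    public static void main(String[] args) {
        List<Long> timestamps = List.of(100L, 105L, 120L);
        System.out.println("Hopping windows (1ms advance) containing t=100: "
            + hoppingWindowsContaining(100L, 20L, 1L));
        System.out.println("Distinct sliding windows: "
            + distinctSlidingWindows(timestamps, 20L).size());
    }
}
```

With a 20ms window and three records at 100, 105 and 120, the single record at 100 already falls into 20 hopping windows, while the whole input yields only 5 distinct sliding windows. The gap widens as the window size grows, because the hopping count scales with size divided by advance and the sliding count scales with the number of records.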

...

Code Block
stream
	.groupByKey()
	.windowedBy(SlidingWindows.of(ofSeconds(20), ofSeconds(30)))
	// an aggregation such as count() must be applied here before toStream() can be called
	.toStream()

...

Code Block
stream
	.groupByKey()
	.windowedBy(SlidingWindows.of(ofSeconds(20), ofSeconds(30)))
	.count(Materialized.as("count-metric"))
	// emit only the final result for each window, once the window has closed
	.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
	.filter((windowedKey, count) -> count < 4)
	.toStream()

...

Code Block
// Define the processing topology (here: WordCount)
KGroupedStream<String, String> groupedByWord = textLines
  .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
  .groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde));

// Create a window state store named "CountsWindowStore" that contains the word counts for every minute, with a grace period of 2 minutes
groupedByWord.windowedBy(SlidingWindows.of(Duration.ofMinutes(1), Duration.ofMinutes(2)))
  .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("CountsWindowStore"));

// Get the window store named "CountsWindowStore"
ReadOnlyWindowStore<String, Long> windowStore =
    streams.store("CountsWindowStore", QueryableStoreTypes.windowStore());

// Find the most recent window using currently available queries

// Fetch values for the key "world" for all of the windows available in this application instance.
Instant timeFrom = Instant.ofEpochMilli(0); // beginning of time = oldest available
Instant timeTo = Instant.now(); // now (in processing-time)
WindowStoreIterator<Long> iterator = windowStore.fetch("world", timeFrom, timeTo);
// The forward iterator returns the oldest window first, so walk to the last entry
KeyValue<Long, Long> latest = null;
while (iterator.hasNext()) {
  latest = iterator.next();
}
if (latest != null) {
  long windowTimestamp = latest.key;
  System.out.println("Count of 'world' in the last window @ " + windowTimestamp + " is " + latest.value);
}

// close the iterator to release resources
iterator.close();

// Find the most recent window using KIP-617's reverse iterator

// The backward iterator returns the most recent window first, so a single next() is enough.
// timeFrom and timeTo are reused from the forward query above.
WindowStoreIterator<Long> backwardIterator = windowStore.backwardFetch("world", timeFrom, timeTo);
if (backwardIterator.hasNext()) {
  KeyValue<Long, Long> next = backwardIterator.next();
  long windowTimestamp = next.key;
  System.out.println("Count of 'world' in the last window @ " + windowTimestamp + " is " + next.value);
}
backwardIterator.close();

...

Instead of creating the 24 hour default grace period seen with TimeWindows and SessionWindows, SlidingWindows will require the grace period to be set by the user. The default grace period of 24 hours is often too long and was chosen when retention time and grace period were the same. Grace period will be an additional parameter in the #of method to make sure users know that it is required: .windowedBy(SlidingWindows.of(twentySeconds, fiftySeconds)). If the grace period isn't properly initialized, an error will be thrown by the #of method.
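
The validation described above could look roughly like the following sketch. It is a standalone illustration, not the proposed implementation: the class `SlidingWindowSpec`, its fields, and the helper `toMillisOrThrow` are hypothetical names, and the checks simply follow the documented contract of #of (size and grace must be positive and representable as long milliseconds).

```java
import java.time.Duration;

// Hypothetical stand-in for SlidingWindows, used only to illustrate argument validation.
final class SlidingWindowSpec {
    final long sizeMs;
    final long graceMs;

    private SlidingWindowSpec(final long sizeMs, final long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    // Both arguments are mandatory, so there is no way to fall back to a default grace period.
    static SlidingWindowSpec of(final Duration size, final Duration grace) {
        final long sizeMs = toMillisOrThrow(size, "size");
        final long graceMs = toMillisOrThrow(grace, "grace");
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("Window size must be larger than zero.");
        }
        if (graceMs <= 0) {
            throw new IllegalArgumentException("Grace period must be larger than zero.");
        }
        return new SlidingWindowSpec(sizeMs, graceMs);
    }

    // Converts a Duration to milliseconds, mapping null and overflow to IllegalArgumentException.
    private static long toMillisOrThrow(final Duration duration, final String name) {
        if (duration == null) {
            throw new IllegalArgumentException(name + " must not be null");
        }
        try {
            return duration.toMillis();
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(
                name + " can't be represented as long milliseconds", e);
        }
    }
}
```

Under these assumptions, `SlidingWindowSpec.of(Duration.ofSeconds(20), Duration.ofSeconds(50))` succeeds, while a zero, negative, or overflowing size or grace fails immediately at topology construction time rather than at runtime.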

Out-of-order Records

...