...

  1. Add a new interface, EnumerableWindowDefinition, to take the place of Windows
  2. Add "implements EnumerableWindowDefinition" to TimeWindows and UnlimitedWindows
    1. Do not add the new interface to JoinWindows, which should not be part of this hierarchy. It will naturally become disconnected when we remove Windows.
  3. Add "implements EnumerableWindowDefinition" to Windows and deprecate it.

...

  1. Swap out the argument type in both windowedBy methods

New interface to take the place of Windows: EnumerableWindowDefinition

Code Block
/**
 * The window specification for windows that can be enumerated for a single event based on its time.
 * <p>
 * The grace period defines how long to wait for out-of-order events. That is, windows continue to accept new records until {@code stream_time >= window_end + grace_period}.
 * Records that arrive after the grace period has passed are considered <em>late</em> and are dropped without being processed.
 * <p>
 * Window state is stored until it is no longer needed, measured from the beginning of the window. To keep the store from discarding window state too soon, this definition
 * also includes a {@code maxSize()}, indicating the longest span windows can have between their start and end. This does not need to include the grace period.
 *
 * @param <W> type of the window instance
 */
public interface EnumerableWindowDefinition<W extends Window> {


    /**
     * List all possible windows that contain the provided timestamp,
     * indexed by non-negative window start timestamps.
     *
     * @param timestamp the timestamp for which windows should be created
     * @return a map of {@code windowStartTimestamp -> Window} entries
     */
    Map<Long, W> windowsFor(final long timestamp);

    /**
     * Return an upper bound on the size of windows in milliseconds.
     * Used to determine the lower bound on store retention time.
     *
     * @return the maximum size of the specified windows
     */
    long maxSize();

    /**
     * Return the window grace period (the time to admit
     * out-of-order events after the end of the window).
     * <p>
     * Delay is defined as {@code stream_time - record_timestamp}.
     *
     * @return the grace period in milliseconds
     */
    long gracePeriodMs();
}
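
As an illustration of the contract above, here is a minimal sketch of one possible implementation. It is not part of the proposal: `HoppingWindowDefinition`, `WindowDefinitionDemo`, and `isClosed` are hypothetical names, and `Window` and `EnumerableWindowDefinition` are redeclared as simplified local stand-ins so the example compiles without Kafka Streams on the classpath. It assumes non-negative timestamps and windows with an exclusive end.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for Kafka Streams' Window, reduced to a start and an exclusive end.
class Window {
    final long startMs;
    final long endMs;

    Window(final long startMs, final long endMs) {
        this.startMs = startMs;
        this.endMs = endMs;
    }
}

// Local copy of the proposed interface so the sketch compiles on its own.
interface EnumerableWindowDefinition<W extends Window> {
    Map<Long, W> windowsFor(final long timestamp);
    long maxSize();
    long gracePeriodMs();
}

// Hypothetical definition: windows of sizeMs that start every advanceMs.
class HoppingWindowDefinition implements EnumerableWindowDefinition<Window> {
    private final long sizeMs;
    private final long advanceMs;
    private final long graceMs;

    HoppingWindowDefinition(final long sizeMs, final long advanceMs, final long graceMs) {
        if (sizeMs <= 0 || advanceMs <= 0 || advanceMs > sizeMs || graceMs < 0) {
            throw new IllegalArgumentException("invalid window parameters");
        }
        this.sizeMs = sizeMs;
        this.advanceMs = advanceMs;
        this.graceMs = graceMs;
    }

    @Override
    public Map<Long, Window> windowsFor(final long timestamp) {
        final Map<Long, Window> windows = new LinkedHashMap<>();
        // Earliest aligned start whose window still covers the timestamp, clamped at 0.
        long start = Math.max(0L, timestamp - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (; start <= timestamp; start += advanceMs) {
            windows.put(start, new Window(start, start + sizeMs));
        }
        return windows;
    }

    @Override
    public long maxSize() {
        return sizeMs; // every window has the same span, so this is also the upper bound
    }

    @Override
    public long gracePeriodMs() {
        return graceMs;
    }
}

class WindowDefinitionDemo {
    // A window stops accepting records once stream_time >= window_end + grace_period.
    static boolean isClosed(final EnumerableWindowDefinition<?> definition,
                            final Window window,
                            final long streamTimeMs) {
        return streamTimeMs >= window.endMs + definition.gracePeriodMs();
    }

    public static void main(final String[] args) {
        final HoppingWindowDefinition definition = new HoppingWindowDefinition(10L, 5L, 2L);
        final Map<Long, Window> windows = definition.windowsFor(12L);
        System.out.println(windows.keySet()); // [5, 10]
        System.out.println(isClosed(definition, windows.get(5L), 16L)); // false: 16 < 15 + 2
        System.out.println(isClosed(definition, windows.get(5L), 17L)); // true: 17 >= 15 + 2
    }
}
```

With a size of 10 ms and an advance of 5 ms, timestamp 12 falls into two overlapping windows, which is why `windowsFor` returns a map rather than a single window.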

...

Code Block
/**
 * ... the javadoc doesn't change
 *
 * @deprecated since 2.7. Implement {@link EnumerableWindowDefinition} instead.
 */
@Deprecated
public abstract class Windows<W extends Window> implements EnumerableWindowDefinition<W> {
// ... nothing else changes
}

Swap out the argument type in windowedBy

Note: because Windows now implements EnumerableWindowDefinition, every existing implementation of Windows automatically works with this change, so there is no compatibility concern.

Code Block
public interface CogroupedKStream<K, VOut> {
...
-    <W extends Window> TimeWindowedCogroupedKStream<K, VOut> windowedBy(final Windows<W> windows);

+    <W extends Window> TimeWindowedCogroupedKStream<K, VOut> windowedBy(final EnumerableWindowDefinition<W> windows);
...

}


Code Block
public interface KGroupedStream<K, V> {
...
-    <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows);
+    <W extends Window> TimeWindowedKStream<K, V> windowedBy(final EnumerableWindowDefinition<W> windows);
...
}
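
The compatibility note in this section can be illustrated with a small, self-contained sketch. Everything here is a simplified stand-in rather than the real Kafka Streams API: `Window`, `Windows`, and `EnumerableWindowDefinition` are local redeclarations, `LegacyTumblingWindows` is a hypothetical user-defined subclass, and the `windowedBy` method below is a hypothetical placeholder that returns the enumerated windows instead of a windowed stream. It assumes non-negative timestamps.

```java
import java.util.Collections;
import java.util.Map;

// Stand-in for Kafka Streams' Window, reduced to a start and an exclusive end.
class Window {
    final long startMs;
    final long endMs;

    Window(final long startMs, final long endMs) {
        this.startMs = startMs;
        this.endMs = endMs;
    }
}

// Local copy of the proposed interface.
interface EnumerableWindowDefinition<W extends Window> {
    Map<Long, W> windowsFor(final long timestamp);
    long maxSize();
    long gracePeriodMs();
}

// Simplified stand-in for the existing abstract class. In the proposal it is
// deprecated and only gains the "implements" clause.
abstract class Windows<W extends Window> implements EnumerableWindowDefinition<W> {
}

// Hypothetical user code written against the old Windows class.
class LegacyTumblingWindows extends Windows<Window> {
    private final long sizeMs;

    LegacyTumblingWindows(final long sizeMs) {
        this.sizeMs = sizeMs;
    }

    @Override
    public Map<Long, Window> windowsFor(final long timestamp) {
        // Tumbling windows do not overlap, so exactly one window contains the timestamp.
        final long start = timestamp / sizeMs * sizeMs;
        return Collections.singletonMap(start, new Window(start, start + sizeMs));
    }

    @Override
    public long maxSize() {
        return sizeMs;
    }

    @Override
    public long gracePeriodMs() {
        return 0L;
    }
}

class CompatibilityDemo {
    // Hypothetical placeholder for windowedBy: it only requires the new interface type.
    static <W extends Window> Map<Long, W> windowedBy(final EnumerableWindowDefinition<W> windows,
                                                      final long sampleTimestamp) {
        return windows.windowsFor(sampleTimestamp);
    }

    public static void main(final String[] args) {
        // The call site is unchanged: a Windows subclass is passed where the interface is expected.
        final Map<Long, Window> result = windowedBy(new LegacyTumblingWindows(10L), 25L);
        System.out.println(result.keySet()); // [20]
    }
}
```

Because `LegacyTumblingWindows` is a subtype of `Windows`, and `Windows` is a subtype of the new interface, the caller compiles without modification after the parameter type changes.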

Compatibility, Deprecation, and Migration Plan

Windows is deprecated. Otherwise, no compatibility issues arise. We can remove Windows cleanly later on.

Rejected Alternatives

None (yet)