Status
Current state: Under Discussion
Discussion thread: -
JIRA:
POC: https://github.com/apache/kafka/pull/9031
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Presently, the window definitions for windowed aggregations in Kafka Streams fall into two categories:
- Windows
  - TimeWindows
  - UnlimitedWindows
  - JoinWindows
- SessionWindows
Unfortunately, Windows is an abstract class instead of an interface, and it forces some fields onto its implementations. This has led to a number of problems over the years, but so far we have been able to live with them.
However, as we consider adding new implementations to this side of the hierarchy, the damage will spread. See KIP-450, for example.
We should take the opportunity now to correct the issue by introducing an interface and deprecating Windows itself. Then, we can implement new features cleanly and maybe remove Windows in the 3.0 release.
Public Interfaces
- Add new interface to take the place of Windows: FixedSizeWindowDefinition
- Add "implements FixedSizeWindowDefinition" to TimeWindows and UnlimitedWindows
- Do not add the new interface to JoinWindows, which should not be part of this hierarchy. It will naturally become disconnected when we remove Windows
- Deprecate Windows
New interface to take the place of Windows: FixedSizeWindowDefinition
/**
 * The window specification for fixed size windows that is used to define window boundaries and grace period.
 * <p>
 * Grace period defines how long to wait on out-of-order events. That is, windows will continue to accept new records until {@code stream_time >= window_end + grace_period}.
 * Records that arrive after the grace period passed are considered <em>late</em> and will not be processed but are dropped.
 *
 * @param <W> type of the window instance
 */
public interface FixedSizeWindowDefinition<W extends Window> {
    /**
     * List all possible windows that contain the provided timestamp,
     * indexed by non-negative window start timestamps.
     *
     * @param timestamp the timestamp window should get created for
     * @return a map of {@code windowStartTimestamp -> Window} entries
     */
    Map<Long, W> windowsFor(final long timestamp);

    /**
     * Return the size of the specified windows in milliseconds.
     *
     * @return the size of the specified windows
     */
    long size();

    /**
     * Return the window grace period (the time to admit
     * out-of-order events after the end of the window.)
     *
     * Delay is defined as (stream_time - record_timestamp).
     */
    long gracePeriodMs();
}
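To make the contract concrete, the following is a minimal sketch of how a hopping window definition could implement the proposed interface. It is not taken from the POC: `Window` and `FixedSizeWindowDefinition` are local stand-ins so the example compiles without Kafka on the classpath, and `WindowSketch`, `HoppingWindows`, and `acceptsRecords` are hypothetical names chosen for illustration. Windows are assumed to cover `[start, start + size)`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class WindowSketch {

    // Hypothetical stand-in for org.apache.kafka.streams.kstream.Window.
    static class Window {
        final long startMs;
        final long endMs; // exclusive

        Window(final long startMs, final long endMs) {
            this.startMs = startMs;
            this.endMs = endMs;
        }
    }

    // Local copy of the proposed interface so the sketch is self-contained.
    interface FixedSizeWindowDefinition<W extends Window> {
        Map<Long, W> windowsFor(final long timestamp);
        long size();
        long gracePeriodMs();
    }

    // Hypothetical hopping windows: fixed size, advancing by a fixed interval.
    static final class HoppingWindows implements FixedSizeWindowDefinition<Window> {
        private final long sizeMs;
        private final long advanceMs;
        private final long graceMs;

        HoppingWindows(final long sizeMs, final long advanceMs, final long graceMs) {
            if (sizeMs <= 0 || advanceMs <= 0 || advanceMs > sizeMs || graceMs < 0) {
                throw new IllegalArgumentException("invalid window parameters");
            }
            this.sizeMs = sizeMs;
            this.advanceMs = advanceMs;
            this.graceMs = graceMs;
        }

        @Override
        public Map<Long, Window> windowsFor(final long timestamp) {
            // Earliest non-negative, advance-aligned start whose window still contains the timestamp.
            long start = (Math.max(0L, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs;
            final Map<Long, Window> windows = new LinkedHashMap<>();
            while (start <= timestamp) {
                windows.put(start, new Window(start, start + sizeMs));
                start += advanceMs;
            }
            return windows;
        }

        @Override
        public long size() {
            return sizeMs;
        }

        @Override
        public long gracePeriodMs() {
            return graceMs;
        }
    }

    // A window accepts records until stream_time >= window_end + grace_period.
    static boolean acceptsRecords(final FixedSizeWindowDefinition<?> definition,
                                  final long windowEndMs,
                                  final long streamTimeMs) {
        return streamTimeMs < windowEndMs + definition.gracePeriodMs();
    }

    public static void main(final String[] args) {
        final HoppingWindows windows = new HoppingWindows(10L, 5L, 2L);
        System.out.println("window starts for t=12: " + windows.windowsFor(12L).keySet());
        System.out.println("window ending at 15 open at stream time 16: "
            + acceptsRecords(windows, 15L, 16L));
    }
}
```

Because the helper depends only on the interface, it would work for any definition that exposes a grace period, which is the kind of reuse the abstract class makes awkward today.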
Add FixedSizeWindowDefinition to TimeWindows and UnlimitedWindows
@SuppressWarnings("deprecation") // Remove this suppression when Windows is removed
public final class TimeWindows extends Windows<TimeWindow> implements FixedSizeWindowDefinition<TimeWindow> {
    // ... nothing else needs to change
}

@SuppressWarnings("deprecation") // Remove this suppression when Windows is removed
public final class UnlimitedWindows extends Windows<UnlimitedWindow> implements FixedSizeWindowDefinition<UnlimitedWindow> {
    // ... nothing else needs to change
}
Deprecation of Windows
/**
 * ... the javadoc doesn't change
 *
 * @deprecated since 2.7 Implement FixedSizeWindowDefinition instead.
 */
@Deprecated
public abstract class Windows<W extends Window> implements FixedSizeWindowDefinition<W> {
    // ... nothing else changes
}
Compatibility, Deprecation, and Migration Plan
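Windows is deprecated. Because it also implements FixedSizeWindowDefinition and is otherwise unchanged, existing subclasses keep working, apart from a deprecation warning. No other compatibility issues arise, and we can remove Windows cleanly later on.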
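As an illustration of the migration path for user-defined windows, the sketch below shows a hypothetical custom definition before and after the change. All types are simplified local stand-ins rather than the real Kafka classes (the stand-in `Windows` omits the existing fields and methods), and `MigrationSketch`, `LegacyDailyWindows`, `DailyWindows`, and `windowStarts` are invented names. Timestamps are assumed to be non-negative.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

class MigrationSketch {

    static final long DAY_MS = 86_400_000L;

    // Hypothetical stand-in for org.apache.kafka.streams.kstream.Window.
    static class Window {
        final long startMs;
        final long endMs;

        Window(final long startMs, final long endMs) {
            this.startMs = startMs;
            this.endMs = endMs;
        }
    }

    // Local copy of the proposed interface.
    interface FixedSizeWindowDefinition<W extends Window> {
        Map<Long, W> windowsFor(final long timestamp);
        long size();
        long gracePeriodMs();
    }

    // Simplified stand-in for the deprecated abstract class, which now implements the interface.
    @Deprecated
    abstract static class Windows<W extends Window> implements FixedSizeWindowDefinition<W> {
    }

    // Shared logic: the single day-aligned window containing a non-negative timestamp.
    static Map<Long, Window> dailyWindowFor(final long timestamp) {
        final long start = (timestamp / DAY_MS) * DAY_MS;
        return Collections.singletonMap(start, new Window(start, start + DAY_MS));
    }

    // Before: a user-defined definition that extends the deprecated class.
    @SuppressWarnings("deprecation")
    static final class LegacyDailyWindows extends Windows<Window> {
        @Override
        public Map<Long, Window> windowsFor(final long timestamp) {
            return dailyWindowFor(timestamp);
        }

        @Override
        public long size() {
            return DAY_MS;
        }

        @Override
        public long gracePeriodMs() {
            return 0L;
        }
    }

    // After: the same definition implementing the interface directly.
    static final class DailyWindows implements FixedSizeWindowDefinition<Window> {
        @Override
        public Map<Long, Window> windowsFor(final long timestamp) {
            return dailyWindowFor(timestamp);
        }

        @Override
        public long size() {
            return DAY_MS;
        }

        @Override
        public long gracePeriodMs() {
            return 0L;
        }
    }

    // Code typed against the interface accepts both the legacy and the migrated definition.
    static <W extends Window> Set<Long> windowStarts(final FixedSizeWindowDefinition<W> definition,
                                                     final long timestamp) {
        return definition.windowsFor(timestamp).keySet();
    }

    public static void main(final String[] args) {
        System.out.println(windowStarts(new LegacyDailyWindows(), 90_000_000L));
        System.out.println(windowStarts(new DailyWindows(), 90_000_000L));
    }
}
```

In this sketch the only change between the two classes is the declaration line, which suggests that migrating a custom subclass should mostly amount to swapping `extends Windows<W>` for `implements FixedSizeWindowDefinition<W>`, provided the subclass does not rely on fields inherited from Windows.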
Rejected Alternatives
None (yet)