You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »


Status

Current stateUnder Discussion

Discussion thread: - 

JIRA: Unable to render Jira issues macro, execution error.

POC: https://github.com/apache/kafka/pull/9031

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Presently, windowed aggregations in KafkaStreams fall into two categories:

  • Windows
    • TimeWindows
    • UnlimitedWindows
    • JoinWindows
  • SessionWindows

Unfortunately, Windows is an abstract class instead of an interface, and it forces some fields onto its implementations. This has led to a number of problems over the years, but so far we have been able to live with them.

However, as we consider adding new implementations to this side of the hierarchy, the damage will spread. See KIP-450, for example.

We should take the opportunity now to correct the issue by introducing an interface and deprecating Windows itself. Then, we can implement new features cleanly and maybe remove Windows in the 3.0 release.

Public Interfaces

  1. Add new interface to take the place of Windows: FixedSizeWindowDefinition
  2. Add "implements FixedSizeWindowDefinition" to TimeWindows and UnlimitedWindows
    1. Do not add the new interface to JoinWindows, which should not be part of this hierarchy. It will naturally become disconnected when we remove Windows
  3. Deprecate Windows

new interface to take the place of Windows: FixedSizeWindowDefinition

FixedSizeWindowDefinition
/**
 * The window specification for fixed size windows that is used to define window boundaries and grace period.
 * <p>
 * Grace period defines how long to wait on out-of-order events. That is, windows will continue to accept new records until {@code stream_time >= window_end + grace_period}.
 * Records that arrive after the grace period passed are considered <em>late</em> and will not be processed but are dropped.
 *
 * @param <W> type of the window instance
 */
public interface FixedSizeWindowDefinition<W extends Window> {


    /**
     * List all possible windows that contain the provided timestamp,
     * indexed by non-negative window start timestamps.
     *
     * @param timestamp the timestamp window should get created for
     * @return a map of {@code windowStartTimestamp -> Window} entries
     */
    Map<Long, W> windowsFor(final long timestamp);

    /**
     * Return the size of the specified windows in milliseconds.
     *
     * @return the size of the specified windows
     */
    long size();

    /**
     * Return the window grace period (the time to admit
     * out-of-order events after the end of the window.)
     *
     * Delay is defined as (stream_time - record_timestamp).
     */
    long gracePeriodMs();
}


add FixedSizeWindowDefinition to TimeWindows and UnlimitedWindows

new implementations
@SuppressWarnings("deprecation") // Remove this suppression when Windows is removed
public final class TimeWindows extends Windows<TimeWindow> implements FixedSizeWindowDefinition<TimeWindow> {
// ... nothing else needs to change
}

@SuppressWarnings("deprecation") // Remove this suppression when Windows is removed
public final class UnlimitedWindows extends Windows<UnlimitedWindow> implements FixedSizeWindowDefinition<UnlimitedWindow> {
// ... nothing else needs to change
}


Deprecation of Windows

deprecation
/**
 * ... the javadoc doesn't change
 *
 * @deprecated since 2.7 Implement FixedSizeWindowDefinition instead.
 */
@Deprecated
public abstract class Windows<W extends Window> implements FixedSizeWindowDefinition<W> {
// ... nothing else changes
}


Compatibility, Deprecation, and Migration Plan

 Windows is deprecated. Otherwise, no compatibility issues arise. We can remove Windows cleanly later on.

Rejected Alternatives

 None (yet)

  • No labels