Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-12839

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-450: Sliding Window Aggregations in the DSL introduced sliding windows, which compute a result for each distinct window and are a more efficient way to do these types of aggregations. Sliding windows were planned to be inclusive on both the start and the end time, but the current implementation represents them with TimeWindow, which is a half-open time interval (start inclusive, end exclusive). We should introduce a new window type, SlidingWindow, that represents the closed time interval "[start, end]".
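To make the difference concrete, here is a minimal, self-contained sketch. It is not Kafka code: `IntervalDemo`, `inHalfOpen`, and `inClosed` are hypothetical names used only for illustration. It shows how a record whose timestamp equals the window end is treated under each interval convention.

```java
final class IntervalDemo {

    // Half-open [start, end): the end timestamp is excluded (TimeWindow semantics).
    static boolean inHalfOpen(final long startMs, final long endMs, final long timestampMs) {
        return startMs <= timestampMs && timestampMs < endMs;
    }

    // Closed [start, end]: both boundaries are included (semantics proposed for SlidingWindow).
    static boolean inClosed(final long startMs, final long endMs, final long timestampMs) {
        return startMs <= timestampMs && timestampMs <= endMs;
    }

    public static void main(final String[] args) {
        final long startMs = 0L;
        final long endMs = 10L;
        // A record stamped exactly at the window end.
        System.out.println("half-open contains end: " + inHalfOpen(startMs, endMs, endMs));
        System.out.println("closed contains end:    " + inClosed(startMs, endMs, endMs));
    }
}
```

Under the half-open convention the record at timestamp 10 falls outside the window; under the closed convention it is part of it.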

Public Interfaces

Add org.apache.kafka.streams.kstream.internals.SlidingWindow


package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.kstream.Window;

/**
 * A {@link SlidingWindow} covers a closed time interval with both its start timestamp and its end timestamp as
 * inclusive boundaries.
 * It is a fixed size window, i.e., all instances (of a single {@link org.apache.kafka.streams.kstream.SlidingWindows
 * window specification}) will have the same size.
 * <p>
 * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
 *
 * @see TimeWindow
 * @see SessionWindow
 * @see UnlimitedWindow
 * @see org.apache.kafka.streams.kstream.SlidingWindows
 * @see org.apache.kafka.streams.processor.TimestampExtractor
 */
public class SlidingWindow extends Window {

    /**
     * Create a new window for the given start time (inclusive) and end time (inclusive).
     *
     * @param startMs the start timestamp of the window (inclusive)
     * @param endMs the end timestamp of the window (inclusive)
     * @throws IllegalArgumentException if {@code startMs} is negative or if {@code endMs} is smaller than or equal to
     * {@code startMs}
     */
    public SlidingWindow(final long startMs, final long endMs) throws IllegalArgumentException {
        // The parent constructor rejects a negative startMs and an endMs smaller than startMs.
        super(startMs, endMs);
        if (startMs == endMs) {
            throw new IllegalArgumentException("Window endMs must be greater than window startMs.");
        }
    }

    /**
     * Check if the given window overlaps with this window.
     * Both boundaries are inclusive, so two windows that share only a boundary timestamp overlap.
     *
     * @param other another window
     * @return {@code true} if {@code other} overlaps with this window&mdash;{@code false} otherwise
     * @throws IllegalArgumentException if the {@code other} window has a different type than {@code this} window
     */
    @Override
    public boolean overlap(final Window other) throws IllegalArgumentException {
        if (getClass() != other.getClass()) {
            throw new IllegalArgumentException("Cannot compare windows of different type. Other window has type "
                + other.getClass() + ".");
        }
        final SlidingWindow otherWindow = (SlidingWindow) other;
        // Closed intervals [start, end] overlap when each one starts no later than the other ends.
        return startMs <= otherWindow.endMs && otherWindow.startMs <= endMs;
    }

}
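With the closed interval "[start, end]" described in the Motivation, two windows overlap as soon as they share a single timestamp, including a shared boundary. The sketch below illustrates that check in isolation. It is a simplified stand-in rather than the proposed class: `ClosedWindowSketch` is a hypothetical name, it does not extend Kafka's `Window`, and its constructor validation is an assumption made only to keep the example self-contained.

```java
final class ClosedWindowSketch {
    final long startMs;
    final long endMs;

    ClosedWindowSketch(final long startMs, final long endMs) {
        // Assumed validation for this sketch only: reject negative or inverted bounds.
        if (startMs < 0 || endMs < startMs) {
            throw new IllegalArgumentException("Invalid window bounds.");
        }
        this.startMs = startMs;
        this.endMs = endMs;
    }

    // Closed-interval overlap: each window starts no later than the other one ends.
    boolean overlap(final ClosedWindowSketch other) {
        return startMs <= other.endMs && other.startMs <= endMs;
    }

    public static void main(final String[] args) {
        final ClosedWindowSketch first = new ClosedWindowSketch(0L, 10L);
        final ClosedWindowSketch touching = new ClosedWindowSketch(10L, 20L);
        final ClosedWindowSketch disjoint = new ClosedWindowSketch(11L, 20L);
        // The windows [0, 10] and [10, 20] share timestamp 10.
        System.out.println("touching overlaps: " + first.overlap(touching));
        // The windows [0, 10] and [11, 20] share no timestamp.
        System.out.println("disjoint overlaps: " + first.overlap(disjoint));
    }
}
```

With half-open windows the same comparison uses strict inequalities, so [0, 10) and [10, 20) would not overlap.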


Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
