Current state: Accepted
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
The grace period is a parameter of windowed operations such as windowed or session aggregations and stream-stream joins. It determines how long after a window ends any new (out-of-order) data for that window will still be processed. Records that arrive after the grace period has elapsed are dropped from those windows.
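As a rough illustration, the following Java sketch models the rule above under simplifying assumptions: stream time is the largest record timestamp observed so far, and a window counts as closed once stream time reaches its end plus the grace period. The class and method names are hypothetical; this is not Kafka Streams' internal implementation, whose exact boundary checks may differ.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Simplified model (an assumption, not Kafka Streams' internal code) of the
 * grace-period rule: a record belonging to a window is still processed while
 * the observed stream time is earlier than windowEnd + grace, and is dropped
 * once stream time has reached that point.
 */
public class GracePeriodModel {

    /** Returns true if a record for the window ending at windowEnd would still be processed. */
    static boolean acceptsLateRecord(Instant windowEnd, Duration grace, Instant streamTime) {
        Instant closeTime = windowEnd.plus(grace);
        return streamTime.isBefore(closeTime);
    }

    public static void main(String[] args) {
        Instant windowEnd = Instant.parse("2021-06-01T00:05:00Z");
        // Stream time has advanced one minute past the end of the window.
        Instant streamTime = Instant.parse("2021-06-01T00:06:00Z");

        System.out.println(acceptsLateRecord(windowEnd, Duration.ZERO, streamTime));         // false
        System.out.println(acceptsLateRecord(windowEnd, Duration.ofMinutes(5), streamTime)); // true
        System.out.println(acceptsLateRecord(windowEnd, Duration.ofHours(24), streamTime));  // true
    }
}
```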
The current default value is 24 hours, which has caused ongoing problems and confusion for users of suppression, since it means final results won't show up for 24 hours. We'd like to drop this default value and give users a better way to make an informed decision when selecting a grace period.
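A back-of-the-envelope sketch of the suppression problem, assuming the simplified model that a result suppressed until its window closes (as with `Suppressed.untilWindowCloses`) can be emitted only once stream time reaches the window end plus the grace period. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Simplified model (assumed, not taken from the Kafka Streams source) of when a
 * final result that is suppressed until its window closes can first be emitted.
 */
public class SuppressionDelay {

    /** Earliest stream time at which the final, suppressed result for the window can be emitted. */
    static Instant earliestFinalResult(Instant windowEnd, Duration grace) {
        return windowEnd.plus(grace);
    }

    public static void main(String[] args) {
        // End of a 5-minute window [00:00, 00:05).
        Instant windowEnd = Instant.parse("2021-06-01T00:05:00Z");

        System.out.println(earliestFinalResult(windowEnd, Duration.ofHours(24)));  // 2021-06-02T00:05:00Z
        System.out.println(earliestFinalResult(windowEnd, Duration.ofMinutes(1))); // 2021-06-01T00:06:00Z
        System.out.println(earliestFinalResult(windowEnd, Duration.ZERO));         // 2021-06-01T00:05:00Z
    }
}
```

With the 24-hour default, the final result for a window ending at 00:05 cannot appear until stream time reaches 00:05 on the following day.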
All the window classes (TimeWindows, SessionWindows, JoinWindows, and SlidingWindows) will be aligned with the new approach to the grace period. Each gets two new static constructors: one that accepts a grace period, and one that applies no grace period and closes the window immediately when it ends.
```java
public class TimeWindows {
    @Deprecated
    public static TimeWindows of(Duration size);

    @Deprecated
    public TimeWindows grace(final Duration afterWindowEnd);

    // New
    public static TimeWindows ofSizeWithNoGrace(final Duration size);

    // New
    public static TimeWindows ofSizeAndGrace(final Duration size, final Duration afterWindowEnd);
}
```
```java
public class SessionWindows {
    @Deprecated
    public static SessionWindows with(final Duration inactivityGap);

    @Deprecated
    public SessionWindows grace(final Duration afterWindowEnd);

    // New
    public static SessionWindows ofInactivityGapWithNoGrace(final Duration inactivityGap);

    // New
    public static SessionWindows ofInactivityGapAndGrace(final Duration inactivityGap, final Duration afterWindowEnd);
}
```
```java
public class JoinWindows {
    @Deprecated
    public static JoinWindows of(final Duration timeDifference);

    @Deprecated
    public JoinWindows grace(final Duration afterWindowEnd);

    // New
    public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration timeDifference);

    // New
    public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd);
}
```
```java
public class SlidingWindows {
    @Deprecated
    public static SlidingWindows withTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd);

    // New
    public static SlidingWindows ofTimeDifferenceWithNoGrace(final Duration timeDifference);

    // New
    public static SlidingWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd);
}
```
The underlying principle is that the grace period is a fundamental concept in Kafka Streams, not an advanced feature that we recommend only to advanced users and that most applications can ignore. At the same time, a new user sometimes just wants to get a demo up and running without diving into every detail of a windowed operation. An ideal API forces the user to at least acknowledge, for example by picking a method from their IDE's auto-complete, that they are choosing one set of semantics over another.
Therefore, all existing APIs that don't accept a grace period and instead apply a default grace period of 24 hours will be deprecated and eventually removed. They are replaced by two new APIs: one ending in `WithNoGrace`, which applies a grace period of 0, and one that takes the grace period as a required parameter. When constructing a windowed operation, users must choose between these two APIs and thereby make a conscious decision either to select a grace period or to ignore that parameter for the time being.
To elaborate further, until they are removed, the deprecated APIs keep their default grace period of 24 hours. The new APIs never apply an implicit default: the `*WithNoGrace()` methods use a grace period of 0 milliseconds, and the corresponding `*AndGrace()` methods take the grace period as a parameter from the calling code.
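The API principle described above, that no factory silently applies a default, can be sketched with a hypothetical `WindowSpec` class (not the real `TimeWindows`). Its constructor is private, and the only entry points are `ofSizeWithNoGrace`, which uses a grace period of zero, and `ofSizeAndGrace`, which requires one. The validation rules (positive size, non-negative grace) are assumptions chosen for this sketch.

```java
import java.time.Duration;
import java.util.Objects;

/**
 * Hypothetical window specification (not the Kafka Streams TimeWindows class)
 * illustrating the API principle: there is no factory that silently applies a
 * default grace period, so callers must pick one of two named methods.
 */
public final class WindowSpec {
    private final Duration size;
    private final Duration grace;

    // Private constructor: the only way in is through the two named factories below.
    private WindowSpec(Duration size, Duration grace) {
        this.size = size;
        this.grace = grace;
    }

    /** Caller explicitly opts out of a grace period: the window closes as soon as it ends. */
    public static WindowSpec ofSizeWithNoGrace(Duration size) {
        return ofSizeAndGrace(size, Duration.ZERO);
    }

    /** Caller explicitly states how long after the window end late records are still accepted. */
    public static WindowSpec ofSizeAndGrace(Duration size, Duration afterWindowEnd) {
        Objects.requireNonNull(size, "size");
        Objects.requireNonNull(afterWindowEnd, "afterWindowEnd");
        if (size.isZero() || size.isNegative()) {
            throw new IllegalArgumentException("size must be positive");
        }
        if (afterWindowEnd.isNegative()) {
            throw new IllegalArgumentException("grace period must not be negative");
        }
        return new WindowSpec(size, afterWindowEnd);
    }

    public Duration size() { return size; }

    public Duration grace() { return grace; }

    public static void main(String[] args) {
        WindowSpec demo = WindowSpec.ofSizeWithNoGrace(Duration.ofMinutes(5));
        WindowSpec prod = WindowSpec.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(10));
        System.out.println(demo.grace()); // PT0S
        System.out.println(prod.grace()); // PT10M
    }
}
```

Because `ofSizeWithNoGrace` delegates to `ofSizeAndGrace`, both entry points share the same validation.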
For stream-stream left/outer joins, we will additionally apply a semantic improvement by enabling KAFKA-10847 (avoiding spurious left/outer join results) when the new `JoinWindows.ofTimeDifferenceWithNoGrace()` or `JoinWindows.ofTimeDifferenceAndGrace()` is used. If the existing `JoinWindows.of()` is used, the old semantics, which emit left/outer join results eagerly, remain in place. This lets users opt in to the new feature. We cannot apply KAFKA-10847 by default because the new semantics hold back a left/outer join result until the join window and its grace period have closed; combined with the 24-hour default grace period of the existing API, this could delay left/outer join results considerably and break existing applications.
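To make the delay concrete, here is a simplified model (an assumption, not the actual stream-stream join code) of when a (left, null) result is emitted for a left record that never finds a match. It assumes records arrive in timestamp order, so stream time equals the left record's timestamp when it is processed, and that the new semantics release the result once stream time reaches the left timestamp plus the time difference plus the grace period. The class and method names are hypothetical. Under the old, eager semantics the (left, null) result is emitted on arrival, and a joined result may follow later if a match does arrive; that earlier (left, null) record is the spurious result KAFKA-10847 avoids.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Simplified model (assumed, not the actual join implementation) of when a
 * (left, null) result is emitted for an unmatched left record.
 */
public class LeftJoinEmission {

    /** Old semantics (JoinWindows.of): the (left, null) result is emitted eagerly on arrival. */
    static Instant oldSemanticsEmitTime(Instant leftTimestamp) {
        return leftTimestamp;
    }

    /** New semantics: the (left, null) result is held back until the join window plus grace has closed. */
    static Instant newSemanticsEmitTime(Instant leftTimestamp, Duration timeDifference, Duration grace) {
        return leftTimestamp.plus(timeDifference).plus(grace);
    }

    public static void main(String[] args) {
        Instant left = Instant.parse("2021-06-01T12:00:00Z");
        Duration timeDifference = Duration.ofMinutes(5);

        System.out.println(oldSemanticsEmitTime(left));                                       // 2021-06-01T12:00:00Z
        System.out.println(newSemanticsEmitTime(left, timeDifference, Duration.ZERO));        // 2021-06-01T12:05:00Z
        System.out.println(newSemanticsEmitTime(left, timeDifference, Duration.ofHours(24))); // 2021-06-02T12:05:00Z
    }
}
```

The last line shows why the new semantics are not paired with the old 24-hour default: an unmatched left record would surface only about a day later.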
Users who currently rely on any of the deprecated methods will need to migrate to one of the two new APIs and make a conscious decision: skip the grace period and close windows immediately, keep the old default of 24 hours, or choose a different grace period entirely.