Status
Current state: Accepted
Discussion thread: here
JIRA: KAFKA-7277, KAFKA-7477
Motivation
- KAFKA-7277: Right now the Streams API universally represents time as ms-since-unix-epoch. There's nothing wrong, per se, with this, but Duration is more ergonomic for an API. What we don't want is to present a heterogeneous API, so we need to make sure the whole Streams API is in terms of Duration.
- KAFKA-7477: Improve the semantics of KafkaStreams#close(Duration).
Public Interfaces
We need to add the following methods to the public API.
...
```java
Windows {
    // Existing method. Will be deprecated.
    abstract long size();

    // Existing methods. Will be removed.
    Windows<W> grace(final long millisAfterWindowEnd);
    long gracePeriodMs();

    // New methods.
    abstract Duration windowSize();
    abstract Map<Instant, W> windowsFor(final Instant timestamp);
    Windows<W> grace(final Duration afterWindowEnd);
    Duration gracePeriod();
}

class Window {
    // Existing methods. Will be deprecated.
    Window(final long startMs, final long endMs) throws IllegalArgumentException;

    // New methods.
    Window(final Instant start, final Instant end) throws IllegalArgumentException;
    Instant startTime();
    Instant endTime();
}

JoinWindows {
    // Existing methods. Will be deprecated.
    static JoinWindows of(final long timeDifferenceMs) throws IllegalArgumentException;
    JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException;
    JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException;
    long size();

    // Existing method. Will be removed.
    JoinWindows grace(final long millisAfterWindowEnd);

    // New methods.
    static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException;
    JoinWindows before(final Duration timeDifference) throws IllegalArgumentException;
    JoinWindows after(final Duration timeDifference) throws IllegalArgumentException;
    JoinWindows grace(final Duration afterWindowEnd);
    Map<Instant, Window> windowsForTime(final Instant timestamp);
    Duration windowSize() throws IllegalArgumentException;
}

Materialized {
    // Existing method. Will be removed.
    Materialized<K, V, S> withRetention(final long retentionMs);

    // New method.
    Materialized<K, V, S> withRetention(final Duration retention) throws IllegalArgumentException;
}

SessionWindows {
    // Existing methods. Will be deprecated.
    static SessionWindows with(final long inactivityGapMs);
    long inactivityGap();

    // Existing methods. Will be removed.
    SessionWindows grace(final long millisAfterWindowEnd);
    long gracePeriodMs();

    // New methods.
    static SessionWindows with(final Duration inactivityGap);
    Duration inactivityGapDuration() throws IllegalArgumentException;
    SessionWindows grace(final Duration afterWindowEnd);
    Duration gracePeriod();
}

TimeWindowedDeserializer {
    // Existing methods. Will be deprecated.
    TimeWindowedDeserializer(final Deserializer<T> inner, final long windowSizeMs);

    // New methods.
    TimeWindowedDeserializer(final Deserializer<T> inner, final Duration windowSize);
    Duration windowSize() throws IllegalArgumentException;
}

TimeWindows {
    // Existing methods. Will be deprecated.
    static TimeWindows of(final long sizeMs) throws IllegalArgumentException;
    TimeWindows advanceBy(final long advanceMs);
    long size();

    // Existing method. Will be removed.
    TimeWindows grace(final long millisAfterWindowEnd);

    // New methods.
    static TimeWindows of(final Duration size) throws IllegalArgumentException;
    TimeWindows advanceBy(final Duration advance);
    Map<Instant, TimeWindow> windowsFor(final Instant timestamp);
    Duration windowSize();
    TimeWindows grace(final Duration afterWindowEnd);
}

UnlimitedWindows {
    // Existing methods. Will be deprecated.
    UnlimitedWindows startOn(final long startMs) throws IllegalArgumentException;
    long size();

    // Existing method. Will be removed.
    UnlimitedWindows grace(final long millisAfterWindowEnd);

    // New methods.
    UnlimitedWindows startOn(final Instant start) throws IllegalArgumentException;
    Map<Instant, UnlimitedWindow> windowsFor(final Instant timestamp);
    Duration windowSize();
    UnlimitedWindows grace(final Duration afterWindowEnd);
}

ProcessorContext {
    // Existing method. Will be deprecated.
    Cancellable schedule(final long intervalMs, final PunctuationType type, final Punctuator callback);

    // New method.
    Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) throws IllegalArgumentException;
}

ReadOnlyWindowStore<K, V> {
    // Deprecated methods.
    WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);

    // New methods.
    // This changed after initial KIP voting, based on PR discussion:
    // https://github.com/apache/kafka/pull/5682#discussion_r222494244
    WindowStoreIterator<V> fetch(K key, Instant from, Instant to) throws IllegalArgumentException;
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant from, Instant to) throws IllegalArgumentException;
    KeyValueIterator<Windowed<K>, V> fetchAll(Instant from, Instant to) throws IllegalArgumentException;
}

WindowStore {
    // New methods with default implementations that check arguments and pass them to the existing fetch methods.
    WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) throws IllegalArgumentException;
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo) throws IllegalArgumentException;
    KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo) throws IllegalArgumentException;
}

Stores {
    // Existing methods. Will be deprecated.
    // This method was added after KIP voting, based on John Roesler's comment:
    // https://github.com/apache/kafka/pull/5682#discussion_r221472187
    static SessionBytesStoreSupplier persistentSessionStore(final String name, final long retentionPeriod);
    static WindowBytesStoreSupplier persistentWindowStore(final String name, final long retentionPeriodMs, final long windowSizeMs, final boolean retainDuplicates);
    static WindowBytesStoreSupplier persistentWindowStore(final String name, final long retentionPeriodMs, final long windowSizeMs, final boolean retainDuplicates, final long segmentIntervalMs);

    // New methods.
    static WindowBytesStoreSupplier persistentWindowStore(final String name, final Duration retentionPeriod, final Duration windowSize, final boolean retainDuplicates) throws IllegalArgumentException;
    static SessionBytesStoreSupplier persistentSessionStore(final String name, final Duration retentionPeriod);
}

WindowBytesStoreSupplier {
    // Existing methods. Will be deprecated.
    long segmentIntervalMs();
    long windowSize();
    long retentionPeriod();

    // New methods.
    Duration segmentInterval();
    Duration windowSizeDuration();
    Duration retentionPeriodDuration() throws IllegalArgumentException;
}

KafkaStreams {
    // Existing method. Will be deprecated.
    public synchronized boolean close(final long timeout, final TimeUnit timeUnit);

    // New method.
    public synchronized boolean close(final Duration timeout) throws IllegalArgumentException;
}
```
Proposed Changes
New methods in the public API are proposed; see the "Public Interfaces" section.
For methods that are used both internally and as part of the public API, the proposal is:
- Within this scope, keep the existing methods as is. Try to reduce the visibility of these methods in follow-up tickets.
- Introduce finer methods with Instant and Duration.
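The pattern of keeping an existing millisecond-based method while introducing a Duration-based counterpart could look roughly like the sketch below. `ExampleWindows` and its messages are hypothetical stand-ins, not the actual Kafka Streams classes, and the validation shown is only an assumption about what "throws IllegalArgumentException" might cover.

```java
import java.time.Duration;

// Hypothetical window specification; a stand-in, not the real Kafka Streams class.
final class ExampleWindows {
    private final long sizeMs;

    private ExampleWindows(final long sizeMs) {
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("Window size must be larger than zero.");
        }
        this.sizeMs = sizeMs;
    }

    // Existing millisecond-based factory: kept as is, but marked deprecated.
    @Deprecated
    static ExampleWindows of(final long sizeMs) {
        return new ExampleWindows(sizeMs);
    }

    // New Duration-based factory: validates the argument, then reuses the millisecond logic.
    static ExampleWindows of(final Duration size) {
        if (size == null) {
            throw new IllegalArgumentException("size must not be null");
        }
        long millis;
        try {
            millis = size.toMillis(); // throws ArithmeticException on overflow
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException("size does not fit into milliseconds", e);
        }
        return new ExampleWindows(millis);
    }

    // Existing accessor, kept for callers that still work in milliseconds.
    @Deprecated
    long size() {
        return sizeMs;
    }

    // New accessor expressed as a Duration.
    Duration windowSize() {
        return Duration.ofMillis(sizeMs);
    }
}
```

With this shape, old callers such as `ExampleWindows.of(300_000L)` keep compiling (with a deprecation warning), while new code can write `ExampleWindows.of(Duration.ofMinutes(5))`.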
Changes in KafkaStreams#close semantics:
- reject negative numbers
- make 0 just signal and return immediately (after checking the state once)
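As an illustration of these two rules, here is a minimal sketch of a close method with the same timeout semantics. `ExampleCloseable`, `markStopped` and the latch-based state are assumptions made for the example; this is not how KafkaStreams is implemented.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical component; it only illustrates the timeout semantics listed above.
final class ExampleCloseable {
    private final CountDownLatch stopped = new CountDownLatch(1);
    private volatile boolean shutdownRequested = false;

    // Called by the (imaginary) worker threads once they have fully stopped.
    void markStopped() {
        stopped.countDown();
    }

    boolean isShutdownRequested() {
        return shutdownRequested;
    }

    // Returns true if the component is known to be stopped when the call returns.
    boolean close(final Duration timeout) {
        // Rule 1: negative (or missing) timeouts are rejected.
        if (timeout == null || timeout.isNegative()) {
            throw new IllegalArgumentException("Timeout can't be negative or null.");
        }
        shutdownRequested = true; // signal shutdown in every case

        // Rule 2: zero means "signal, check the state once, return immediately".
        if (timeout.isZero()) {
            return stopped.getCount() == 0;
        }

        long waitMs;
        try {
            waitMs = timeout.toMillis();
        } catch (final ArithmeticException e) {
            waitMs = Long.MAX_VALUE; // treat an overflowing timeout as "wait very long"
        }
        try {
            return stopped.await(waitMs, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

In this sketch `close(Duration.ZERO)` never blocks: it reports whatever state it observes at that moment, which is the behaviour the second bullet describes.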
WindowStore gets default implementations of the fetch methods that check their arguments and pass them to the existing fetch methods.
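One way such a default implementation might be structured is sketched below with a deliberately simplified store. `ExampleWindowStore` and `InMemoryExampleStore` are hypothetical names; the store holds bare timestamps instead of keyed values, and the inclusive range is an assumption of this toy example.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified store interface; not the Kafka Streams WindowStore.
interface ExampleWindowStore {
    // Existing millisecond-based method that implementations already provide.
    List<Long> fetch(long timeFrom, long timeTo);

    // New Instant-based method: checks its arguments, then delegates to the existing one.
    default List<Long> fetch(final Instant from, final Instant to) {
        if (from == null || to == null) {
            throw new IllegalArgumentException("from and to must not be null");
        }
        long timeFrom;
        long timeTo;
        try {
            timeFrom = from.toEpochMilli(); // throws ArithmeticException on overflow
            timeTo = to.toEpochMilli();
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException("Instant does not fit into epoch milliseconds", e);
        }
        return fetch(timeFrom, timeTo);
    }
}

// Toy implementation that only implements the millisecond-based method.
final class InMemoryExampleStore implements ExampleWindowStore {
    private final List<Long> timestamps = new ArrayList<>();

    void put(final long timestampMs) {
        timestamps.add(timestampMs);
    }

    @Override
    public List<Long> fetch(final long timeFrom, final long timeTo) {
        final List<Long> result = new ArrayList<>();
        for (final long ts : timestamps) {
            if (ts >= timeFrom && ts <= timeTo) { // inclusive on both ends in this sketch
                result.add(ts);
            }
        }
        return result;
    }
}
```

Because the Instant overload is a default method, an existing implementation such as `InMemoryExampleStore` gains it without any code change.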
Compatibility, Deprecation, and Migration Plan
...