Status
Current state: Accepted
Discussion thread: here
JIRA: KAFKA-7277, KAFKA-7477
Motivation
- KAFKA-7277: Right now the Streams API universally represents time as ms-since-unix-epoch. There's nothing wrong, per se, with this, but Duration is more ergonomic for an API. What we don't want is to present a heterogeneous API, so we need to make sure the whole Streams API is in terms of Duration.
- KAFKA-7477: Improve the semantics of KafkaStreams#close(Duration).
Public Interfaces
We need to add the following methods to the public API.
...

    Windows {
        //Existing methods. Will be deprecated.
        abstract long size();
        abstract Map<Long, W> windowsFor(final long timestamp);

        //Existing methods. Will be removed.
        Windows<W> grace(final long millisAfterWindowEnd);
        long gracePeriodMs();

        //New methods.
        abstract Duration windowSize();
        abstract Map<Instant, W> windowsFor(final Instant timestamp);
        Windows<W> grace(final Duration afterWindowEnd);
        Duration gracePeriod();
    }

    class Window {
        //Existing methods. Will be deprecated.
        Window(final long startMs, final long endMs) throws IllegalArgumentException;
        long start();
        long end();

        //New methods.
        Window(final Instant start, final Instant end) throws IllegalArgumentException;
        Instant startTime();
        Instant endTime();
    }

    JoinWindows {
        //Existing methods. Will be deprecated.
        static JoinWindows of(final long timeDifferenceMs) throws IllegalArgumentException;
        JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException;
        JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException;
        Map<Long, Window> windowsFor(final long timestamp);
        long size();

        //Existing method. Will be removed.
        JoinWindows grace(final long millisAfterWindowEnd);

        //New methods.
        static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException;
        JoinWindows before(final Duration timeDifference) throws IllegalArgumentException;
        JoinWindows after(final Duration timeDifference) throws IllegalArgumentException;
        JoinWindows grace(final Duration afterWindowEnd);
        Map<Instant, Window> windowsForTime(final Instant timestamp);
        Duration windowSize() throws IllegalArgumentException;
    }

    Materialized {
        //Existing method. Will be removed.
        Materialized<K, V, S> withRetention(final long retentionMs);

        //New method.
        Materialized<K, V, S> withRetention(final Duration retention) throws IllegalArgumentException;
    }

    SessionWindows {
        //Existing methods. Will be deprecated.
        static SessionWindows with(final long inactivityGapMs);
        long inactivityGap();

        //Existing methods. Will be removed.
        SessionWindows grace(final long millisAfterWindowEnd);
        long gracePeriodMs();

        //New methods.
        static SessionWindows with(final Duration inactivityGap);
        Duration inactivityGapDuration() throws IllegalArgumentException;
        SessionWindows grace(final Duration afterWindowEnd);
        Duration gracePeriod();
    }

    TimeWindowedDeserializer {
        //Existing methods. Will be deprecated.
        TimeWindowedDeserializer(final Deserializer<T> inner, final long windowSizeMs);
        Long getWindowSize();

        //New methods.
        TimeWindowedDeserializer(final Deserializer<T> inner, final Duration windowSize);
        Duration windowSize() throws IllegalArgumentException;
    }

    TimeWindows {
        //Existing methods. Will be deprecated.
        static TimeWindows of(final long sizeMs) throws IllegalArgumentException;
        TimeWindows advanceBy(final long advanceMs);
        Map<Long, TimeWindow> windowsFor(final long timestamp);
        long size();

        //Existing method. Will be removed.
        TimeWindows grace(final long millisAfterWindowEnd);

        //New methods.
        static TimeWindows of(final Duration size) throws IllegalArgumentException;
        TimeWindows advanceBy(final Duration advance);
        Map<Instant, TimeWindow> windowsFor(final Instant timestamp);
        Duration windowSize();
        TimeWindows grace(final Duration afterWindowEnd);
    }

    UnlimitedWindows {
        //Existing methods. Will be deprecated.
        UnlimitedWindows startOn(final long startMs) throws IllegalArgumentException;
        Map<Long, UnlimitedWindow> windowsFor(final long timestamp);
        long size();

        //Existing method. Will be removed.
        UnlimitedWindows grace(final long millisAfterWindowEnd);

        //New methods.
        UnlimitedWindows startOn(final Instant start) throws IllegalArgumentException;
        Map<Instant, UnlimitedWindow> windowsFor(final Instant timestamp);
        Duration windowSize();
        UnlimitedWindows grace(final Duration afterWindowEnd);
    }

    ProcessorContext {
        //Existing method. Will be deprecated.
        Cancellable schedule(final long intervalMs, final PunctuationType type, final Punctuator callback);

        //New method.
        Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) throws IllegalArgumentException;
    }

    ReadOnlyWindowStore<K, V> {
        //Deprecated methods.
        WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
        KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
        KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);

        //New methods.
        //This changed after initial KIP voting based on [PR discussion](https://github.com/apache/kafka/pull/5682#discussion_r222494244)
        WindowStoreIterator<V> fetch(K key, Instant from, Instant to) throws IllegalArgumentException;
        KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant from, Instant to) throws IllegalArgumentException;
        KeyValueIterator<Windowed<K>, V> fetchAll(Instant from, Instant to) throws IllegalArgumentException;
    }

    WindowStore {
        //New methods with default implementation that checks arguments and passes them to the existing fetch methods.
        WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) throws IllegalArgumentException;
        KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo) throws IllegalArgumentException;
        KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo) throws IllegalArgumentException;
    }

    Stores {
        //Existing methods. Will be deprecated.
        static WindowBytesStoreSupplier persistentWindowStore(final String name, final long retentionPeriodMs, final long windowSizeMs, final boolean retainDuplicates);
        //This method added after KIP voting. Based on John Roesler comment(https://github.com/apache/kafka/pull/5682#discussion_r221472187)
        static SessionBytesStoreSupplier persistentSessionStore(final String name, final long retentionPeriod);
        static WindowBytesStoreSupplier persistentWindowStore(final String name, final long retentionPeriodMs, final long windowSizeMs, final boolean retainDuplicates, final long segmentIntervalMs);

        //New methods.
        static WindowBytesStoreSupplier persistentWindowStore(final String name, final Duration retentionPeriod, final Duration windowSize, final boolean retainDuplicates) throws IllegalArgumentException;
        static SessionBytesStoreSupplier persistentSessionStore(final String name, final Duration retentionPeriod);
    }

    WindowBytesStoreSupplier {
        //Existing methods. Will be deprecated.
        long segmentIntervalMs();
        long windowSize();
        long retentionPeriod();

        //New methods.
        Duration segmentInterval();
        Duration windowSizeDuration();
        Duration retentionPeriodDuration() throws IllegalArgumentException;
    }

    KafkaStreams {
        //Existing method. Will be deprecated.
        public synchronized boolean close(final long timeout, final TimeUnit timeUnit);

        //New method.
        public synchronized boolean close(final Duration timeout) throws IllegalArgumentException;
    }

Proposed Changes
New methods are proposed for the public API; see the "Public Interfaces" section.
For methods that are used both internally and as part of the public API, the proposal is:
- In this scope, keep the existing methods as is. Try to reduce the visibility of these methods in follow-up tickets.
- Introduce finer methods with Instant and Duration.
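The pattern of keeping a millisecond-based method and adding a Duration-based counterpart next to it can be sketched roughly as follows. This is only an illustration: `SketchWindows` and its `toMillis` helper are hypothetical names, not part of the Kafka code base, and the real validation rules may differ.

```java
import java.time.Duration;

/** Hypothetical stand-in for a window specification; not the real Kafka class. */
final class SketchWindows {
    private final long sizeMs;

    private SketchWindows(final long sizeMs) {
        this.sizeMs = sizeMs;
    }

    /** Existing ms-based factory, kept as is (it would be the deprecated one). */
    static SketchWindows of(final long sizeMs) {
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("Window size (sizeMs) must be larger than zero.");
        }
        return new SketchWindows(sizeMs);
    }

    /** New Duration-based factory: validate, convert to ms, delegate. */
    static SketchWindows of(final Duration size) {
        return of(toMillis(size, "size"));
    }

    /** Existing ms-based accessor. */
    long sizeMs() {
        return sizeMs;
    }

    /** New Duration-based accessor. */
    Duration windowSize() {
        return Duration.ofMillis(sizeMs);
    }

    /** Assumed helper: maps null and overflowing durations to IllegalArgumentException. */
    static long toMillis(final Duration duration, final String name) {
        if (duration == null) {
            throw new IllegalArgumentException(name + " must not be null");
        }
        try {
            // Duration#toMillis throws ArithmeticException on numeric overflow.
            return duration.toMillis();
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(name + " cannot be represented as milliseconds", e);
        }
    }
}
```

With this shape, a caller can write `SketchWindows.of(Duration.ofSeconds(5))` instead of `SketchWindows.of(5000L)`, and both paths share the same validation.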
Changes in KafkaStreams#close semantics:
- reject negative numbers
- make 0 just signal and return immediately (after checking the state once)
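A minimal sketch of these two rules, assuming a latch that the worker threads release once shutdown has finished. `CloseSketch`, `markStopped`, and `shutdownRequested` are hypothetical names used only for illustration; the actual KafkaStreams implementation tracks state differently.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a closeable client; not the real KafkaStreams class. */
final class CloseSketch {
    private final CountDownLatch stopped = new CountDownLatch(1);
    private volatile boolean shutdownRequested = false;

    /** Called by the (imaginary) worker threads once they have finished shutting down. */
    void markStopped() {
        stopped.countDown();
    }

    boolean shutdownRequested() {
        return shutdownRequested;
    }

    /**
     * Signals shutdown and waits up to the given timeout.
     * Returns true if shutdown completed within the timeout.
     */
    boolean close(final Duration timeout) {
        // Rule 1: reject negative (and, in this sketch, null) timeouts.
        if (timeout == null || timeout.isNegative()) {
            throw new IllegalArgumentException("Timeout must be non-null and non-negative");
        }
        shutdownRequested = true; // signal the workers
        try {
            // Rule 2: with a zero timeout, await checks the latch once and returns immediately.
            return stopped.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

In this sketch, `close(Duration.ZERO)` returns `false` while the workers are still running and `true` once `markStopped()` has been called, without blocking in either case.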
Default implementations of the fetch methods in WindowStore, which check their arguments and pass them to the existing fetch methods.
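One way such a default method could look is sketched below. The interface is deliberately simplified and hypothetical: `SketchWindowStore` uses a `String` key and returns a `List` instead of the iterator types of the real store interfaces, and `toEpochMilli` is an assumed helper.

```java
import java.time.Instant;
import java.util.List;

/** Hypothetical, simplified store interface; not the real WindowStore. */
interface SketchWindowStore<V> {

    /** Existing ms-based fetch that store implementations already provide. */
    List<V> fetch(String key, long timeFrom, long timeTo);

    /** New Instant-based fetch: check the arguments, then delegate to the ms-based method. */
    default List<V> fetch(final String key, final Instant from, final Instant to) {
        return fetch(key, toEpochMilli(from, "from"), toEpochMilli(to, "to"));
    }

    /** Assumed helper: maps null and out-of-range instants to IllegalArgumentException. */
    static long toEpochMilli(final Instant instant, final String name) {
        if (instant == null) {
            throw new IllegalArgumentException(name + " must not be null");
        }
        try {
            // Instant#toEpochMilli throws ArithmeticException on numeric overflow.
            return instant.toEpochMilli();
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(name + " cannot be represented as milliseconds", e);
        }
    }
}
```

Because the Instant-based method is a default method, existing store implementations that only define the ms-based fetch would keep compiling and pick up the new overload automatically.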
Compatibility, Deprecation, and Migration Plan
...