Table of Contents |
---|
Status
Current state: Under DiscussionAccepted
Discussion thread: here
JIRA:
Jira | ||||||||
---|---|---|---|---|---|---|---|---|
|
Jira | ||||||||
---|---|---|---|---|---|---|---|---|
|
Motivation
- KAFKA-7277:
Right now Streams API
...
universally represents time as ms-since-unix-epoch.
There's nothing wrong, per se, with this, but Duration is more ergonomic for an API.
What we don't want is to present a heterogeneous API, so we need to make sure the whole Streams API is in terms of Duration.- KAFKA-7477:
Improve of semantics ofKafkaStreams#close(Duration)
Public Interfaces
We need to add following method to public API.
...
Method that proposed for replacement added in
Jira | ||||||||
---|---|---|---|---|---|---|---|---|
|
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
Windows { //Existing methods. Will be deprecated. public abstract long size(); //Existing methods. Will be removed. Windows<W> grace(final long millisAfterWindowEnd); long gracePeriodMs(); //New methods. abstract Duration size(); Windows<W> grace(final Duration afterWindowEnd); Duration gracePeriod(); } class Window { //Existing methods. Will be deprecated. public Window(final long startMs, final long endMs) throws IllegalArgumentException; public long start(); public long end(); //New methods. public Window(final Instant start, final Instant end) throws IllegalArgumentException; public LocalDateTime startInstant startTime(); publicInstant LocalDateTime endendTime(); } JoinWindows { //Existing methods. Will be deprecated. public static JoinWindows of(final long timeDifferenceMs) throws IllegalArgumentException; public JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException; public JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException; public JoinWindows until(final long durationMs) throws IllegalArgumentException; //Existing method. Will be removed. JoinWindows grace(final long millisAfterWindowEnd); //New methods. static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException; JoinWindows before(final Duration timeDifference) throws IllegalArgumentException; JoinWindows after(final Duration timeDifference) throws IllegalArgumentException; JoinWindows untilgrace(final Duration durationafterWindowEnd) throws IllegalArgumentException; JoinWindows grace(final Duration afterWindowEnd); } Materialized { //Existing method. Will be removed. Materialized<K, V, S> withRetention(final long retentionMs); //New method. Materialized<K, V, S> withRetention(final Duration retention) throws IllegalArgumentException; } SessionWindows { //Existing methods. Will be deprecated. public static SessionWindows with(final long inactivityGapMs); public long inactivityGap(); //Existing methods. Will be removed. SessionWindows grace(final long millisAfterWindowEnd); long gracePeriodMs(); //New methods. static SessionWindows with(final Duration inactivityGap); Duration inactivityGap()throws IllegalArgumentException; SessionWindows grace(final Duration afterWindowEnd); Duration gracePeriod(); } TimeWindowedDeserializer { //Existing methods. Will be deprecated. public TimeWindowedDeserializer(final Deserializer<T> inner, final long windowSizeMs); public Long getWindowSize(); //New methods. public TimeWindowedDeserializer(final Deserializer<T> inner, final Duration windowSize); public Duration getWindowSize(); throws IllegalArgumentException; } TimeWindows { //Existing methods. Will be deprecated. public static TimeWindows of(final long sizeMs) throws IllegalArgumentException; public TimeWindows advanceBy(final long advanceMs); public Map<Long, TimeWindow> windowsFor(final long timestamp); public long size(); //Existing method. Will be removed. TimeWindows grace(final long millisAfterWindowEnd); //New methods. static TimeWindows of(final Duration size) throws IllegalArgumentException; TimeWindows advanceBy(final Duration advance); Map<Long, TimeWindow> windowsFor(final Instant timestamp); Duration size(); TimeWindows grace(final Duration afterWindowEnd); } UnlimitedWindows { //Existing methods. Will be deprecated. public UnlimitedWindows startOn(final long startMs) throws IllegalArgumentException; public Map<Long, UnlimitedWindow> windowsFor(final long timestamp); public long size(); //Existing method. Will be removed. UnlimitedWindows grace(final long millisAfterWindowEnd); //New methods. UnlimitedWindows startOn(final Instant start) throws IllegalArgumentException; Map<Long, UnlimitedWindow> windowsFor(final Instant timestamp); Duration size(); UnlimitedWindows grace(final Duration afterWindowEnd); } ProcessorContext { //Existing method. Will be deprecated. Cancellable schedule(final long intervalMs, final PunctuationType type, final Punctuator callback); //New method. Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) throws IllegalArgumentException; } ReadOnlyWindowStore<K, V> { //ExistingDeprecated methods. Will be deprecated. WindowStoreIterator<V> fetch(K key, long timeFromMstimeFrom, long timeToMstimeTo); KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFromMstimeFrom, long timeToMstimeTo); KeyValueIterator<Windowed<K>, V> fetchAll(long timeFromMstimeFrom, long timeToMstimeTo); //New methods. //This changed after initial KIP voting based on [PR discussion](https://github.com/apache/kafka/pull/5682#discussion_r222494244) WindowStoreIterator<V> fetch(K key, Instant from, DurationInstant durationto) throws IllegalArgumentException; KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant from, DurationInstant durationto) throws IllegalArgumentException; KeyValueIterator<Windowed<K>, V> fetchAll(Instant from, DurationInstant durationto) throws IllegalArgumentException; } SessionBytesStoreSupplierWindowStore { //ExistingNew methods. Willwith bedefault deprecated. implementation that checks arguments long segmentIntervalMs(); long retentionPeriod(); //Newand pass it to existing fetch methods. DurationWindowStoreIterator<V> segmentInterval(); Duration retentionPeriod(); } SessionStore { //Existing methods. Will be deprecated. KeyValueIterator<Windowed<K>, AGG> findSessions(final fetch(K key, long earliestSessionEndTimeMstimeFrom, final long latestSessionStartTimeMstimeTo) throws IllegalArgumentException; KeyValueIterator<Windowed<K>, AGG>V> findSessionsfetch(final K keyFromfrom, final K keyToto, long earliestSessionEndTimeMstimeFrom, final long latestSessionStartTimeMstimeTo); //New methods. KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, Instant earliestSessionEnd, final Instant latestSessionStart); KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, Instant earliestSessionEnd, final Instant latestSessionStart) throws IllegalArgumentException; KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo) throws IllegalArgumentException; } Stores { //Existing methods. Will be deprecated. public static WindowBytesStoreSupplier persistentWindowStore(final String name, final long retentionPeriodMs, final long windowSizeMs, final boolean retainDuplicates); public static WindowBytesStoreSupplier persistentWindowStore(final String name, final long retentionPeriodMs, final long windowSizeMs, //This method added after KIP voting. Based on John Roesler comment(https://github.com/apache/kafka/pull/5682#discussion_r221472187) static SessionBytesStoreSupplier persistentSessionStore(final String name, final boolean retainDuplicates, final long segmentIntervalMsretentionPeriod); public static SessionBytesStoreSupplierWindowBytesStoreSupplier persistentSessionStorepersistentWindowStore(final String name, final long retentionPeriodMs); //New methods. public static WindowBytesStoreSupplier persistentWindowStore(final String name, final Durationlong retentionPeriodwindowSizeMs, final Durationboolean windowSizeretainDuplicates, final booleanlong retainDuplicatessegmentIntervalMs); public //New methods. static WindowBytesStoreSupplier persistentWindowStore(final String name, final Duration retentionPeriod, final Duration windowSize, final boolean retainDuplicates, ) final Duration segmentInterval)throws IllegalArgumentException; public static SessionBytesStoreSupplier persistentSessionStore(final String name, final Duration retentionPeriod); } WindowBytesStoreSupplier { //Existing methods. Will be deprecated. long segmentIntervalMs(); long windowSize(); long retentionPeriod(); //New methods. Duration segmentInterval(); Duration windowSize(); Duration retentionPeriod(); throws IllegalArgumentException; } KafkaStreams { //Existing method. Will be deprecated. public synchronized boolean close(final long timeout, final TimeUnit timeUnit); //New method public synchronized boolean close(final Duration timeout) } StreamsMetrics { //Existing method. Will be deprecated. void recordLatency(final Sensor sensor, final long startNs, final long endNs); //New method. void recordLatency(final Sensor sensor, final Instant start, final Duration recDuration); } throws IllegalArgumentException; } |
Proposed Changes
New methods in public API are proposed. See "Public Interfaces" section.
For the methods that used both: internally and as a part of public API the proposal is:
- In this scope keep existing methods as is.
Try to reduce the visibility of methods in next tickets. - Introduce finer methods with
Instant
andDuration
Changes in KafkaStream#close
semantics:
- reject negative numbers
- make 0 just signal and return immediately (after checking the state once)
Default implementation of fetch methods in WindowStore.
Compatibility, Deprecation, and Migration Plan
...