Table of Contents |
---|
Status
Current state: VotingAccepted
Discussion thread: here
JIRA:
Jira | ||||||||
---|---|---|---|---|---|---|---|---|
|
Jira | ||||||||
---|---|---|---|---|---|---|---|---|
|
Motivation
- KAFKA-7277:
Right now Streams API universally represents time as ms-since-unix-epoch.
There's nothing wrong, per se, with this, but Duration is more ergonomic for an API.
What we don't want is to present a heterogeneous API, so we need to make sure the whole Streams API is in terms of Duration. - KAFKA-7477:
Improve of semantics ofKafkaStreams#close(Duration)
Public Interfaces
We need to add following method to public API.
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
class Window { //New methods. Instant startTime(); Instant endTime(); } JoinWindows { //Existing methods. Will be deprecated. static JoinWindows of(final long timeDifferenceMs) throws IllegalArgumentException; JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException; JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException; //Existing method. Will be removed. JoinWindows grace(final long millisAfterWindowEnd); //New methods. static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException; JoinWindows before(final Duration timeDifference) throws IllegalArgumentException; JoinWindows after(final Duration timeDifference) throws IllegalArgumentException; JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException; } Materialized { //Existing method. Will be removed. Materialized<K, V, S> withRetention(final long retentionMs); //New method. Materialized<K, V, S> withRetention(final Duration retention) throws IllegalArgumentException; } SessionWindows { //Existing methods. Will be deprecated. static SessionWindows with(final long inactivityGapMs); //Existing methods. Will be removed. SessionWindows grace(final long millisAfterWindowEnd); //New methods. static SessionWindows with(final Duration inactivityGap) throws IllegalArgumentException; SessionWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException; } TimeWindows { //Existing methods. Will be deprecated. static TimeWindows of(final long sizeMs) throws IllegalArgumentException; TimeWindows advanceBy(final long advanceMs); //Existing method. Will be removed. TimeWindows grace(final long millisAfterWindowEnd); //New methods. static TimeWindows of(final Duration size) throws IllegalArgumentException; TimeWindows advanceBy(final Duration advance); TimeWindows grace(final Duration afterWindowEnd); } UnlimitedWindows { //Existing methods. Will be deprecated. UnlimitedWindows startOn(final long startMs) throws IllegalArgumentException; //New methods. UnlimitedWindows startOn(final Instant start) throws IllegalArgumentException; } ProcessorContext { //Existing method. Will be deprecated. Cancellable schedule(final long intervalMs, final PunctuationType type, final Punctuator callback); //New method. Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) throws IllegalArgumentException; } ReadOnlyWindowStore<K, V> { //Deprecated methods. WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo); KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo); KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo); //New methods. //This changed after initial KIP voting based on [PR discussion](https://github.com/apache/kafka/pull/5682#discussion_r222494244) WindowStoreIterator<V> fetch(K key, Instant from, DurationInstant durationto) throws IllegalArgumentException; KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant from, DurationInstant durationto) throws IllegalArgumentException; KeyValueIterator<Windowed<K>, V> fetchAll(Instant from, Instant to) throws IllegalArgumentException; } WindowStore { //New methods with default implementation that checks arguments and pass it to existing fetch methods. WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) throws IllegalArgumentException; KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, Duration duration long timeTo) throws IllegalArgumentException; KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo) throws IllegalArgumentException; } Stores { //Existing methods. Will be deprecated. static WindowBytesStoreSupplier persistentWindowStore(final String name, final long retentionPeriodMs, final long windowSizeMs, final boolean retainDuplicates); //This method added after KIP voting. Based on John Roesler comment(https://github.com/apache/kafka/pull/5682#discussion_r221472187) static SessionBytesStoreSupplier persistentSessionStore(final String name, final long retentionPeriod); static WindowBytesStoreSupplier persistentWindowStore(final String name, final long retentionPeriodMs, final long windowSizeMs, final boolean retainDuplicates, final long segmentIntervalMs); //New methods. static WindowBytesStoreSupplier persistentWindowStore(final String name, final Duration retentionPeriod, final Duration windowSize, final boolean retainDuplicates) throws IllegalArgumentException; static SessionBytesStoreSupplier persistentSessionStore(final String name, final Duration retentionPeriod) throws IllegalArgumentException; } KafkaStreams { //Existing method. Will be deprecated. public synchronized boolean close(final long timeout, final TimeUnit timeUnit); //New method public synchronized boolean close(final Duration timeout) throws IllegalArgumentException; } |
...
- In this scope keep existing methods as is.
Try to reduce the visibility of methods in next tickets. - Introduce finer methods with
Instant
andDuration
Changes in KafkaStream#close
semantics:
- reject negative numbers
- make 0 just signal and return immediately (after checking the state once)
Default implementation of fetch methods in WindowStore.
Compatibility, Deprecation, and Migration Plan
...