Status

Current state: Accepted

Discussion thread: here
JIRA: KAFKA-7277, KAFKA-7477

Motivation

  1. KAFKA-7277:
    Right now the Streams API universally represents time as ms-since-unix-epoch.
    There's nothing wrong, per se, with this, but Duration is more ergonomic for an API.
    What we don't want is to present a heterogeneous API, so we need to make sure the whole Streams API is in terms of Duration.

  2. KAFKA-7477:
    Improve the semantics of KafkaStreams#close(Duration).

Public Interfaces

We need to add the following methods to the public API.

...

Methods proposed for replacement were added in KAFKA-7222 (KIP-328, commit cc8fc7c b3771ba):

Public API changes
Windows {
	//Existing method. Will be deprecated.
	abstract long size();

	//Existing methods. Will be removed.
	Windows<W> grace(final long millisAfterWindowEnd);
	long gracePeriodMs();

	//New methods.
	abstract Duration size();
	Windows<W> grace(final Duration afterWindowEnd);
	Duration gracePeriod();
}

class Window {
	//Existing methods. Will be deprecated.
	Window(final long startMs, final long endMs) throws IllegalArgumentException;
	long start();
	long end();

	//New methods.
	Window(final Instant start, final Instant end) throws IllegalArgumentException;
	Instant startTime();
	Instant endTime();
}

JoinWindows {
    //Existing methods. Will be deprecated.
    static JoinWindows of(final long timeDifferenceMs) throws IllegalArgumentException;
    JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException;
    JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException;
    JoinWindows until(final long durationMs) throws IllegalArgumentException;

	

	//Existing method. Will be removed.
	JoinWindows grace(final long millisAfterWindowEnd);

    //New methods.
    static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException;
    JoinWindows before(final Duration timeDifference) throws IllegalArgumentException;
    JoinWindows after(final Duration timeDifference) throws IllegalArgumentException;
    JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException;
}

Materialized {
	//Existing method. Will be removed.
	Materialized<K, V, S> withRetention(final long retentionMs);

	//New method.
	Materialized<K, V, S> withRetention(final Duration retention) throws IllegalArgumentException;
}


SessionWindows {
	//Existing methods. Will be deprecated.
    static SessionWindows with(final long inactivityGapMs);
    long inactivityGap();

	//Existing methods. Will be removed.
	SessionWindows grace(final long millisAfterWindowEnd);
	long gracePeriodMs();

	//New methods.
    static SessionWindows with(final Duration inactivityGap) throws IllegalArgumentException;
    Duration inactivityGap();
	SessionWindows grace(final Duration afterWindowEnd);
	Duration gracePeriod();
}

TimeWindowedDeserializer {
	//Existing methods. Will be deprecated.
	TimeWindowedDeserializer(final Deserializer<T> inner, final long windowSizeMs);
	Long getWindowSize();

	//New methods.
	TimeWindowedDeserializer(final Deserializer<T> inner, final Duration windowSize) throws IllegalArgumentException;
	Duration getWindowSize();
}

TimeWindows {
	//Existing methods. Will be deprecated.
	static TimeWindows of(final long sizeMs) throws IllegalArgumentException;
	TimeWindows advanceBy(final long advanceMs);
	Map<Long, TimeWindow> windowsFor(final long timestamp);
    long size();

	//Existing method. Will be removed.
	TimeWindows grace(final long millisAfterWindowEnd);

	//New methods.
	static TimeWindows of(final Duration size) throws IllegalArgumentException;
	TimeWindows advanceBy(final Duration advance);
	Map<Long, TimeWindow> windowsFor(final Instant timestamp);
    Duration size();
	TimeWindows grace(final Duration afterWindowEnd);
}

UnlimitedWindows {
	//Existing methods. Will be deprecated.
    UnlimitedWindows startOn(final long startMs) throws IllegalArgumentException;
    Map<Long, UnlimitedWindow> windowsFor(final long timestamp);
	long size();

	//Existing method. Will be removed.
	UnlimitedWindows grace(final long millisAfterWindowEnd);


	//New methods.
    UnlimitedWindows startOn(final Instant start) throws IllegalArgumentException;
    Map<Long, UnlimitedWindow> windowsFor(final Instant timestamp);
	Duration size();
	UnlimitedWindows grace(final Duration afterWindowEnd);
}

ProcessorContext {
    //Existing method. Will be deprecated.
    Cancellable schedule(final long intervalMs,
                         final PunctuationType type,
                         final Punctuator callback);

	//New method.
    Cancellable schedule(final Duration interval,
                         final PunctuationType type,
                         final Punctuator callback) throws IllegalArgumentException;
}

ReadOnlyWindowStore<K, V> {
    //Deprecated methods.
    WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);

	//New methods.
	//This changed after initial KIP voting based on [PR discussion](https://github.com/apache/kafka/pull/5682#discussion_r222494244)
    WindowStoreIterator<V> fetch(K key, Instant from, Instant to) throws IllegalArgumentException;
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant from, Instant to) throws IllegalArgumentException;
    KeyValueIterator<Windowed<K>, V> fetchAll(Instant from, Instant to) throws IllegalArgumentException;
}

WindowStore {
    //New methods with default implementations that check arguments and pass them to the existing fetch methods.
    WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) throws IllegalArgumentException;
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo) throws IllegalArgumentException;
    KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo) throws IllegalArgumentException;
}

Stores {
	//Existing methods. Will be deprecated.
	static WindowBytesStoreSupplier persistentWindowStore(final String name,
    	                                                  final long retentionPeriodMs,
        	                                              final long windowSizeMs,
                                                          final boolean retainDuplicates);


	//This method was added after KIP voting, based on John Roesler's comment (https://github.com/apache/kafka/pull/5682#discussion_r221472187)
	static SessionBytesStoreSupplier persistentSessionStore(final String name,
                                                            final long retentionPeriod);

	static WindowBytesStoreSupplier persistentWindowStore(final String name,
                                                          final long retentionPeriodMs,
                                                          final long windowSizeMs,
                                                          final boolean retainDuplicates,
                                                          final long segmentIntervalMs);

	//New methods.
	static WindowBytesStoreSupplier persistentWindowStore(final String name,
                                                          final Duration retentionPeriod,
                                                          final Duration windowSize,
                                                          final boolean retainDuplicates) throws IllegalArgumentException;

	static SessionBytesStoreSupplier persistentSessionStore(final String name,
                                                            final Duration retentionPeriod);
}

WindowBytesStoreSupplier {
    //Existing methods. Will be deprecated.
    long segmentIntervalMs();
    long windowSize();
    long retentionPeriod();

	//New methods.
    Duration segmentInterval();
    Duration windowSize();
    Duration retentionPeriod();
}

KafkaStreams {
	//Existing method. Will be deprecated.
	public synchronized boolean close(final long timeout, final TimeUnit timeUnit);

	//New method
	public synchronized boolean close(final Duration timeout) throws IllegalArgumentException;
}


Proposed Changes

New methods in the public API are proposed; see the "Public Interfaces" section.

For methods that are used both internally and as part of the public API, the proposal is:

  1. Within the scope of this KIP, keep the existing methods as is.
    Try to reduce their visibility in follow-up tickets.
  2. Introduce finer methods that use Instant and Duration.
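
As a rough illustration of how a millisecond-based method and its Duration-based counterpart could coexist during the deprecation period, the sketch below uses a hypothetical RetentionSketch class. The class name, method names and error messages are assumptions made for this example and are not part of Kafka Streams.

```java
import java.time.Duration;

// Hypothetical class for illustration only; it is not part of Kafka Streams.
final class RetentionSketch {
    private final long retentionMs;

    private RetentionSketch(final long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Existing millisecond-based factory: kept as is, but marked deprecated.
    @Deprecated
    static RetentionSketch withRetention(final long retentionMs) {
        if (retentionMs < 0) {
            throw new IllegalArgumentException("retention must not be negative: " + retentionMs);
        }
        return new RetentionSketch(retentionMs);
    }

    // New Duration-based factory: validates its argument, then delegates.
    static RetentionSketch withRetention(final Duration retention) {
        if (retention == null) {
            throw new IllegalArgumentException("retention must not be null");
        }
        final long ms;
        try {
            ms = retention.toMillis(); // throws ArithmeticException on overflow
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException("retention is too large: " + retention, e);
        }
        return withRetention(ms);
    }

    long retentionMs() {
        return retentionMs;
    }
}
```

With such a pair, a caller can write RetentionSketch.withRetention(Duration.ofHours(1)) instead of passing 3600000L, while older call sites keep compiling with a deprecation warning.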

Changes in KafkaStreams#close semantics:

  1. Reject negative timeouts with an IllegalArgumentException.
  2. Make a timeout of 0 just signal shutdown and return immediately (after checking the state once).
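
The two rules above can be sketched roughly as follows. ShutdownSketch is a hypothetical stand-in and not the actual KafkaStreams implementation; the latch-based state tracking and the null check are assumptions made for this example.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a closable client; not the KafkaStreams class.
final class ShutdownSketch {
    private final CountDownLatch stopped = new CountDownLatch(1);
    private volatile boolean shutdownRequested = false;

    // Called by the (hypothetical) worker once it has fully stopped.
    void markStopped() {
        stopped.countDown();
    }

    boolean shutdownRequested() {
        return shutdownRequested;
    }

    // Returns true if the service was observed as stopped within the timeout.
    boolean close(final Duration timeout) {
        // Rule 1: reject negative (and, as an assumption, null) timeouts.
        if (timeout == null || timeout.isNegative()) {
            throw new IllegalArgumentException("timeout must be non-negative: " + timeout);
        }
        shutdownRequested = true; // signal shutdown
        // Rule 2: zero means "check the state once and return immediately".
        if (timeout.isZero()) {
            return stopped.getCount() == 0;
        }
        try {
            // Very large durations would overflow toMillis(); not handled in this sketch.
            return stopped.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In this sketch the argument check happens before shutdown is signalled, so an invalid call has no side effects.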

Default implementations of the fetch methods are provided in WindowStore.
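
A minimal sketch of what such a default implementation might look like is shown below. TimestampedStoreSketch is a hypothetical, simplified interface that returns a List instead of the iterators used by the real store interfaces; its name, key type and error messages are assumptions for illustration only.

```java
import java.time.Instant;
import java.util.List;

// Hypothetical, simplified store interface; not the Kafka Streams WindowStore.
interface TimestampedStoreSketch<V> {
    // Existing millisecond-based method that implementations already provide.
    List<V> fetch(String key, long timeFrom, long timeTo);

    // Instant-based overload with a default implementation:
    // it checks its arguments and passes them to the existing method.
    default List<V> fetch(final String key, final Instant from, final Instant to) {
        if (from == null || to == null) {
            throw new IllegalArgumentException("from and to must not be null");
        }
        final long fromMs;
        final long toMs;
        try {
            fromMs = from.toEpochMilli(); // throws ArithmeticException on overflow
            toMs = to.toEpochMilli();
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException("timestamp out of range", e);
        }
        return fetch(key, fromMs, toMs);
    }
}
```

Because the overload is a default method, existing implementations of the interface keep compiling without changes and gain the Instant-based variant automatically.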

Compatibility, Deprecation, and Migration Plan

...