Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current stateVotingAccepted

Discussion thread: here
JIRA:

Jira
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-7277
Jira
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-7477

Motivation

  1. KAFKA-7277:

    Right now Streams API

...

  1. universally represents time as ms-since-unix-epoch.
    There's nothing wrong, per se, with this, but Duration is more ergonomic for an API.
    What we don't want is to present a heterogeneous API, so we need to make sure the whole Streams API is in terms of Duration.

  2. KAFKA-7477:
    Improve of semantics of KafkaStreams#close(Duration)

Public Interfaces

We need to add following method to public API.

...

Code Block
languagejava
firstline0
titlePublic API changes
class Window {
	//Existing methods. Will be deprecated.
	Window(final long startMs, final long endMs) throws IllegalArgumentException;

	//New methods.
	Window(final Instant start, final Instant end) throws IllegalArgumentException;
	Instant startTime();
	Instant endTime();
}

JoinWindows {
    //Existing methods. Will be deprecated.
    static JoinWindows of(final long timeDifferenceMs) throws IllegalArgumentException;
    JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException;
    JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException;	

	long size();

	//Existing method. Will be removed.
	JoinWindows grace(final long millisAfterWindowEnd);

    //New methods.
    static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException;
    JoinWindows before(final Duration timeDifference) throws IllegalArgumentException;
    JoinWindows after(final Duration timeDifference) throws IllegalArgumentException;
	JoinWindows grace(final Duration afterWindowEnd);
	Map<Instant, Window> windowsForTime(final Instant timestamp);
	Duration windowSize();
throws IllegalArgumentException;
}

Materialized {
	//Existing method. Will be removed.
	Materialized<K, V, S> withRetention(final long retentionMs);

	//New method.
	Materialized<K, V, S> withRetention(final Duration retention) throws IllegalArgumentException;
}


SessionWindows {
	//Existing methods. Will be deprecated.
    static SessionWindows with(final long inactivityGapMs);

	//Existing methods. Will be removed.
	SessionWindows grace(final long millisAfterWindowEnd);

	//New methods.
    static SessionWindows with(final Duration inactivityGap) throws IllegalArgumentException;
	SessionWindows grace(final Duration afterWindowEnd);
}

TimeWindowedDeserializer {
	//Existing methods. Will be deprecated.
	TimeWindowedDeserializer(final Deserializer<T> inner, final long windowSizeMs);

	//New methods.
	TimeWindowedDeserializer(final Deserializer<T> inner, final Duration windowSize);
	Duration windowSize();
 throws IllegalArgumentException;
}

TimeWindows {
	//Existing methods. Will be deprecated.
	static TimeWindows of(final long sizeMs) throws IllegalArgumentException;
	TimeWindows advanceBy(final long advanceMs);
    long size();

	//Existing method. Will be removed.
	TimeWindows grace(final long millisAfterWindowEnd);

	//New methods.
	static TimeWindows of(final Duration size) throws IllegalArgumentException;
	TimeWindows advanceBy(final Duration advance);
	Map<Instant, TimeWindow> windowsFor(final Instant timestamp);
    Duration windowSize();
	TimeWindows grace(final Duration afterWindowEnd);
}

UnlimitedWindows {
	//Existing methods. Will be deprecated.
    UnlimitedWindows startOn(final long startMs) throws IllegalArgumentException;
	long size();

	//Existing method. Will be removed.
	UnlimitedWindows grace(final long millisAfterWindowEnd);


	//New methods.
    UnlimitedWindows startOn(final Instant start) throws IllegalArgumentException;
    Map<Instant, UnlimitedWindow> windowsFor(final Instant timestamp);
	Duration windowSize();
	UnlimitedWindows grace(final Duration afterWindowEnd);
}

ProcessorContext {
    //Existing method. Will be deprecated.
    Cancellable schedule(final long intervalMs,
                         final PunctuationType type,
                         final Punctuator callback);

	//New method.
    Cancellable schedule(final Duration interval,
                         final PunctuationType type,
                         final Punctuator callback) throws IllegalArgumentException;
}

ReadOnlyWindowStore<K, V> {
	    //NewDeprecated methods.
    WindowStoreIterator<V> fetch(K key, Instantlong fromtimeFrom, Durationlong durationtimeTo);
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instantlong fromtimeFrom, Durationlong durationtimeTo);
    KeyValueIterator<Windowed<K>, V> fetchAll(Instantlong fromtimeFrom, Durationlong durationtimeTo);
}

SessionBytesStoreSupplier {
    
	//ExistingNew methods. Will be deprecated.
    long segmentIntervalMs();
    long retentionPeriod();

    //New methods. 
	//This changed after initial KIP voting based on [PR discussion](https://github.com/apache/kafka/pull/5682#discussion_r222494244)
    DurationWindowStoreIterator<V> segmentIntervalfetch();
K key, Instant from, DurationInstant retentionPeriodDuration();
}

SessionStore {
	//New methods.
	to) throws IllegalArgumentException;
    KeyValueIterator<Windowed<K>, AGG>V> findSessions(finalfetch(K from, K keyto, Instant earliestSessionEndfrom, final Instant latestSessionStartto) throws IllegalArgumentException;
	KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFromKeyValueIterator<Windowed<K>, finalV> KfetchAll(Instant keyTofrom, Instant earliestSessionEnd,to) final Instant latestSessionStart)throws IllegalArgumentException;
}

StoresWindowStore {
	//ExistingNew methods. Willwith be deprecated.
	static WindowBytesStoreSupplier persistentWindowStore(final String name,
    	  default implementation that checks arguments and pass it to existing fetch methods.
    WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) throws IllegalArgumentException;
    KeyValueIterator<Windowed<K>, V> fetch(K from,                             final long retentionPeriodMs,
    K to, long timeFrom, long timeTo) throws IllegalArgumentException;
    	KeyValueIterator<Windowed<K>,                                              final long windowSizeMs,
                                                          final boolean retainDuplicates);
V> fetchAll(long timeFrom, long timeTo) throws IllegalArgumentException;
}

Stores {
	//Existing methods. Will be deprecated.
	static WindowBytesStoreSupplier persistentWindowStore(final String name,
    	                                                      final long retentionPeriodMs,
            	                                              final long windowSizeMs,
                                                          final boolean retainDuplicates,
                                                          final long segmentIntervalMs);

);


	//This method added after KIP voting. Based on John Roesler comment(https://github.com/apache/kafka/pull/5682#discussion_r221472187)
	static SessionBytesStoreSupplier persistentSessionStore(final String name,
                                                               final long retentionPeriodMsretentionPeriod);

	//New methods.
	static WindowBytesStoreSupplier persistentWindowStore(final String name,
    	                                                      final Durationlong retentionPeriodretentionPeriodMs,
        	                                                  final Durationlong windowSizewindowSizeMs,
                                                          final boolean retainDuplicates);,

	static WindowBytesStoreSupplier persistentWindowStore(final String name,
                                                      final long segmentIntervalMs);

	//New methods.
	static WindowBytesStoreSupplier persistentWindowStore(final DurationString retentionPeriodname,
    	                                                      final Duration windowSizeretentionPeriod,
        	                                                  final booleanDuration retainDuplicateswindowSize,
                                                          final Durationboolean segmentIntervalretainDuplicates) throws IllegalArgumentException;

	static SessionBytesStoreSupplier persistentSessionStore(final String name,
                                                            final Duration retentionPeriod);
}

WindowBytesStoreSupplier {
    //Existing methods. Will be deprecated.
    long segmentIntervalMs();
    long windowSize();
    long retentionPeriod();

	//New methods.
    Duration segmentInterval();
    Duration windowSizeDuration();
    Duration retentionPeriodDuration();
 throws IllegalArgumentException;
}

KafkaStreams {
	//Existing method. Will be deprecated.
	public synchronized boolean close(final long timeout, final TimeUnit timeUnit);

	//New method
	public synchronized boolean close(final Duration timeout) throws IllegalArgumentException;
}


Proposed Changes

New methods in public API are proposed. See "Public Interfaces" section.

...

  1. In this scope keep existing methods as is.
    Try to reduce the visibility of methods in next tickets.
  2. Introduce finer methods with Instant and Duration

Changes in KafkaStream#close semantics:

  1. reject negative numbers
  2. make 0 just signal and return immediately (after checking the state once)

Default implementation of fetch methods in WindowStore.

Compatibility, Deprecation, and Migration Plan

...