
Status

Current state: Accepted

Discussion thread: here
JIRA: KAFKA-7277, KAFKA-7477

Motivation

  1. KAFKA-7277:

    Right now the Streams API universally represents time as ms-since-unix-epoch.
    There's nothing wrong, per se, with this, but Duration is more ergonomic for an API.
    What we don't want is to present a heterogeneous API, so we need to make sure the whole Streams API is in terms of Duration.

  2. KAFKA-7477:
    Improve the semantics of KafkaStreams#close(Duration)

Public Interfaces

We need to add the following methods to the public API.

Some of the old methods will be deprecated. Others can be replaced outright, because they have not been released yet.

The methods proposed for replacement were added in KAFKA-7222 (KIP-328, commit b3771ba):

Public API changes (Java):
Windows {
	//Existing methods. Will be deprecated.
	public Windows<W> until(final long durationMs) throws IllegalArgumentException;
	public long maintainMs();
	public abstract long size();

	//New methods.
	public Windows<W> until(final Duration duration) throws IllegalArgumentException;
	public Duration maintain();
	public abstract Duration size();
}

class Window {
	//Existing methods. Will be deprecated.
	public Window(final long startMs, final long endMs) throws IllegalArgumentException;
	public long start();
	public long end();

	//New methods.
	public Window(final Instant start, final Instant end) throws IllegalArgumentException;
	public Instant startTime();
	public Instant endTime();
}

JoinWindows {
    //Existing methods. Will be deprecated.
    public static JoinWindows of(final long timeDifferenceMs) throws IllegalArgumentException;
    public JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException;
    public JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException;	

    //Existing method. Will be removed.
    public JoinWindows grace(final long millisAfterWindowEnd);

    //New methods.
    public static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException;
    public JoinWindows before(final Duration timeDifference) throws IllegalArgumentException;
    public JoinWindows after(final Duration timeDifference) throws IllegalArgumentException;
    public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException;
    public Duration maintain();
}

Materialized {
	//Existing method. Will be removed.
	Materialized<K, V, S> withRetention(final long retentionMs);

	//New method.
	Materialized<K, V, S> withRetention(final Duration retention) throws IllegalArgumentException;
}

SessionWindows {
	//Existing methods. Will be deprecated.
	public static SessionWindows with(final long inactivityGapMs);

	//Existing methods. Will be removed.
	public SessionWindows grace(final long millisAfterWindowEnd);

	//New methods.
	public static SessionWindows with(final Duration inactivityGap) throws IllegalArgumentException;
	public SessionWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException;
}

TimeWindows {
	//Existing methods. Will be deprecated.
	public static TimeWindows of(final long sizeMs) throws IllegalArgumentException;
	public TimeWindows advanceBy(final long advanceMs);

	public Map<Long, TimeWindow> windowsFor(final long timestamp);
    public long size();

	//Existing method. Will be removed.
	public TimeWindows grace(final long millisAfterWindowEnd);

	//New methods.
	public static TimeWindows of(final Duration size) throws IllegalArgumentException;
	public TimeWindows advanceBy(final Duration advance);
	public Map<Long, TimeWindow> windowsFor(Instant timestamp);
    public Duration size();
    public TimeWindows grace(final Duration afterWindowEnd);
}

UnlimitedWindows {
	//Existing methods. Will be deprecated.
    public UnlimitedWindows startOn(final long startMs) throws IllegalArgumentException;
    public Map<Long, UnlimitedWindow> windowsFor(final long timestamp);
	public long size();
	public long maintainMs();

	//New methods.
    public UnlimitedWindows startOn(final Instant start) throws IllegalArgumentException;
    public Map<Long, UnlimitedWindow> windowsFor(final Instant timestamp);
	public Duration size();
	public Duration maintain();
}

ProcessorContext {
    //Existing method. Will be deprecated.
    Cancellable schedule(final long intervalMs,
                         final PunctuationType type,
                         final Punctuator callback);

	//New method.
    Cancellable schedule(final Duration interval,
                         final PunctuationType type,
                         final Punctuator callback) throws IllegalArgumentException;
}

ReadOnlyWindowStore<K, V> {
    //Deprecated methods.
    WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);

	//New methods.
	//This changed after initial KIP voting based on [PR discussion](https://github.com/apache/kafka/pull/5682#discussion_r222494244)
    WindowStoreIterator<V> fetch(K key, Instant from, Instant to) throws IllegalArgumentException;
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant from, Instant to) throws IllegalArgumentException;
    KeyValueIterator<Windowed<K>, V> fetchAll(Instant from, Instant to) throws IllegalArgumentException;
}

WindowStore {
	//New methods with default implementation that checks arguments and passes them to existing fetch methods.
	WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) throws IllegalArgumentException;
	KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo) throws IllegalArgumentException;
	KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo) throws IllegalArgumentException;
}

Stores {
	//Existing methods. Will be deprecated.
	public static WindowBytesStoreSupplier persistentWindowStore(final String name,
	                                                             final long retentionPeriodMs,
	                                                             final long windowSizeMs,
	                                                             final boolean retainDuplicates);

	//This method was added after KIP voting, based on John Roesler's comment (https://github.com/apache/kafka/pull/5682#discussion_r221472187)
	static SessionBytesStoreSupplier persistentSessionStore(final String name,
	                                                        final long retentionPeriod);

	public static WindowBytesStoreSupplier persistentWindowStore(final String name,
	                                                             final long retentionPeriodMs,
	                                                             final long windowSizeMs,
	                                                             final boolean retainDuplicates,
	                                                             final long segmentIntervalMs);

	//New methods.
	public static WindowBytesStoreSupplier persistentWindowStore(final String name,
	                                                             final Duration retentionPeriod,
	                                                             final Duration windowSize,
	                                                             final boolean retainDuplicates) throws IllegalArgumentException;

	public static SessionBytesStoreSupplier persistentSessionStore(final String name,
	                                                               final Duration retentionPeriod);
}

WindowBytesStoreSupplier {
    //Existing methods. Will be deprecated.
    long segmentIntervalMs();
    long windowSize();
    long retentionPeriod();

	//New methods.
    Duration segmentInterval();
    Duration windowSize();
    Duration retentionPeriod() throws IllegalArgumentException;
}

KafkaStreams {
	//Existing method. Will be deprecated.
	public synchronized boolean close(final long timeout, final TimeUnit timeUnit);

	//New method
	public synchronized boolean close(final Duration timeout) throws IllegalArgumentException;
}


Proposed Changes

New methods are proposed for the public API; see the "Public Interfaces" section.

For methods that are used both internally and as part of the public API, the proposal is:

  1. Within the scope of this KIP, keep the existing methods as is.
    Reducing their visibility is left to follow-up tickets.
  2. Introduce new methods that take Instant and Duration.
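
The two steps above can be illustrated with a minimal sketch. `SketchWindows` and its `toMillis` helper are hypothetical names invented for this example; they are not the real Kafka Streams classes. The sketch assumes the new overload converts to milliseconds once, at the API boundary, and reports an unrepresentable Duration as an IllegalArgumentException:

```java
import java.time.Duration;

// Hypothetical window-spec class for illustration only; not the real TimeWindows.
final class SketchWindows {
    private final long sizeMs;

    private SketchWindows(final long sizeMs) {
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("Window size must be positive, got " + sizeMs + " ms");
        }
        this.sizeMs = sizeMs;
    }

    /** Existing long-based factory: kept as is, only marked deprecated. */
    @Deprecated
    static SketchWindows of(final long sizeMs) {
        return new SketchWindows(sizeMs);
    }

    /** New Duration-based factory: validates and converts, then reuses the same constructor. */
    static SketchWindows of(final Duration size) {
        return new SketchWindows(toMillis(size, "size"));
    }

    long sizeMs() {
        return sizeMs;
    }

    /** Converts a Duration to milliseconds, mapping null and overflow to IllegalArgumentException. */
    static long toMillis(final Duration duration, final String name) {
        if (duration == null) {
            throw new IllegalArgumentException(name + " must not be null");
        }
        try {
            return duration.toMillis();
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(
                name + " cannot be represented as long milliseconds: " + duration, e);
        }
    }
}
```

Callers of the deprecated overload keep compiling with a warning, while new code can write `SketchWindows.of(Duration.ofSeconds(5))` and never handle raw millisecond values.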

Changes in KafkaStreams#close semantics:

  1. Reject negative timeouts.
  2. Make a timeout of 0 just signal shutdown and return immediately (after checking the state once).
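
A rough sketch of these two rules follows. `ClosableSketch` is a hypothetical stand-in and not the real KafkaStreams class; the latch-based bookkeeping and the handling of interruption are assumptions made only so the example is self-contained:

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a client with a background shutdown; not the real KafkaStreams.
final class ClosableSketch {
    private final CountDownLatch stopped = new CountDownLatch(1);
    private volatile boolean shutdownRequested = false;

    /** Called by the (imaginary) background thread once shutdown has completed. */
    void markStopped() {
        stopped.countDown();
    }

    boolean isShutdownRequested() {
        return shutdownRequested;
    }

    /** Returns true if shutdown had completed by the time the call returned. */
    boolean close(final Duration timeout) {
        // Rule 1: reject negative timeouts.
        if (timeout == null || timeout.isNegative()) {
            throw new IllegalArgumentException("Timeout can't be null or negative: " + timeout);
        }
        shutdownRequested = true; // signal shutdown in every case

        // Rule 2: a zero timeout checks the state once and returns without waiting.
        if (timeout.isZero()) {
            return stopped.getCount() == 0;
        }
        try {
            return stopped.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

With this shape, `close(Duration.ZERO)` never blocks: it reports whether shutdown has already finished and leaves the rest to the background thread.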

WindowStore provides default implementations of the new fetch methods.
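
As an illustration of such a default implementation, the sketch below validates the Instant arguments and then delegates to the existing long-based method. `SketchWindowStore` is a simplified, hypothetical interface (it returns a List instead of an iterator) and is not the real WindowStore signature:

```java
import java.time.Instant;
import java.util.List;

// Hypothetical, simplified store interface; the real WindowStore returns iterators.
interface SketchWindowStore<K, V> {

    /** Existing long-based method that every implementation already provides. */
    List<V> fetch(K key, long timeFrom, long timeTo);

    /** New overload: check the arguments, then pass them to the existing method. */
    default List<V> fetch(final K key, final Instant from, final Instant to) {
        return fetch(key, toEpochMilli(from, "from"), toEpochMilli(to, "to"));
    }

    /** Converts an Instant to epoch milliseconds, mapping null and overflow to IllegalArgumentException. */
    static long toEpochMilli(final Instant instant, final String name) {
        if (instant == null) {
            throw new IllegalArgumentException(name + " must not be null");
        }
        try {
            return instant.toEpochMilli();
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(
                name + " cannot be represented as long milliseconds: " + instant, e);
        }
    }
}
```

Because the new overload is a default method, existing implementations of the long-based method gain the Instant variant without any code change.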

Compatibility, Deprecation, and Migration Plan

...