Status

Current state: Accepted

Discussion thread: here
JIRA: KAFKA-7277, KAFKA-7477

Motivation

  1. KAFKA-7277:
    Right now the Streams API universally represents time as ms-since-unix-epoch.
    There's nothing wrong, per se, with this, but Duration is more ergonomic for an API.
    What we don't want is to present a heterogeneous API, so we need to make sure the whole Streams API is in terms of Duration.

  2. KAFKA-7477:
    Improve the semantics of KafkaStreams#close(Duration).

Public Interfaces

We need to add the following methods to the public API.

Some of the old methods become deprecated. Others can be replaced outright, because they have not been released yet.

The methods proposed for replacement were added in KAFKA-7222 (KIP-328, commit b3771ba):

Public API changes (Java):
public class Window {
    //New methods.
    Instant startTime();
    Instant endTime();
}

JoinWindows {
    //Existing methods. Will be deprecated.
    static JoinWindows of(final long timeDifferenceMs) throws IllegalArgumentException;
    JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException;
    JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException;

    //Existing method. Will be removed.
    JoinWindows grace(final long millisAfterWindowEnd);

    //New methods.
    static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException;
    JoinWindows before(final Duration timeDifference) throws IllegalArgumentException;
    JoinWindows after(final Duration timeDifference) throws IllegalArgumentException;
    JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException;
}

Materialized {
    //Existing method. Will be removed.
    Materialized<K, V, S> withRetention(final long retentionMs);

    //New method.
    Materialized<K, V, S> withRetention(final Duration retention) throws IllegalArgumentException;
}

SessionWindows {
    //Existing method. Will be deprecated.
    static SessionWindows with(final long inactivityGapMs);

    //Existing method. Will be removed.
    SessionWindows grace(final long millisAfterWindowEnd);

    //New methods.
    static SessionWindows with(final Duration inactivityGap) throws IllegalArgumentException;
    SessionWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException;
}

TimeWindows {
    //Existing methods. Will be deprecated.
    static TimeWindows of(final long sizeMs) throws IllegalArgumentException;
    TimeWindows advanceBy(final long advanceMs);

    //Existing method. Will be removed.
    TimeWindows grace(final long millisAfterWindowEnd);

    //New methods.
    static TimeWindows of(final Duration size) throws IllegalArgumentException;
    TimeWindows advanceBy(final Duration advance);
    TimeWindows grace(final Duration afterWindowEnd);
}

UnlimitedWindows {
    //Existing method. Will be deprecated.
    UnlimitedWindows startOn(final long startMs) throws IllegalArgumentException;

    //New method.
    UnlimitedWindows startOn(final Instant start) throws IllegalArgumentException;
}

ProcessorContext {
    //Existing method. Will be deprecated.
    Cancellable schedule(final long intervalMs,
                         final PunctuationType type,
                         final Punctuator callback);

    //New method.
    Cancellable schedule(final Duration interval,
                         final PunctuationType type,
                         final Punctuator callback) throws IllegalArgumentException;
}

ReadOnlyWindowStore<K, V> {
    //Deprecated methods.
    WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);

    //New methods.
    //This changed after initial KIP voting based on [PR discussion](https://github.com/apache/kafka/pull/5682#discussion_r222494244)
    WindowStoreIterator<V> fetch(K key, Instant from, Instant to) throws IllegalArgumentException;
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant from, Instant to) throws IllegalArgumentException;
    KeyValueIterator<Windowed<K>, V> fetchAll(Instant from, Instant to) throws IllegalArgumentException;
}

WindowStore {
    //New methods with default implementation that checks arguments and passes them to the existing fetch methods.
    WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) throws IllegalArgumentException;
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo) throws IllegalArgumentException;
    KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo) throws IllegalArgumentException;
}

Stores {
    //Existing methods. Will be deprecated.
    static WindowBytesStoreSupplier persistentWindowStore(final String name,
                                                          final long retentionPeriodMs,
                                                          final long windowSizeMs,
                                                          final boolean retainDuplicates);

    //This method was added after KIP voting, based on John Roesler's comment (https://github.com/apache/kafka/pull/5682#discussion_r221472187)
    static SessionBytesStoreSupplier persistentSessionStore(final String name,
                                                            final long retentionPeriod);

    static WindowBytesStoreSupplier persistentWindowStore(final String name,
                                                          final long retentionPeriodMs,
                                                          final long windowSizeMs,
                                                          final boolean retainDuplicates,
                                                          final long segmentIntervalMs);

    //New methods.
    static WindowBytesStoreSupplier persistentWindowStore(final String name,
                                                          final Duration retentionPeriod,
                                                          final Duration windowSize,
                                                          final boolean retainDuplicates) throws IllegalArgumentException;

    static SessionBytesStoreSupplier persistentSessionStore(final String name,
                                                            final Duration retentionPeriod) throws IllegalArgumentException;
}

KafkaStreams {
    //Existing method. Will be deprecated.
    public synchronized boolean close(final long timeout, final TimeUnit timeUnit);

    //New method.
    public synchronized boolean close(final Duration timeout) throws IllegalArgumentException;
}


Proposed Changes

New methods in the public API are proposed. See the "Public Interfaces" section.

For methods that are used both internally and as part of the public API, the proposal is:

  1. Within the scope of this KIP, keep the existing methods as is.
    Try to reduce the visibility of these methods in follow-up tickets.
  2. Introduce finer methods that take Instant and Duration.
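
To make the two steps above concrete, here is a minimal sketch of the overload pattern: the long-based method stays in place and is only marked deprecated, while a new Duration-based method validates its argument and delegates. SketchWindows is a hypothetical stand-in written for this illustration, not the real TimeWindows class, and the exact validation messages are assumptions.

```java
import java.time.Duration;

// Hypothetical stand-in for a window specification; not the real TimeWindows class.
final class SketchWindows {
    private final long sizeMs;

    private SketchWindows(final long sizeMs) {
        this.sizeMs = sizeMs;
    }

    // Existing long-based factory: kept as is, only marked deprecated.
    @Deprecated
    static SketchWindows of(final long sizeMs) {
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("Window size (sizeMs) must be larger than zero.");
        }
        return new SketchWindows(sizeMs);
    }

    // New Duration-based factory: validates the argument, then delegates to the long overload.
    static SketchWindows of(final Duration size) {
        if (size == null) {
            throw new IllegalArgumentException("size must not be null");
        }
        try {
            return of(size.toMillis());
        } catch (final ArithmeticException e) {
            // Duration.toMillis() overflows for extremely large durations.
            throw new IllegalArgumentException("size is too large to express in milliseconds", e);
        }
    }

    long sizeMs() {
        return sizeMs;
    }

    public static void main(final String[] args) {
        System.out.println(SketchWindows.of(Duration.ofMinutes(5)).sizeMs()); // prints 300000
    }
}
```

Delegating to the existing method keeps a single place for the range check, so both overloads reject the same invalid inputs.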

Changes in KafkaStreams#close semantics:

  1. Reject negative timeouts.
  2. Make a timeout of 0 just signal shutdown and return immediately (after checking the state once).
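
The two rules above can be sketched as follows. SketchClient is a hypothetical class written for this illustration and is not the real KafkaStreams implementation; the latch, the markStopped() helper, and the exception messages are all assumptions. The sketch only shows the intended behaviour: a negative timeout throws IllegalArgumentException, and a zero timeout signals shutdown, checks the state once, and returns without waiting.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a client with background threads; not the real KafkaStreams class.
final class SketchClient {
    // Released once the (simulated) background work has fully stopped.
    private final CountDownLatch stopped = new CountDownLatch(1);
    private volatile boolean shutdownRequested = false;

    // Simulates the background threads finishing their shutdown.
    void markStopped() {
        stopped.countDown();
    }

    boolean shutdownRequested() {
        return shutdownRequested;
    }

    // Returns true if the client has fully stopped within the timeout.
    boolean close(final Duration timeout) {
        if (timeout == null || timeout.isNegative()) {
            throw new IllegalArgumentException("timeout must be non-null and not negative");
        }
        final long timeoutMs = toMillisChecked(timeout);
        shutdownRequested = true; // signal shutdown in every case
        if (timeoutMs == 0) {
            // Zero timeout: check the state once and return immediately.
            return stopped.getCount() == 0;
        }
        try {
            return stopped.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    private static long toMillisChecked(final Duration duration) {
        try {
            return duration.toMillis();
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException("timeout is too large to express in milliseconds", e);
        }
    }

    public static void main(final String[] args) {
        final SketchClient client = new SketchClient();
        System.out.println(client.close(Duration.ZERO)); // prints false: signalled, not yet stopped
        client.markStopped();
        System.out.println(client.close(Duration.ofSeconds(1))); // prints true
    }
}
```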

Provide default implementations of the fetch methods in WindowStore.
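
One plausible reading of this item, sketched under stated assumptions: the store interface offers an Instant-based fetch as a default method that checks its arguments and passes them on to the existing long-based fetch, so existing implementations need no changes. SketchWindowStore, InMemorySketchStore, and toEpochMilliChecked are hypothetical names for this illustration; the real WindowStore returns iterators rather than lists and has more methods.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified store interface; not the real WindowStore.
interface SketchWindowStore<K, V> {
    // Existing long-based method that implementations already provide.
    List<V> fetch(K key, long timeFrom, long timeTo);

    // Instant-based method with a default implementation: check arguments, then delegate.
    default List<V> fetch(final K key, final Instant from, final Instant to) {
        return fetch(key, toEpochMilliChecked(from, "from"), toEpochMilliChecked(to, "to"));
    }

    static long toEpochMilliChecked(final Instant instant, final String name) {
        if (instant == null) {
            throw new IllegalArgumentException(name + " must not be null");
        }
        try {
            return instant.toEpochMilli();
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(name + " cannot be expressed as epoch milliseconds", e);
        }
    }
}

// Toy implementation that only supplies the long-based method.
final class InMemorySketchStore implements SketchWindowStore<String, Long> {
    private final Map<String, TreeMap<Long, Long>> data = new HashMap<>();

    void put(final String key, final long timestampMs, final long value) {
        data.computeIfAbsent(key, k -> new TreeMap<>()).put(timestampMs, value);
    }

    @Override
    public List<Long> fetch(final String key, final long timeFrom, final long timeTo) {
        final TreeMap<Long, Long> byTime = data.get(key);
        if (byTime == null || timeFrom > timeTo) {
            return new ArrayList<>();
        }
        // Both bounds inclusive, ordered by timestamp.
        return new ArrayList<>(byTime.subMap(timeFrom, true, timeTo, true).values());
    }

    public static void main(final String[] args) {
        final InMemorySketchStore store = new InMemorySketchStore();
        store.put("a", 1000L, 1L);
        store.put("a", 2000L, 2L);
        store.put("a", 3000L, 3L);
        System.out.println(store.fetch("a", Instant.ofEpochMilli(1000L), Instant.ofEpochMilli(2000L))); // prints [1, 2]
    }
}
```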

Compatibility, Deprecation, and Migration Plan

No compatibility issues foreseen.

Rejected Alternatives

An alternative solution with long parameters is implemented right now.