Status

Current state: Under Discussion

Discussion thread: here

Motivation

Right now the Streams API universally represents time as milliseconds since the Unix epoch.

There is nothing wrong with this per se, but Duration is more ergonomic for an API.

What we don't want is a heterogeneous API, so the whole Streams API needs to be expressed in terms of Duration.
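
To illustrate the ergonomics argument, here is a minimal sketch that uses only java.time from the JDK. The class and method names (DurationErgonomics, fiveMinutesAsMillis, fiveMinutes) are hypothetical and are not part of the Streams API; they only contrast the two styles.

```java
import java.time.Duration;

public class DurationErgonomics {

    // Millisecond style: the caller does the unit arithmetic by hand,
    // and the unit lives only in the method or parameter name.
    static long fiveMinutesAsMillis() {
        return 5 * 60 * 1000L;
    }

    // Duration style: the value carries its own unit.
    static Duration fiveMinutes() {
        return Duration.ofMinutes(5);
    }

    public static void main(final String[] args) {
        // Both forms describe the same span of time.
        System.out.println(fiveMinutesAsMillis() == fiveMinutes().toMillis());
    }
}
```

With a Duration parameter, a caller cannot accidentally pass seconds where milliseconds are expected, which is the kind of mistake a bare long permits.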

Public Interfaces

We need to add the following methods to the public API.

Some of the old methods will be deprecated. Others can be replaced outright, because they have not been released yet.

The methods proposed for replacement were added by KIP-328 (commit cc8fc7c):

Public API changes
Windows {
	//Existing method. Will be deprecated.
	abstract long size();

	//Existing methods. Will be removed.
	Windows<W> grace(final long millisAfterWindowEnd);
	long gracePeriodMs();

	//New methods.
	abstract Duration size();
	Windows<W> grace(final Duration afterWindowEnd);
	Duration gracePeriod();
}

class Window {
	//Existing methods. Will be deprecated.
	Window(final long startMs, final long endMs) throws IllegalArgumentException;
	long start();
	long end();

	//New methods.
	Window(final Instant start, final Instant end) throws IllegalArgumentException;
	Instant start();
	Instant end();
}

JoinWindows {
    //Existing methods. Will be deprecated.
    static JoinWindows of(final long timeDifferenceMs) throws IllegalArgumentException;
    JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException;
    JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException;
    JoinWindows until(final long durationMs) throws IllegalArgumentException;

	//Existing method. Will be removed.
	JoinWindows grace(final long millisAfterWindowEnd);

    //New methods.
    static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException;
    JoinWindows before(final Duration timeDifference) throws IllegalArgumentException;
    JoinWindows after(final Duration timeDifference) throws IllegalArgumentException;
    JoinWindows until(final Duration duration) throws IllegalArgumentException;	
	JoinWindows grace(final Duration afterWindowEnd);
}

Materialized {
	//Existing method. Will be removed.
	Materialized<K, V, S> withRetention(final long retentionMs);

	//New method.
	Materialized<K, V, S> withRetention(final Duration retention);
}


SessionWindows {
	//Existing methods. Will be deprecated.
    static SessionWindows with(final long inactivityGapMs);
    long inactivityGap();

	//Existing methods. Will be removed.
	SessionWindows grace(final long millisAfterWindowEnd);
	long gracePeriodMs();

	//New methods.
    static SessionWindows with(final Duration inactivityGap);
    Duration inactivityGap();
	SessionWindows grace(final Duration afterWindowEnd);
	Duration gracePeriod();
}

TimeWindowedDeserializer {
	//Existing methods. Will be deprecated.
	TimeWindowedDeserializer(final Deserializer<T> inner, final long windowSizeMs);
	Long getWindowSize();

	//New methods.
	TimeWindowedDeserializer(final Deserializer<T> inner, final Duration windowSize);
	Duration getWindowSize();
}

TimeWindows {
	//Existing methods. Will be deprecated.
	static TimeWindows of(final long sizeMs) throws IllegalArgumentException;
	TimeWindows advanceBy(final long advanceMs);
	Map<Long, TimeWindow> windowsFor(final long timestamp);
    long size();

	//Existing method. Will be removed.
	TimeWindows grace(final long millisAfterWindowEnd);

	//New methods.
	static TimeWindows of(final Duration size) throws IllegalArgumentException;
	TimeWindows advanceBy(final Duration advance);
	Map<Long, TimeWindow> windowsFor(final Instant timestamp);
    Duration size();
	TimeWindows grace(final Duration afterWindowEnd);
}

UnlimitedWindows {
	//Existing methods. Will be deprecated.
    UnlimitedWindows startOn(final long startMs) throws IllegalArgumentException;
    Map<Long, UnlimitedWindow> windowsFor(final long timestamp);
	long size();

	//Existing method. Will be removed.
	UnlimitedWindows grace(final long millisAfterWindowEnd);


	//New methods.
    UnlimitedWindows startOn(final Instant start) throws IllegalArgumentException;
    Map<Long, UnlimitedWindow> windowsFor(final Instant timestamp);
	Duration size();
	UnlimitedWindows grace(final Duration afterWindowEnd);
}

ProcessorContext {
    //Existing method. Will be deprecated.
    Cancellable schedule(final long intervalMs,
                         final PunctuationType type,
                         final Punctuator callback);

	//New method.
    Cancellable schedule(final Duration interval,
                         final PunctuationType type,
                         final Punctuator callback);
}

ReadOnlyWindowStore<K, V> {
    //Existing methods. Will be deprecated.
    WindowStoreIterator<V> fetch(K key, long timeFromMs, long timeToMs);
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFromMs, long timeToMs);
    KeyValueIterator<Windowed<K>, V> fetchAll(long timeFromMs, long timeToMs);

	//New methods.
    WindowStoreIterator<V> fetch(K key, Instant from, Duration duration);
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant fromTime, Duration duration);
    KeyValueIterator<Windowed<K>, V> fetchAll(Instant from, Duration duration);
}

SessionBytesStoreSupplier {
    //Existing methods. Will be deprecated.
    long segmentIntervalMs();
    long retentionPeriod();

    //New methods. 
    Duration segmentInterval();
    Duration retentionPeriod();
}

SessionStore {
	//Existing methods. Will be deprecated.
	KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, long earliestSessionEndTimeMs, final long latestSessionStartTimeMs);
	KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, long earliestSessionEndTimeMs, final long latestSessionStartTimeMs);

	//New methods.
	KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, Instant earliestSessionEnd, final Instant latestSessionStart);
	KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, Instant earliestSessionEnd, final Instant latestSessionStart);
}

Stores {
	//Existing methods. Will be deprecated.
	static WindowBytesStoreSupplier persistentWindowStore(final String name,
    	                                                  final long retentionPeriodMs,
        	                                              final long windowSizeMs,
                                                          final boolean retainDuplicates);

	static WindowBytesStoreSupplier persistentWindowStore(final String name,
                                                          final long retentionPeriodMs,
                                                          final long windowSizeMs,
                                                          final boolean retainDuplicates,
                                                          final long segmentIntervalMs);

	static SessionBytesStoreSupplier persistentSessionStore(final String name,
                                                            final long retentionPeriodMs);

	//New methods.
	static WindowBytesStoreSupplier persistentWindowStore(final String name,
    	                                                  final Duration retentionPeriod,
        	                                              final Duration windowSize,
                                                          final boolean retainDuplicates);

	static WindowBytesStoreSupplier persistentWindowStore(final String name,
                                                          final Duration retentionPeriod,
                                                          final Duration windowSize,
                                                          final boolean retainDuplicates,
                                                          final Duration segmentInterval);

	static SessionBytesStoreSupplier persistentSessionStore(final String name,
                                                            final Duration retentionPeriod);
}

WindowBytesStoreSupplier {
    //Existing methods. Will be deprecated.
    long segmentIntervalMs();
    long windowSize();
    long retentionPeriod();

	//New methods.
    Duration segmentInterval();
    Duration windowSize();
    Duration retentionPeriod();
}

KafkaStreams {
	//Existing method. Will be deprecated.
	public synchronized boolean close(final long timeout, final TimeUnit timeUnit);

	//New method
	public synchronized boolean close(final Duration timeout);
}


Proposed Changes

New methods are proposed for the public API. See the "Public Interfaces" section.
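
As a rough sketch of what migrating a call site might involve, the helpers below convert a legacy pair of epoch-millisecond bounds into the Instant plus Duration form used by the new fetch overloads, and back again. The class TimeRangeMigration and its methods are hypothetical and use only java.time. Treating the end of the range as from plus duration is an assumption, not something this proposal specifies.

```java
import java.time.Duration;
import java.time.Instant;

public class TimeRangeMigration {

    // Start of the range as an Instant (hypothetical helper).
    static Instant rangeStart(final long timeFromMs) {
        return Instant.ofEpochMilli(timeFromMs);
    }

    // Length of the range as a Duration; rejects inverted ranges.
    static Duration rangeLength(final long timeFromMs, final long timeToMs) {
        if (timeToMs < timeFromMs) {
            throw new IllegalArgumentException("timeToMs must not be before timeFromMs");
        }
        // subtractExact throws ArithmeticException instead of silently overflowing.
        return Duration.ofMillis(Math.subtractExact(timeToMs, timeFromMs));
    }

    // Recovers the legacy end bound, assuming end = from + duration.
    static long rangeEndMs(final Instant from, final Duration duration) {
        return from.plus(duration).toEpochMilli();
    }

    public static void main(final String[] args) {
        final Instant from = rangeStart(1_000L);
        final Duration length = rangeLength(1_000L, 4_000L);
        System.out.println(from + " + " + length + " ends at " + rangeEndMs(from, length));
    }
}
```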

Compatibility, Deprecation, and Migration Plan

No compatibility issues are foreseen.
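
One way the deprecated long overloads could remain source compatible is to delegate to the new Duration overloads. The sketch below shows that pattern on a hypothetical WindowSpec class. It is not the actual TimeWindows implementation, and the validation rule (size must be positive) is an assumption made for the example.

```java
import java.time.Duration;

public class WindowSpec {

    private final Duration size;

    private WindowSpec(final Duration size) {
        this.size = size;
    }

    // New-style factory: validates once, in one place.
    static WindowSpec of(final Duration size) {
        if (size == null || size.isZero() || size.isNegative()) {
            throw new IllegalArgumentException("Window size must be positive.");
        }
        return new WindowSpec(size);
    }

    // Old-style factory: kept for existing callers, forwards to the new one.
    @Deprecated
    static WindowSpec of(final long sizeMs) {
        return of(Duration.ofMillis(sizeMs));
    }

    Duration size() {
        return size;
    }

    public static void main(final String[] args) {
        // Existing call sites keep compiling and see the same behaviour.
        System.out.println(WindowSpec.of(60_000L).size().equals(WindowSpec.of(Duration.ofMinutes(1)).size()));
    }
}
```

Because both overloads end in the same code path, existing callers see the same validation as new callers while they migrate.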

Rejected Alternatives

The alternative, keeping long millisecond parameters, is what is implemented right now.
