You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 6 Next »

Status

Current stateDiscussion

Discussion thread: here

JIRA: here

Motivation

Right now Streams API unversally represents time as ms-since-unix-epoch.

There's nothing wrong, per se, with this, but Duration is more ergonomic for an API.

What we don't want is to present a heterogeneous API, so we need to make sure the whole Streams API is in terms of Duration.

Public Interfaces

We need to add following method to public API:

Public API changes
Windows {
	//Existing methods.
	public Windows<W> until(final long durationMs) throws IllegalArgumentException;
	public long maintainMs();
	public abstract long size();

	//New methods.
	public Windows<W> until(final Duration duration) throws IllegalArgumentException;
	public Duration maintain();
	public abstract Duration size();
}

class Window {
	//Existing methods.
	public Window(final long startMs, final long endMs) throws IllegalArgumentException;
	public long start();
	public long end();

	//New methods.
	public Window(final LocalDateTime start, final LocalDateTime end) throws IllegalArgumentException;
	public LocalDateTime start();
	public LocalDateTime end();
}

JoinWindows {
    //Existing methods.
    public static JoinWindows of(final long timeDifferenceMs) throws IllegalArgumentException;
    public JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException;
    public JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException;
    public JoinWindows until(final long durationMs) throws IllegalArgumentException;
    public long maintainMs();

    //New methods.
    public static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException;
    public JoinWindows before(final Duration timeDifference) throws IllegalArgumentException;
    public JoinWindows after(final Duration timeDifference) throws IllegalArgumentException;
    public JoinWindows until(final Duration duration) throws IllegalArgumentException;
    public Duration maintain();
}

SessionWindows {
	//Existing methods.
    public static SessionWindows with(final long inactivityGapMs);
    public SessionWindows until(final long durationMs) throws IllegalArgumentException;
    public long inactivityGap();
    public long maintainMs();

	//New methods.
    public static SessionWindows with(final Duration inactivityGap);
    public SessionWindows until(final Duration duration) throws IllegalArgumentException;
    public Duration inactivityGap();
    public Duration maintain();
}

TimeWindowedDeserializer {
	//Existing methods.
	public TimeWindowedDeserializer(final Deserializer<T> inner, final long windowSize);
	public Long getWindowSize();

	//New methods.
	public TimeWindowedDeserializer(final Deserializer<T> inner, final Duration windowSize);
	public Duration getWindowSize();
}

TimeWindows {
	//Existing methods.
	public static TimeWindows of(final long sizeMs) throws IllegalArgumentException;
	public TimeWindows advanceBy(final long advanceMs);
	public Map<Long, TimeWindow> windowsFor(final long timestamp);
    public long size();
    public TimeWindows until(final long durationMs) throws IllegalArgumentException;
    public long maintainMs();

	//New methods.
	public static TimeWindows of(final Duration size) throws IllegalArgumentException;
	public TimeWindows advanceBy(final Duration advance);
	public Map<Long, TimeWindow> windowsFor(LocalDateTime timestamp);
    public Duration size();
    public TimeWindows until(final Duration duration) throws IllegalArgumentException;
    public Duration maintain();
}

UnlimitedWindows {
	//Existing methods.
    public UnlimitedWindows startOn(final long startMs) throws IllegalArgumentException;
    public Map<Long, UnlimitedWindow> windowsFor(final long timestamp);
	public long size();
	public long maintainMs();

	//New methods.
    public UnlimitedWindows startOn(final LocalDateTime start) throws IllegalArgumentException;
    public Map<Long, UnlimitedWindow> windowsFor(final LocalDateTime timestamp);
	public Duration size();
	public Duration maintain();
}

ProcessorContext {
    //Existing method.
    Cancellable schedule(final long intervalMs,
                         final PunctuationType type,
                         final Punctuator callback);

	//New method.
    Cancellable schedule(final Duration interval,
                         final PunctuationType type,
                         final Punctuator callback);
}

ReadOnlyWindowStore<K, V> {
    //Existing methods.
    WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);

	//New methods.
    WindowStoreIterator<V> fetch(K key, LocalDateTime from, Duration duration);
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, LocalDateTime from, Duration duration);
    KeyValueIterator<Windowed<K>, V> fetchAll(LocalDateTime from, Duration duration);
}

SessionBytesStoreSupplier {
    //Existing methods.
    long segmentIntervalMs();
    long retentionPeriod();

    //New methods.
    Duration segmentInterval();
    Duration retentionPeriod();
}

SessionStore {
	//Existing methods.
	KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, long earliestSessionEndTime, final long latestSessionStartTime);
	KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, long earliestSessionEndTime, final long latestSessionStartTime);

	//New methods.
	KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, LocalDateTime earliestSessionEnd, final LocalDateTime latestSessionStart);
	KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, LocalDateTime earliestSessionEndTime, final LocalDateTime latestSessionStartTime);
}

Stores {
	//Existing methods.
	public static WindowBytesStoreSupplier persistentWindowStore(final String name,
    	                                                         final long retentionPeriod,
        	                                                     final long windowSize,
            	                                                 final boolean retainDuplicates);

	public static WindowBytesStoreSupplier persistentWindowStore(final String name,
    	                                                         final long retentionPeriod,
        	                                                     final long windowSize,
            	                                                 final boolean retainDuplicates,
                	                                             final long segmentInterval);

	public static SessionBytesStoreSupplier persistentSessionStore(final String name,
    	                                                           final long retentionPeriod);

	//New methods.
	public static WindowBytesStoreSupplier persistentWindowStore(final String name,
    	                                                         final Duration retentionPeriod,
        	                                                     final long windowSize,
            	                                                 final boolean retainDuplicates);

	public static WindowBytesStoreSupplier persistentWindowStore(final String name,
    	                                                         final Duration retentionPeriod,
        	                                                     final long windowSize,
            	                                                 final boolean retainDuplicates,
                	                                             final long segmentInterval);

	public static SessionBytesStoreSupplier persistentSessionStore(final String name,
    	                                                           final Duration retentionPeriod);
}

WindowBytesStoreSupplier {
    //Existing methods.
    long segmentIntervalMs();
    long windowSize();
    long retentionPeriod();

	//New methods.
    Duration segmentInterval();
    Duration windowSize();
    Duration retentionPeriod();
}

KafkaStreams {
	//Existing method.
	public synchronized boolean close(final long timeout, final TimeUnit timeUnit);

	//New method
	public synchronized boolean close(final Duration timeout)
}

StreamsMetrics {
	//Existing method.
	void recordLatency(final Sensor sensor,
    	               final long startNs,
        	           final long endNs);

	//New method.
	void recordLatency(final Sensor sensor,
    	               final LocalDateTime start,
        	           final Duration recDuration);
}


Proposed Changes

New methods in public API are proposed. See "Public Interfaces" section.

Compatibility, Deprecation, and Migration Plan

No compatibility issues foreseen.

Rejected Alternatives

An alternative solution with long parameters is implemented right now.

  • No labels