THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final SessionWindows sessionWindows, final String storeName); <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final Aggregator<K,Aggregator<? super K, ? super V, T> aggregator, final Merger<KMerger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde, final String storeName); KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String storeName); |
...
SessionWindows
Code Block |
---|
public class SessionWindows { /** * Create a new SessionWindows with the specified inactivityGap * * @param inactivityGap * @return */ public static SessionWindows with(final long inactivityGap) /** * Set the window maintain duration in milliseconds of streams time. * This retention time is a guaranteed <i>lower bound</i> for how long * a window will be maintained. * * @return itself */ public SessionWindows until(long durationMs) /** * @return the inactivityGap */ public long inactivityGap() /** * @return the minimum amount of time a window will be maintained for. */ public long maintainMs(); } |
Merger
Code Block | ||
---|---|---|
| ||
/** * The interface for merging aggregate values with the given key * * @param <K> key type * @param <T> aggregate value type */ public interface Merger<K, T> { /** * Compute a new aggregate from the key and two aggregates * * @param aggKey the key of the record * @param aggOne the first aggregate * @param aggTwo the second aggregate * @return the new aggregate value */ T apply(K aggKey, T aggOne, T aggTwo); } |
SessionStore
Code Block | ||
---|---|---|
| ||
/** * Interface for storing the aggregated values of sessions * @param <K> type of the record keys * @param <AGG> type of the aggregated values */ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K, AGG> { /** * Find any aggregated session values with the matching key and where the * session’s end time is >= earliestSessionEndTime, i.e, the oldest session to * merge with, and the session’s start time is <= latestSessionStartTime, i.e, * the newest session to merge with. */ KeyValueIterator<Windowed<K>, AGG> findSessionsToMerge(final K key, final long earliestSessionEndTime, final long latestSessionStartTime); /** * Remove the aggregated value for the session with the matching session key */ void remove(final Windowed<K> sessionKey); /** * Write the aggregated result for the given session key */ void put(final Windowed<K> sessionKey, AGG aggregate); } |
ReadOnlySessionStore
This is primarily provided for InteractiveQueries
Code Block |
---|
/** * A session store that only supports read operations. * Implementations should be thread-safe as concurrent reads and writes * are expected. * * @param <K> the key type * @param <V> the value type */ @InterfaceStability.Unstable public interface ReadOnlySessionStore<K, AGG> { /** * Retrieve all aggregated sessions for the provided key * @param key record key to find aggregated session values for * @return KeyValueIterator containing all session aggregates for the provided key. */ KeyValueIterator<Windowed<K>, AGG> fetch(final K key); } |
QueryableStoreTypes
Additional Method
Code Block | ||
---|---|---|
| ||
/** * A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore} * @param <K> key type of the store * @param <V> value type of the store * @return {@link SessionStoreType} */ public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, V>> sessionStore() |
KGroupedStream
Additional methods
Code Block | ||
---|---|---|
| ||
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final SessionWindows sessionWindows, final String storeName); KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final SessionWindows sessionWindows, final StateStoreSupplier<SessionStore> storeSupplier); <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final Aggregator<K,Aggregator<? super K, ? super V, T> aggregator, final Merger<? super Merger<KK, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde, final String storeName); <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final Aggregator<K, Aggregator<? super K, ? super V, T> aggregator, final Merger<KMerger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde, final StateStoreSupplier<SessionStore> storeSupplier); KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String storeName); KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final StateStoreSupplier<SessionStore> storeSupplier); |
...