Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                             final SessionWindows sessionWindows,
                             final String storeName);


<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                   final Aggregator<K,Aggregator<? super K, ? super V, T> aggregator,
                                   final Merger<KMerger<? super K, T> sessionMerger,
                                   final SessionWindows sessionWindows,
                                   final Serde<T> aggValueSerde,
                                   final String storeName);
 
KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String storeName);

...

SessionWindows

Code Block
public class SessionWindows {

    /**
     * Create a new SessionWindows with the specified inactivityGap
     *
     * @param inactivityGap
     * @return
     */
    public static SessionWindows with(final long inactivityGap)


    /**
     * Set the window maintain duration in milliseconds of streams time.
     * This retention time is a guaranteed <i>lower bound</i> for how long 
     * a window will be maintained.
     *
     * @return  itself
     */
    public SessionWindows until(long durationMs)
 
    /**
     * @return the inactivityGap
     */
	public long inactivityGap()
    
   /**
    * @return the minimum amount of time a window will be maintained for.
    */
    public long maintainMs();

}

Merger

Code Block
languagejava
/**
 * The interface for merging aggregate values with the given key
 *
 * @param <K>   key type
 * @param <T>   aggregate value type
 */
public interface Merger<K, T> {
   /**
    * Compute a new aggregate from the key and two aggregates
    *
    * @param aggKey    the key of the record
    * @param aggOne    the first aggregate
    * @param aggTwo    the second aggregate
    * @return          the new aggregate value
    */
   T apply(K aggKey, T aggOne, T aggTwo);
}

SessionStore

Code Block
languagejava
/**
 * Interface for storing the aggregated values of sessions.
 *
 * @param <K>   type of the record keys
 * @param <AGG> type of the aggregated values
 */
public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K, AGG> {

   /**
    * Find any aggregated session values with the matching key and where the
    * session's end time is {@code >= earliestSessionEndTime}, i.e., the oldest session to
    * merge with, and the session's start time is {@code <= latestSessionStartTime}, i.e.,
    * the newest session to merge with.
    *
    * @param key                    the record key to find sessions for
    * @param earliestSessionEndTime lower bound (inclusive) on the session end time
    * @param latestSessionStartTime upper bound (inclusive) on the session start time
    * @return an iterator over the matching sessions and their aggregated values
    */
   KeyValueIterator<Windowed<K>, AGG> findSessionsToMerge(final K key, final long earliestSessionEndTime, final long latestSessionStartTime);
 
   /**
    * Remove the aggregated value for the session with the matching session key.
    *
    * @param sessionKey the windowed key identifying the session to remove
    */
   void remove(final Windowed<K> sessionKey);

   /**
    * Write the aggregated result for the given session key.
    *
    * @param sessionKey the windowed key identifying the session
    * @param aggregate  the aggregated result to write
    */
   void put(final Windowed<K> sessionKey, AGG aggregate);
}

ReadOnlySessionStore

This interface is primarily provided for Interactive Queries.

Code Block
/**
 * A session store that only supports read operations.
 * Implementations should be thread-safe as concurrent reads and writes
 * are expected.
 *
 * @param <K>   the key type
 * @param <AGG> the aggregated value type
 */
@InterfaceStability.Unstable
public interface ReadOnlySessionStore<K, AGG> {

    /**
     * Retrieve all aggregated sessions for the provided key.
     *
     * @param key the record key to find aggregated session values for
     * @return a KeyValueIterator containing all session aggregates for the provided key
     */
    KeyValueIterator<Windowed<K>, AGG> fetch(final K key);
}

 

 

QueryableStoreTypes

Additional Method

Code Block
languagejava
/**
 * A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore}.
 *
 * @param <K> key type of the store
 * @param <V> value type of the store
 * @return {@link SessionStoreType}
 */
public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, V>> sessionStore()

KGroupedStream

Additional methods

Code Block
languagejava
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                              final SessionWindows sessionWindows,
                              final String storeName);
 
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                              final SessionWindows sessionWindows,
                              final StateStoreSupplier<SessionStore> storeSupplier);


<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                     final Aggregator<K,Aggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super Merger<KK, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final String storeName);
 
<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                     final Aggregator<K, Aggregator<? super K, ? super V, T> aggregator,
                                     final Merger<KMerger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final StateStoreSupplier<SessionStore> storeSupplier);
 
KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String storeName);
 
KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final StateStoreSupplier<SessionStore> storeSupplier);

 

 

...