Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

 

SessionWindows

Code Block
titleSessionWindows
/**
 * Specification of session windows, configured through an inactivity gap and
 * an optional window maintain (retention) duration.
 *
 * <p>Only the public method signatures are shown here; method bodies are omitted.
 */
public class SessionWindows {

    /**
     * Create a new SessionWindows with the specified inactivityGap.
     *
     * @param inactivityGap the inactivity gap to use for the new instance
     *                      (NOTE(review): the unit is not stated here; presumably
     *                      milliseconds, like {@code durationMs} of {@code until} - confirm)
     * @return a new SessionWindows with the specified inactivity gap
     */
    public static SessionWindows inactivityGap(final long inactivityGap)


    /**
     * Set the window maintain duration in milliseconds of streams time.
     * This retention time is a guaranteed <i>lower bound</i> for how long
     * a window will be maintained.
     *
     * @param durationMs the window maintain duration, in milliseconds of streams time
     * @return  itself
     */
    public SessionWindows until(long durationMs)

}

SessionMerger

Code Block
languagejavatitleSessionMerger
/**
 * The interface for merging aggregate values for {@link SessionWindows} with the given key.
 *
 * <p>NOTE(review): presumably invoked when two sessions of the same key are merged
 * into one - confirm against the caller.
 *
 * @param <K>   key type
 * @param <T>   aggregate value type
 */
public interface SessionMerger<K, T> {
   /**
    * Compute a new aggregate from the key and two aggregates.
    *
    * @param aggKey    the key of the record
    * @param aggOne    the first aggregate
    * @param aggTwo    the second aggregate
    * @return          the new aggregate value
    */
   T apply(K aggKey, T aggOne, T aggTwo);
}

SessionStore

Code Block
languagejavatitleSessionStore
/**
 * Interface for storing the aggregated values of sessions.
 *
 * @param <K>   type of the record keys
 * @param <AGG> type of the aggregated values
 */
public interface SessionStore<K, AGG> extends StateStore {

    /**
     * Fetch any aggregated session values with the matching key and where the
     * session's end time is {@code >= earliestSessionEndTime}, i.e., the oldest session to
     * merge with, and the session's start time is {@code <= latestSessionStartTime}, i.e.,
     * the newest session to merge with.
     *
     * @param key                    the record key whose sessions are fetched
     * @param earliestSessionEndTime lower bound (inclusive) on the end time of the sessions to return
     * @param latestSessionStartTime upper bound (inclusive) on the start time of the sessions to return
     * @return an iterator over the matching sessions and their aggregated values
     */
    KeyValueIterator<SessionKey<K>, AGG> fetch(final K key, final long earliestSessionEndTime, final long latestSessionStartTime);

    /**
     * Remove the aggregated value for the session with the matching SessionKey.
     *
     * @param sessionKey the key of the session whose aggregated value is removed
     */
    void remove(final SessionKey<K> sessionKey);

    /**
     * Write the aggregated result for the given SessionKey.
     *
     * @param key    the key of the session being written
     * @param result the aggregated result to store for that session
     */
    void put(final SessionKey<K> key, AGG result);
}

 

SessionKey

Code Block
languagejavatitleSessionKey
/**
 * Represents the key for a Session Window: the record key together with the
 * start and end time of the session.
 *
 * <p>Only the fields are shown here; constructor and accessors are omitted.
 */
public class SessionKey<K> {
    /** The record key. */
    private final K key;
    /** The session start time (NOTE(review): unit not stated here; presumably a millisecond timestamp - confirm). */
    private final long start;
    /** The session end time (NOTE(review): unit not stated here; presumably a millisecond timestamp - confirm). */
    private final long end; 
}

 

...