...

We will provide a way for developers using the DSL to specify that an aggregation should be windowed into SessionWindows. Three overloaded methods will be added to KGroupedStream:

Code Block
languagejava
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                             final SessionWindows sessionWindows,
                             final String storeName);


<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                   final Aggregator<K, V, T> aggregator,
                                   final SessionMerger<K, T> sessionMerger,
                                   final SessionWindows sessionWindows,
                                   final Serde<T> aggValueSerde,
                                   final String storeName);
 
KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String storeName);

A typical aggregation might look like:

 

Code Block
languagejava
stream.groupByKey().aggregate(initializer,
                              aggregator,
                              merger,
                              SessionWindows.inactivityGap(FIVE_MINUTES)
                                            .until(ONE_HOUR),
                              aggregateValueSerde,
                              "session-store");

The above statement will aggregate the results into SessionWindows with an inactivity gap of five minutes. Sessions will be retained for one hour after they have closed. During that hour they will still accept late-arriving records that fall into the session.

In order to process SessionWindows we’ll need to add a new Processor. This will be responsible for creating sessions, merging existing sessions into sessions with larger windows, and producing aggregates from a session’s values.

On each incoming record the process method will:

  1. Find any existing sessions that fall within the inactivity gap, i.e., sessionStore.fetch(userKey, timestamp - gap, timestamp + gap).

  2. Merge any existing sessions into a new, larger session, using the SessionMerger to merge the aggregates of the existing sessions.

  3. Aggregate the value of the record being processed with the merged session.

  4. Remove any merged sessions from the SessionStore.

  5. Store the new merged session in the SessionStore.
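
The five steps above can be sketched for a single record key as follows. This is only an illustration under stated assumptions: the Session class, the list-backed store, and the use of a simple record count as the aggregate are hypothetical stand-ins, not part of the proposed API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical in-memory sketch of the per-record steps for one record key.
class SessionProcessSketch {

    // Illustrative session holder; the aggregate here is a plain record count.
    static final class Session {
        final long start;
        final long end;
        final long count;

        Session(long start, long end, long count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }
    }

    private final long gap;
    private final List<Session> store = new ArrayList<>(); // stands in for the SessionStore

    SessionProcessSketch(long gap) {
        this.gap = gap;
    }

    void process(long timestamp) {
        long start = timestamp;
        long end = timestamp;
        long merged = 0L; // value an Initializer would supply for a count

        Iterator<Session> it = store.iterator();
        while (it.hasNext()) {
            Session s = it.next();
            // Step 1: the session falls within the inactivity gap of this record
            if (s.end >= timestamp - gap && s.start <= timestamp + gap) {
                // Step 2: merge into one larger session (the merger sums the counts)
                start = Math.min(start, s.start);
                end = Math.max(end, s.end);
                merged += s.count;
                // Step 4: remove the session that has been merged
                it.remove();
            }
        }
        // Step 3: aggregate the current record with the merged session
        merged += 1;
        // Step 5: store the new merged session
        store.add(new Session(start, end, merged));
    }

    List<Session> sessions() {
        return store;
    }
}
```

With a gap of 5, records at timestamps 0 and 10 create two separate sessions, and a later record at timestamp 5 merges both into a single session spanning 0 to 10.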

We will leverage the work done in KIP-63 for forwarding the aggregated result of a session window. That is, the aggregates will be de-duplicated in the cache and only be forwarded downstream on commit, flush, or evict. This is in line with how all existing aggregations work in Kafka Streams.

Late arriving data

Late-arriving data is mostly treated the same as non-late-arriving data, i.e., it can create a new session or be merged into an existing one. The only difference is that if the data arrives after the retention period, defined by SessionWindows.until(..), it will be silently dropped.

SessionWindows

We propose to add a new class, SessionWindows. SessionWindows can be used with the new overloaded operations on KGroupedStream, i.e., aggregate(…), count(..), reduce(…). A SessionWindows has a defined gap that represents the period of inactivity. It also provides a method, until(...), to specify how long the data in the SessionWindows is retained, i.e., to allow for late-arriving data.

SessionStore

We propose to add a new type of StateStore, SessionStore. A SessionStore is a segmented store, similar to a WindowStore, but the segments are indexed by session end time. We index by end time so that we can expire (remove) the Segments containing sessions where session endTime < stream-time - retention-period.

The records in the SessionStore will be stored by a SessionKey. The SessionKey is a composite of the record key, window start, and window end times. The start and end times of the SessionKey are driven by the data. If the Session only has a single value then start == end. The segment a Session is stored in is determined by SessionKey.end. Fetch requests against the SessionStore use both SessionKey.start and SessionKey.end to find sessions to merge.

Each Segment is for a particular interval of time. To work out which Segment a session belongs in we simply divide SessionKey.end by the segment interval. The segment interval is calculated as Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL).

 

For example, given a segment interval of 1000:

 

 
SessionKey End    Segment Index
0                 0
500               0
1000              1
2000              2
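
The segment arithmetic described above can be sketched as below. The value used for MIN_SEGMENT_INTERVAL is an assumption made only so the example compiles, since the text does not state it, and the class and method names are hypothetical.

```java
// Hypothetical helper illustrating the segment interval and segment index arithmetic.
class SegmentIndexSketch {

    // Assumed lower bound for the segment interval; the real value is not given in the text.
    static final long MIN_SEGMENT_INTERVAL = 1_000L;

    // Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL)
    static long segmentInterval(long retentionPeriod, int numSegments) {
        return Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
    }

    // The segment a session belongs in: SessionKey.end divided by the segment interval.
    static long segmentIndex(long sessionEnd, long segmentInterval) {
        return sessionEnd / segmentInterval;
    }
}
```

With a segment interval of 1000, segmentIndex maps the end times 0, 500, 1000, and 2000 to segments 0, 0, 1, and 2, matching the table.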
 

Put

 

As session aggregates arrive, i.e., on put, the implementation of SessionStore will: 

  1. Use SessionKey.end to get or create the Segment to store the aggregate in.

  2. If the Segment is non-null, we add the aggregate to the Segment.

  3. If the Segment was null, this signals that the record is late and has arrived after the retention period. This record is dropped.
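
A minimal sketch of the put path, assuming that stream time is tracked as the highest session end time seen so far and that each Segment is a plain in-memory map. Both are assumptions made for illustration, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of put: locate the Segment by session end time, drop late records.
class SessionPutSketch {
    private final long retentionPeriod;
    private final long segmentInterval;
    private long streamTime = 0L; // assumed: highest session end time observed so far
    private final Map<Long, Map<String, Long>> segments = new HashMap<>();

    SessionPutSketch(long retentionPeriod, long segmentInterval) {
        this.retentionPeriod = retentionPeriod;
        this.segmentInterval = segmentInterval;
    }

    // Step 1: returns null when the end time is already outside the retention period.
    private Map<String, Long> getOrCreateSegment(long sessionEnd) {
        streamTime = Math.max(streamTime, sessionEnd);
        if (sessionEnd < streamTime - retentionPeriod) {
            return null;
        }
        return segments.computeIfAbsent(sessionEnd / segmentInterval, id -> new HashMap<>());
    }

    // Returns true if the aggregate was stored and false if it was dropped as late.
    boolean put(String recordKey, long start, long end, long aggregate) {
        Map<String, Long> segment = getOrCreateSegment(end);
        if (segment == null) {
            return false; // Step 3: late record, dropped
        }
        // Step 2: add the aggregate under a composite of record key, start, and end
        segment.put(recordKey + ":" + start + ":" + end, aggregate);
        return true;
    }
}
```

The boolean return value is a convenience for the example only; the proposed put method returns void and drops late records silently.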

 

 

Fetch

 

When SessionStore.fetch(...) is called we find all the session aggregates for the record key where SessionKey.end >= earliestEndTime && SessionKey.start <= latestStartTime. In order to do this:

 

  1. Find the Segments to search by getting all Segments starting from earliestEndTime

  2. Define the range query as:

    1. from =  (record-key, end=earliestEndTime, start=0)

    2. to = (record-key, end=Long.MAX_VALUE, start=Long.MAX_VALUE)

  3. Provide a condition on the iterator such that it stops once a record with start > latestStartTime is found.

 

 

For example, if for an arbitrary record key we had the following sessions in the store:

 

Session Start    Session End
0                99
101              200
201              300
301              400

When we query the store with earliestEndTime = 150, latestStartTime = 300 we would retrieve sessions starting at 101 and 201.
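
The fetch condition can be checked against the example with a short sketch. It applies the predicate SessionKey.end >= earliestEndTime && SessionKey.start <= latestStartTime as a linear scan over {start, end} pairs. It does not model the segment lookup or the range query, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the fetch predicate over sessions given as {start, end} pairs.
class SessionFetchSketch {

    static List<long[]> fetch(List<long[]> sessions, long earliestEndTime, long latestStartTime) {
        List<long[]> result = new ArrayList<>();
        for (long[] session : sessions) {
            long start = session[0];
            long end = session[1];
            // Keep sessions that end no earlier than earliestEndTime
            // and start no later than latestStartTime.
            if (end >= earliestEndTime && start <= latestStartTime) {
                result.add(session);
            }
        }
        return result;
    }
}
```

Applied to the four sessions in the table with earliestEndTime = 150 and latestStartTime = 300, the scan keeps only the sessions starting at 101 and 201.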

 

Public Interfaces

 

SessionWindows

Code Block
titleSessionWindows
public class SessionWindows {

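    /**
     * Create a new SessionWindows with the specified inactivityGap
     *
     * @param inactivityGap the period of inactivity that separates sessions
     * @return a new SessionWindows with the specified inactivity gap
     */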
    public static SessionWindows inactivityGap(final long inactivityGap)


    /**
     * Set the window maintain duration in milliseconds of streams time.
     * This retention time is a guaranteed <i>lower bound</i> for how long 
     * a window will be maintained.
     *
     * @return  itself
     */
    public SessionWindows until(long durationMs)

}

SessionMerger

Code Block
languagejava
titleSessionMerger
/**
 * The interface for merging aggregate values for {@link SessionWindows} with the given key
 *
 * @param <K>   key type
 * @param <T>   aggregate value type
 */
public interface SessionMerger<K, T> {
   /**
    * Compute a new aggregate from the key and two aggregates
    *
    * @param aggKey    the key of the record
    * @param aggOne    the first aggregate
    * @param aggTwo    the second aggregate
    * @return          the new aggregate value
    */
   T apply(K aggKey, T aggOne, T aggTwo);
}
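
As an illustration of how a SessionMerger might be implemented, the sketch below merges two session counts by summing them. The interface is repeated so that the example compiles on its own, and the SessionMergerExample class and COUNT_MERGER constant are hypothetical names.

```java
// Copy of the proposed interface, repeated here only to keep the example self-contained.
interface SessionMerger<K, T> {
    T apply(K aggKey, T aggOne, T aggTwo);
}

// Hypothetical example: when two sessions are merged, their counts are added together.
class SessionMergerExample {
    static final SessionMerger<String, Long> COUNT_MERGER =
            (aggKey, aggOne, aggTwo) -> aggOne + aggTwo;
}
```

A merger like this suits aggregates that combine associatively, such as counts or sums. Other aggregate types would need their own merge logic.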

SessionStore

Code Block
languagejava
titleSessionStore
/**
* Interface for storing the aggregated values of sessions
* @param <K>   type of the record keys
* @param <AGG> type of the aggregated values
*/
public interface SessionStore<K, AGG> extends StateStore {

    /**
     * Fetch any aggregated session values with the matching key and where the 
     * session’s end time is >= earliestEndTime, i.e., the oldest session to 
     * merge with, and the session’s start time is <= latestStartTime, i.e.,
     * the newest session to merge with.
     */
   KeyValueIterator<SessionKey<K>, AGG> fetch(final K key, final long earliestEndTime, final long latestStartTime);
 
   /**
    * Remove the aggregated value for the session with the matching SessionKey
    */
   void remove(final SessionKey<K> sessionKey);

   /**
    * Write the aggregated result for the given SessionKey
    */	 
   void put(final SessionKey<K> key, AGG result);
}

 

SessionKey

Code Block
languagejava
titleSessionKey
/**
 * Represents the key for a Session Window
 */
public class SessionKey<K> {
    // record key
    private final K key;
    // session start time
    private final long start;
    // session end time
    private final long end; 
}

 

...