Status
Current state: Accepted
Discussion thread: here
JIRA: here
Released:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
We will provide a way for developers using the DSL to specify that an aggregation should be computed over SessionWindows. Overloads of three methods, reduce, aggregate, and count, will be added to KGroupedStream:
```java
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final SessionWindows sessionWindows, final String storeName);

<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                     final Aggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final String storeName);

KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String storeName);
```
A typical aggregation might look like:
```java
stream.groupByKey().aggregate(initializer, aggregator, merger,
                              SessionWindows.with(FIVE_MINUTES).until(ONE_HOUR),
                              aggregateValueSerde, "session-store");
```
In order to process SessionWindows we’ll need to add a new Processor. This will be responsible for creating sessions, merging existing sessions into sessions with larger windows, and producing aggregates from a session’s values.
On each incoming record the process method will:
Find any existing adjacent sessions that fall within the inactivity gap, i.e., sessionStore.findSessionsToMerge(userKey, timestamp - gap, timestamp + gap), which returns the sessions whose end time is >= timestamp - gap and whose start time is <= timestamp + gap.
Merge any existing sessions into a new, larger session, using the Merger to merge the aggregates of the existing sessions.
Aggregate the value of the record being processed into the merged session.
Store the new merged session in the SessionStore.
Remove any merged sessions from the SessionStore.
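The steps above can be sketched for a single record key, with an in-memory sorted map standing in for the SessionStore. This is a hypothetical, simplified model rather than the proposed Processor: the class and method names are illustrative, the aggregate is a count (so the initializer is 0, the aggregator adds 1, and the merger adds two counts), and a linear scan with the same filter stands in for findSessionsToMerge.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: an in-memory stand-in for the SessionStore of ONE record key,
// used to illustrate the per-record merge steps. The aggregate is a simple count.
final class SessionMergeSketch {

    // A session window [start, end]; start == end for a single-record session.
    static final class Window {
        final long start;
        final long end;

        Window(final long start, final long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return start + "-" + end;
        }
    }

    // Ordered by end time, then start time, like the segmented store.
    private final TreeMap<Window, Long> store = new TreeMap<>(
            Comparator.<Window>comparingLong(w -> w.end).thenComparingLong(w -> w.start));
    private final long gap;

    SessionMergeSketch(final long inactivityGap) {
        this.gap = inactivityGap;
    }

    void process(final long timestamp) {
        long start = timestamp;
        long end = timestamp;
        long agg = 0L;                                   // initializer for a count
        final List<Window> merged = new ArrayList<>();

        // 1. Find sessions with end >= timestamp - gap && start <= timestamp + gap.
        for (final Map.Entry<Window, Long> e : store.entrySet()) {
            final Window w = e.getKey();
            if (w.end >= timestamp - gap && w.start <= timestamp + gap) {
                // 2. Merge the window bounds and the aggregates (count merger: a + b).
                start = Math.min(start, w.start);
                end = Math.max(end, w.end);
                agg += e.getValue();
                merged.add(w);
            }
        }
        // 3. Aggregate the current record into the merged session (count aggregator: +1).
        agg += 1;
        // Remove the merged sessions, then store the new session.
        for (final Window w : merged) {
            store.remove(w);
        }
        store.put(new Window(start, end), agg);
    }

    // Snapshot of the store as "start-end=count" entries, in store order.
    List<String> sessions() {
        final List<String> out = new ArrayList<>();
        for (final Map.Entry<Window, Long> e : store.entrySet()) {
            out.add(e.getKey() + "=" + e.getValue());
        }
        return out;
    }
}
```

With a gap of 5, records at 0, 3, 20 and 10 produce the sessions 0-3, 10-10 and 20-20; a further record at 15 falls within the gap of both 10-10 and 20-20, so the three are merged into a single session 10-20 with a count of 3. Note that this sketch removes the merged sessions before storing the new one: with a store keyed by window, writing first and then removing could delete the new session whenever its window equals one of the merged windows (for example, two records with the same timestamp).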
Late arriving data
Late-arriving data is mostly treated the same as on-time data, i.e., it can create a new session or be merged into an existing one. The only difference is that if the data arrives after the retention period, defined by SessionWindows.until(..), a new session will be created and aggregated, but it will not be persisted to the store.
SessionWindows
We propose to add a new class, SessionWindows, which can be used with new overloaded operations on KGroupedStream, i.e., aggregate(…), count(…), and reduce(…). A SessionWindows has a defined gap that represents the period of inactivity. It also provides a method, until(…), to specify how long the data in the SessionWindow is retained, i.e., to allow for late-arriving data.
SessionStore
We propose to add a new type of StateStore, SessionStore. A SessionStore is a segmented store, similar to a WindowStore, but its segments are indexed by session end time. We index by end time so that we can expire (remove) the Segments containing sessions where session endTime < stream-time - retention-period.
The records in the SessionStore will be stored by a Windowed key. The Windowed key is a composite of the record key and a TimeWindow. The start and end times of the TimeWindow are driven by the data. If the session only has a single value, then start == end. The Segment a session is stored in is determined by TimeWindow.end. Fetch requests against the SessionStore use both TimeWindow.start and TimeWindow.end to find sessions to merge.
Each Segment covers a particular interval of time. To work out which Segment a session belongs in, we divide TimeWindow.end by the segment interval. The segment interval is calculated as Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL).
TimeWindow End | Segment Index |
---|---|
0 | 0 |
500 | 0 |
1000 | 1 |
2000 | 2 |
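The segment arithmetic above can be sketched as two small functions. This is a hypothetical helper, not part of the proposed API: MIN_SEGMENT_INTERVAL is passed in as a parameter because its value is not given here, and the inputs used below (a retention period of 2000, 3 segments, and a minimum interval of 1) are simply one choice that yields an interval of 1000 and reproduces the table.

```java
// Hypothetical sketch of the segment arithmetic; names are illustrative.
final class SegmentMath {

    // Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL).
    // numSegments must be greater than 1, otherwise the division is by zero.
    static long segmentInterval(final long retentionPeriod, final int numSegments, final long minSegmentInterval) {
        return Math.max(retentionPeriod / (numSegments - 1), minSegmentInterval);
    }

    // The Segment a session belongs in: its TimeWindow end divided by the interval.
    static long segmentIndex(final long sessionEnd, final long segmentInterval) {
        return sessionEnd / segmentInterval;
    }
}
```

Because the index uses integer division, every session ending anywhere in [0, 1000) lands in Segment 0, every session ending in [1000, 2000) in Segment 1, and so on.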
Put
As session aggregates arrive, i.e., on put, the implementation of SessionStore will:
Use TimeWindow.end to get an existing Segment, or create a new Segment, to store the aggregate in. A new Segment will only be created if TimeWindow.end is within the retention period.
If the Segment is non-null, we add the aggregate to the Segment.
If the Segment is null, this signals that the record is late and has arrived after the retention period. The record is not added to the store.
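The put steps above can be sketched with a map from segment index to a list of stored sessions. This is a hypothetical, simplified model: sessions are plain strings, stream time is passed in by the caller, and "within the retention period" is taken to mean end >= streamTime - retentionPeriod, following the expiry rule stated for SessionStore. Expiring old Segments is not modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of SessionStore.put over segments indexed by session end time.
final class SegmentedPutSketch {
    private final long retentionPeriod;
    private final long segmentInterval;
    private final Map<Long, List<String>> segments = new HashMap<>(); // segment index -> sessions

    SegmentedPutSketch(final long retentionPeriod, final long segmentInterval) {
        this.retentionPeriod = retentionPeriod;
        this.segmentInterval = segmentInterval;
    }

    // Returns an existing Segment, creates one if the session end is within the
    // retention period, or returns null to signal that the record is too late.
    private List<String> getOrCreateSegment(final long sessionEnd, final long streamTime) {
        final long id = sessionEnd / segmentInterval;
        final List<String> existing = segments.get(id);
        if (existing != null) {
            return existing;
        }
        if (sessionEnd < streamTime - retentionPeriod) {
            return null;                           // a new Segment would be outside retention
        }
        final List<String> created = new ArrayList<>();
        segments.put(id, created);
        return created;
    }

    // Returns true if the session aggregate was stored, false if it was not added.
    boolean put(final String session, final long sessionEnd, final long streamTime) {
        final List<String> segment = getOrCreateSegment(sessionEnd, streamTime);
        if (segment == null) {
            return false;                          // late record: not added to the store
        }
        segment.add(session);
        return true;
    }

    int segmentCount() {
        return segments.size();
    }
}
```

With a retention period of 2000 and an interval of 1000, a session ending at 1500 that arrives when stream time is 5000 would need a new Segment 1, but 1500 < 5000 - 2000, so it is not added.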
FindSessionsToMerge
When SessionStore.findSessionsToMerge(...) is called, we find all the session aggregates for the record key where TimeWindow.end >= earliestEndTime && TimeWindow.start <= latestStartTime. To do this:
Find the Segments to search, i.e., all Segments from the one containing earliestEndTime onwards.
Define the range query as:
from = (record-key, end=earliestEndTime, start=0)
to = (record-key, end=Long.MAX_VALUE, start=Long.MAX_VALUE)
Provide a condition on the iterator such that it stops once a record with start > latestStartTime is found.
For example, if for an arbitrary record key we had the following sessions in the store:
Session Start | Session End |
---|---|
0 | 99 |
101 | 200 |
201 | 300 |
301 | 400 |
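The range query above can be sketched over one record key's sessions with a sorted map ordered by (end, start), mirroring the composite key layout. This is a hypothetical illustration, not the store implementation: the class name and the query bounds used below (earliestEndTime = 150, latestStartTime = 250) are chosen only for the example. The early stop relies on sessions for the same key never overlapping (overlapping ones would already have been merged), so ordering by end time also orders by start time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of findSessionsToMerge for ONE record key.
final class FindSessionsSketch {

    // Composite ordering key: end time first, then start time.
    static final class EndStart implements Comparable<EndStart> {
        final long end;
        final long start;

        EndStart(final long end, final long start) {
            this.end = end;
            this.start = start;
        }

        @Override
        public int compareTo(final EndStart other) {
            final int c = Long.compare(end, other.end);
            return c != 0 ? c : Long.compare(start, other.start);
        }
    }

    private final TreeMap<EndStart, String> sessions = new TreeMap<>();

    void put(final long start, final long end) {
        sessions.put(new EndStart(end, start), start + "-" + end);
    }

    List<String> findSessionsToMerge(final long earliestEndTime, final long latestStartTime) {
        final List<String> result = new ArrayList<>();
        // from = (end=earliestEndTime, start=0), to = (end=Long.MAX_VALUE, start=Long.MAX_VALUE)
        final NavigableMap<EndStart, String> range = sessions.subMap(
                new EndStart(earliestEndTime, 0L), true,
                new EndStart(Long.MAX_VALUE, Long.MAX_VALUE), true);
        for (final Map.Entry<EndStart, String> e : range.entrySet()) {
            if (e.getKey().start > latestStartTime) {
                break;                 // every later session starts even later
            }
            result.add(e.getValue());
        }
        return result;
    }
}
```

For the four sessions in the table, findSessionsToMerge(150, 250) skips 0-99 through the range's lower bound, returns 101-200 and 201-300, and stops at 301-400 because its start (301) is greater than 250.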
Public Interfaces
SessionWindows
```java
public class SessionWindows {

    /**
     * Create a new SessionWindows with the specified inactivityGap
     *
     * @param inactivityGap the period of inactivity between sessions
     * @return a new SessionWindows
     */
    public static SessionWindows with(final long inactivityGap);

    /**
     * Set the window maintain duration in milliseconds of streams time.
     * This retention time is a guaranteed <i>lower bound</i> for how long
     * a window will be maintained.
     *
     * @return itself
     */
    public SessionWindows until(long durationMs);

    /**
     * @return the inactivityGap
     */
    public long inactivityGap();

    /**
     * @return the minimum amount of time a window will be maintained for.
     */
    public long maintainMs();
}
```
Merger
```java
/**
 * The interface for merging aggregate values for {@link SessionWindows} with the given key
 *
 * @param <K> key type
 * @param <T> aggregate value type
 */
public interface Merger<K, T> {

    /**
     * Compute a new aggregate from the key and two aggregates
     *
     * @param aggKey the key of the record
     * @param aggOne the first aggregate
     * @param aggTwo the second aggregate
     * @return the new aggregate value
     */
    T apply(K aggKey, T aggOne, T aggTwo);
}
```
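As an illustration of what a Merger has to do, here are two hypothetical implementations written as lambdas. The interface is copied locally so the example compiles on its own; the MergerExamples class and its constants are illustrative names only. For a count, the merged session's count is the sum of the two sessions' counts; for an aggregate that tracks the latest timestamp seen in a session, merging keeps the larger of the two.

```java
// Local copy of the proposed Merger interface so this example is self-contained.
interface Merger<K, T> {
    T apply(K aggKey, T aggOne, T aggTwo);
}

// Hypothetical Merger implementations; names are illustrative.
final class MergerExamples {

    // Count aggregate: merging two sessions adds their counts.
    static final Merger<String, Long> COUNT_MERGER = (key, one, two) -> one + two;

    // "Latest timestamp seen" aggregate: merging keeps the larger timestamp.
    static final Merger<String, Long> LATEST_MERGER = (key, one, two) -> Math.max(one, two);
}
```

In both cases the result does not depend on which session is passed as aggOne and which as aggTwo, which is convenient when several existing sessions are folded into one.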
SessionStore
```java
/**
 * Interface for storing the aggregated values of sessions
 * @param <K> type of the record keys
 * @param <AGG> type of the aggregated values
 */
public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K, AGG> {

    /**
     * Find any aggregated session values with the matching key and where the
     * session’s end time is >= earliestSessionEndTime, i.e., the oldest session to
     * merge with, and the session’s start time is <= latestSessionStartTime, i.e.,
     * the newest session to merge with.
     */
    KeyValueIterator<Windowed<K>, AGG> findSessionsToMerge(final K key, final long earliestSessionEndTime, final long latestSessionStartTime);

    /**
     * Remove the aggregated value for the session with the matching session key
     */
    void remove(final Windowed<K> sessionKey);

    /**
     * Write the aggregated result for the given session key
     */
    void put(final Windowed<K> sessionKey, AGG aggregate);
}
```
ReadOnlySessionStore
This is primarily provided for Interactive Queries.
```java
/**
 * A session store that only supports read operations.
 * Implementations should be thread-safe as concurrent reads and writes
 * are expected.
 *
 * @param <K> the key type
 * @param <AGG> the aggregated value type
 */
@InterfaceStability.Unstable
public interface ReadOnlySessionStore<K, AGG> {

    /**
     * Retrieve all aggregated sessions for the provided key
     * @param key record key to find aggregated session values for
     * @return KeyValueIterator containing all session aggregates for the provided key.
     */
    KeyValueIterator<Windowed<K>, AGG> fetch(final K key);
}
```
QueryableStoreTypes
Additional Method
```java
/**
 * A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore}
 * @param <K> key type of the store
 * @param <V> value type of the store
 * @return {@link SessionStoreType}
 */
public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, V>> sessionStore()
```
KGroupedStream
Additional methods
```java
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                              final SessionWindows sessionWindows,
                              final String storeName);

KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                              final SessionWindows sessionWindows,
                              final StateStoreSupplier<SessionStore> storeSupplier);

<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                     final Aggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final String storeName);

<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                     final Aggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final StateStoreSupplier<SessionStore> storeSupplier);

KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String storeName);

KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final StateStoreSupplier<SessionStore> storeSupplier);
```
Compatibility, Deprecation, and Migration Plan
...