...
```java
public static WindowBytesStoreSupplier inMemoryWindowStore(final String name,
                                                           final Duration retentionPeriod,
                                                           final Duration windowSize,
                                                           final Duration gracePeriod) throws IllegalArgumentException {
    Objects.requireNonNull(name, "name cannot be null");
    final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
    final long retentionMs = ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
    final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
    final long windowSizeMs = ApiUtils.validateMillisecondDuration(windowSize, wsMsgPrefix);
    final String gpMsgPrefix = prepareMillisCheckFailMsgPrefix(gracePeriod, "gracePeriod");
    final long gracePeriodMs = ApiUtils.validateMillisecondDuration(gracePeriod, gpMsgPrefix);

    return inMemoryWindowStore(name, retentionMs, windowSizeMs, gracePeriodMs);
}
```
which in turn calls the private method:
```java
private static WindowBytesStoreSupplier inMemoryWindowStore(final String name,
                                                            final long retentionPeriod,
                                                            final long windowSize,
                                                            final long gracePeriod) {
    Objects.requireNonNull(name, "name cannot be null");
    if (retentionPeriod < 0L) {
        throw new IllegalArgumentException("retentionPeriod cannot be negative");
    }
    if (windowSize < 0L) {
        throw new IllegalArgumentException("windowSize cannot be negative");
    }
    if (gracePeriod < 0L) {
        throw new IllegalArgumentException("gracePeriod cannot be negative");
    }
    if (windowSize > retentionPeriod) {
        throw new IllegalArgumentException("The retention period of the window store "
            + name + " must be no smaller than its window size. Got size=["
            + windowSize + "], retention=[" + retentionPeriod + "]");
    }
    if (gracePeriod > retentionPeriod) {
        throw new IllegalArgumentException("The grace period of the window store "
            + name + " must not exceed its retention period. Got grace period=["
            + gracePeriod + "], retention=[" + retentionPeriod + "]");
    }

    return new InMemoryWindowBytesStoreSupplier(name, retentionPeriod, windowSize, gracePeriod);
}
```
Proposed Changes
The design of the in-memory window store is as follows. The store is "segmented" according to the windowStartTimestamp, with each segment consisting of a key-value store that maps the record's key to its value. Each segment spans only 1 ms, the granularity of the timestamps, and therefore contains at most one entry per key. Records are inserted, updated, or fetched by first looking up the segment corresponding to the record's timestamp and then performing the appropriate action on the key-value store inside it. Both the segments and the key-value pairs inside each segment are stored in TreeMaps, which allows efficient range queries both by timestamp and by key. Before each action (put/fetch), any segments that have expired since the previous action are removed.
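
The nested-TreeMap layout described above can be illustrated with a minimal sketch. This is not the proposed implementation: the class name `SegmentedWindowStoreSketch`, the use of `String` keys and values, and the expiration rule (a segment expires once its start timestamp falls more than `retentionMs` behind the highest timestamp seen so far) are all assumptions made for illustration, and the grace period is left out entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical sketch of a window store segmented by window start timestamp. */
public class SegmentedWindowStoreSketch {
    // Outer map: window start timestamp (ms) -> segment.
    // Inner map (the segment): record key -> value, so each key appears at most once per segment.
    private final NavigableMap<Long, NavigableMap<String, String>> segments = new TreeMap<>();
    private final long retentionMs;
    // Highest timestamp seen by put(); assumed here to drive expiration.
    private long observedStreamTime = 0L;

    public SegmentedWindowStoreSketch(final long retentionMs) {
        if (retentionMs < 0L) {
            throw new IllegalArgumentException("retentionMs cannot be negative");
        }
        this.retentionMs = retentionMs;
    }

    /** Inserts or updates the value for key in the segment starting at windowStart. */
    public void put(final String key, final String value, final long windowStart) {
        observedStreamTime = Math.max(observedStreamTime, windowStart);
        removeExpiredSegments();
        if (windowStart < observedStreamTime - retentionMs) {
            return; // the target segment would already be expired, so the record is dropped
        }
        segments.computeIfAbsent(windowStart, t -> new TreeMap<>()).put(key, value);
    }

    /** Returns the value for key in the segment starting at windowStart, or null if absent. */
    public String fetch(final String key, final long windowStart) {
        removeExpiredSegments();
        final NavigableMap<String, String> segment = segments.get(windowStart);
        return segment == null ? null : segment.get(key);
    }

    /** Range query over both dimensions; all bounds are inclusive. */
    public List<String> fetchRange(final String keyFrom, final String keyTo,
                                   final long timeFrom, final long timeTo) {
        removeExpiredSegments();
        final List<String> result = new ArrayList<>();
        if (timeFrom > timeTo || keyFrom.compareTo(keyTo) > 0) {
            return result; // empty range; subMap would throw on inverted bounds
        }
        for (final NavigableMap<String, String> segment
                : segments.subMap(timeFrom, true, timeTo, true).values()) {
            result.addAll(segment.subMap(keyFrom, true, keyTo, true).values());
        }
        return result;
    }

    /** Number of live segments, exposed only to make the sketch easy to inspect. */
    public int segmentCount() {
        return segments.size();
    }

    // Drops every segment whose start timestamp is older than the retention horizon.
    private void removeExpiredSegments() {
        segments.headMap(observedStreamTime - retentionMs, false).clear();
    }
}
```

Because both levels are sorted maps, a range query reduces to a `subMap` view on the outer map followed by a `subMap` view on each selected segment, and expiration reduces to clearing a `headMap` view of the outer map.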
...