...

Code Block (Java)
public static WindowBytesStoreSupplier inMemoryWindowStore(final String name,
                                                           final Duration retentionPeriod,
                                                           final Duration windowSize,
                                                           final Duration gracePeriod) throws IllegalArgumentException {
    Objects.requireNonNull(name, "name cannot be null");
    final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
    final long retentionMs = ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
    final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
    final long windowSizeMs = ApiUtils.validateMillisecondDuration(windowSize, wsMsgPrefix);
    final String gpMsgPrefix = prepareMillisCheckFailMsgPrefix(gracePeriod, "gracePeriod");
    final long gracePeriodMs = ApiUtils.validateMillisecondDuration(gracePeriod, gpMsgPrefix);

    return inMemoryWindowStore(name, retentionMs, windowSizeMs, gracePeriodMs);
}
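The method above converts each Duration to milliseconds and reports failures as IllegalArgumentException. The following is a minimal, self-contained sketch of what such a check might look like. The class and method names (DurationCheckSketch, toMillisOrThrow) and the message wording are hypothetical stand-ins and are not taken from ApiUtils itself.

```java
import java.time.Duration;

// Hypothetical stand-in for a millisecond-validation helper; names and messages are assumptions.
class DurationCheckSketch {
    static long toMillisOrThrow(final Duration duration, final String paramName) {
        if (duration == null) {
            throw new IllegalArgumentException(paramName + " cannot be null");
        }
        try {
            // Duration.toMillis() throws ArithmeticException when the value overflows a long.
            return duration.toMillis();
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(paramName + " cannot be converted to milliseconds", e);
        }
    }
}
```

With a check of this kind, a caller only has to handle one exception type regardless of whether the argument was missing or too large.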



Proposed Changes

The design of the in-memory window store is as follows. The store is "segmented" by windowStartTimestamp, with each segment consisting of a key-value store that maps a record's key to its value. Each segment spans only 1 ms, the granularity of the timestamps, and therefore contains at most one entry per key. Records are inserted, updated, or fetched by first looking up the segment for the given timestamp and then performing the appropriate action on the key-value store inside it. Both the segments and the key-value pairs within each segment are stored in TreeMaps, allowing efficient range queries both by timestamp and by key. Before each action (put/fetch), any segments that have expired since the previous action are removed.

Two important parameters are the retention period and the grace period. The retention period defines how long records are kept: only records with a windowStartTimestamp > current time - retention period can be fetched, and all others are deleted as soon as possible to free up resources. The grace period defines the window of time during which the store may be updated by late-arriving records: only records with a windowStartTimestamp > current time - grace period will be inserted, updated, or deleted in the underlying store.
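The nested-TreeMap layout and the two time checks can be sketched as follows. This is an illustration under stated assumptions, not the proposed implementation: the class name, the String key and value types, the explicit nowMs argument, and the treatment of a null value as a delete are all hypothetical choices made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of a segmented in-memory window store; not the actual Kafka Streams class.
class SegmentedWindowStoreSketch {
    // Outer map: windowStartTimestamp (ms) -> segment. Inner map: record key -> value.
    private final NavigableMap<Long, NavigableMap<String, String>> segments = new TreeMap<>();
    private final long retentionMs;
    private final long graceMs;

    SegmentedWindowStoreSketch(final long retentionMs, final long graceMs) {
        this.retentionMs = retentionMs;
        this.graceMs = graceMs;
    }

    // Drops every segment whose windowStartTimestamp <= now - retention period.
    private void removeExpiredSegments(final long nowMs) {
        segments.headMap(nowMs - retentionMs, true).clear();
    }

    // Returns false if the record falls outside the grace period and was ignored.
    boolean put(final String key, final String value, final long windowStartMs, final long nowMs) {
        removeExpiredSegments(nowMs);
        if (windowStartMs <= nowMs - graceMs) {
            return false;
        }
        final NavigableMap<String, String> segment =
            segments.computeIfAbsent(windowStartMs, ts -> new TreeMap<>());
        if (value == null) {
            // Assumption: a null value deletes the key.
            segment.remove(key);
            if (segment.isEmpty()) {
                segments.remove(windowStartMs);
            }
        } else {
            segment.put(key, value);
        }
        return true;
    }

    // Point lookup: find the segment for the timestamp, then the key inside it.
    String fetch(final String key, final long windowStartMs, final long nowMs) {
        removeExpiredSegments(nowMs);
        final NavigableMap<String, String> segment = segments.get(windowStartMs);
        return segment == null ? null : segment.get(key);
    }

    // Time-range lookup for one key over [fromMs, toMs]; requires fromMs <= toMs.
    List<String> fetchRange(final String key, final long fromMs, final long toMs, final long nowMs) {
        removeExpiredSegments(nowMs);
        final List<String> result = new ArrayList<>();
        for (final NavigableMap<String, String> segment : segments.subMap(fromMs, true, toMs, true).values()) {
            final String value = segment.get(key);
            if (value != null) {
                result.add(value);
            }
        }
        return result;
    }
}
```

Because expiry is a single headMap(...).clear() on the outer map, all expired segments are dropped in one call, without visiting any segment that is still within the retention period.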

This design was chosen for memory efficiency while still allowing fast, O(log N) put and fetch operations as well as deletion of expired records. Fetch calls return a snapshot of the state of the store at the time of the fetch, so records will not expire out from under a returned iterator.
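The snapshot behaviour of fetch can be illustrated with a small sketch. The text does not say how the snapshot is produced. Copying the matching entries at fetch time, as done here, is one simple way to obtain that guarantee and is an assumption of this example, as are the class and method names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of snapshot-on-fetch semantics; a single key is assumed for brevity.
class SnapshotFetchSketch {
    // windowStartTimestamp (ms) -> value
    private final NavigableMap<Long, String> valuesByWindowStart = new TreeMap<>();

    void put(final long windowStartMs, final String value) {
        valuesByWindowStart.put(windowStartMs, value);
    }

    // Simulates expiry: removes all entries with windowStartTimestamp <= cutoffMs.
    void expireUpTo(final long cutoffMs) {
        valuesByWindowStart.headMap(cutoffMs, true).clear();
    }

    // Copies the matching values so later expiry cannot affect the returned iterator.
    Iterator<String> fetch(final long fromMs, final long toMs) {
        final List<String> snapshot =
            new ArrayList<>(valuesByWindowStart.subMap(fromMs, true, toMs, true).values());
        return snapshot.iterator();
    }
}
```

An iterator obtained before expireUpTo is called still yields every value that was present at fetch time, while a fetch issued afterwards sees only the surviving entries.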

Compatibility, Deprecation, and Migration Plan

...