...

Code Block (Java)
public static WindowBytesStoreSupplier inMemoryWindowStore(final String name,
                                                           final Duration retentionPeriod,
                                                           final Duration windowSize,
                                                           final Duration gracePeriod) throws IllegalArgumentException {
    Objects.requireNonNull(name, "name cannot be null");
    final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
    final long retentionMs = ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
    final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
    final long windowSizeMs = ApiUtils.validateMillisecondDuration(windowSize, wsMsgPrefix);
    final String gpMsgPrefix = prepareMillisCheckFailMsgPrefix(gracePeriod, "gracePeriod");
    final long gracePeriodMs = ApiUtils.validateMillisecondDuration(gracePeriod, gpMsgPrefix);

    return inMemoryWindowStore(name, retentionMs, windowSizeMs, gracePeriodMs);
}

which in turn calls the private method:

Code Block (Java)
private static WindowBytesStoreSupplier inMemoryWindowStore(final String name,
                                                            final long retentionPeriod,
                                                            final long windowSize,
                                                            final long gracePeriod) {
    Objects.requireNonNull(name, "name cannot be null");
    if (retentionPeriod < 0L) {
        throw new IllegalArgumentException("retentionPeriod cannot be negative");
    }
    if (windowSize < 0L) {
        throw new IllegalArgumentException("windowSize cannot be negative");
    }
    if (gracePeriod < 0L) {
        throw new IllegalArgumentException("gracePeriod cannot be negative");
    }
    if (windowSize > retentionPeriod) {
        throw new IllegalArgumentException("The retention period of the window store "
            + name + " must be no smaller than its window size. Got size=["
            + windowSize + "], retention=[" + retentionPeriod + "]");
    }
    if (gracePeriod > retentionPeriod) {
        throw new IllegalArgumentException("The grace period of the window store "
            + name + " must not exceed its retention period. Got grace period=["
            + gracePeriod + "], retention=[" + retentionPeriod + "]");
    }

    return new InMemoryWindowBytesStoreSupplier(name, retentionPeriod, windowSize, gracePeriod);
}



Proposed Changes

The design of the in-memory window store is as follows. The store is "segmented" by windowStartTimestamp, and each segment is a key-value store that maps a record's key to its value. Each segment spans only 1 ms, which is the granularity of the timestamps, so it contains at most one entry per key. Records are inserted, updated, or fetched by first looking up the segment for the given timestamp and then performing the appropriate action on the key-value store inside it. The segments are kept in a TreeMap keyed by timestamp, and the key-value pairs within each segment are kept in a TreeMap of their own, which allows efficient range queries both by timestamp and by key. Before each action (put/fetch), any segments that have expired since the previous action are removed.
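
As a rough illustration of the design described above, the following is a minimal sketch and not the actual implementation. It assumes String keys and values, and it assumes that a segment expires once its timestamp falls retentionMs or more behind the largest timestamp seen so far; the text above does not spell out the expiry rule. The class name SegmentedWindowStoreSketch and its method names are hypothetical.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative sketch only: nested TreeMaps, one 1 ms segment per window start timestamp.
class SegmentedWindowStoreSketch {
    // Outer map: windowStartTimestamp (ms) -> segment. Inner map: record key -> value.
    private final NavigableMap<Long, NavigableMap<String, String>> segments = new TreeMap<>();
    private final long retentionMs;
    // Assumption: expiry is driven by the largest timestamp observed so far.
    private long observedStreamTime = -1L;

    SegmentedWindowStoreSketch(final long retentionMs) {
        if (retentionMs < 0L) {
            throw new IllegalArgumentException("retentionMs cannot be negative");
        }
        this.retentionMs = retentionMs;
    }

    // Insert or update: find (or create) the segment for the timestamp, then write the key.
    void put(final String key, final String value, final long windowStartTimestamp) {
        observedStreamTime = Math.max(observedStreamTime, windowStartTimestamp);
        removeExpiredSegments();
        if (windowStartTimestamp <= observedStreamTime - retentionMs) {
            return; // the record's segment is already expired, so drop the record
        }
        segments.computeIfAbsent(windowStartTimestamp, t -> new TreeMap<>()).put(key, value);
    }

    // Point lookup: returns null if the segment or the key is absent.
    String fetch(final String key, final long windowStartTimestamp) {
        removeExpiredSegments();
        final NavigableMap<String, String> segment = segments.get(windowStartTimestamp);
        return segment == null ? null : segment.get(key);
    }

    // Range lookup over both timestamps and keys (all bounds inclusive).
    NavigableMap<Long, NavigableMap<String, String>> fetchRange(final String keyFrom,
                                                                final String keyTo,
                                                                final long timeFrom,
                                                                final long timeTo) {
        removeExpiredSegments();
        final NavigableMap<Long, NavigableMap<String, String>> result = new TreeMap<>();
        for (final Map.Entry<Long, NavigableMap<String, String>> segment
                : segments.subMap(timeFrom, true, timeTo, true).entrySet()) {
            final NavigableMap<String, String> matches =
                segment.getValue().subMap(keyFrom, true, keyTo, true);
            if (!matches.isEmpty()) {
                result.put(segment.getKey(), new TreeMap<>(matches)); // copy, not a live view
            }
        }
        return result;
    }

    // Drop every segment whose timestamp is <= observedStreamTime - retentionMs.
    private void removeExpiredSegments() {
        final long minLiveTimestamp = observedStreamTime - retentionMs + 1;
        segments.headMap(minLiveTimestamp, false).clear();
    }
}
```

In this sketch, a store created with a retention of 100 ms that has seen records at timestamps 10 and 20 drops the segment at 10 as soon as a record at timestamp 115 arrives, while the segment at 20 remains fetchable. Because both levels are sorted maps, the time range and the key range are each served by a subMap view rather than a full scan.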

...