
Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-4730

...


This KIP adds one public method to the Stores class that returns an InMemoryWindowBytesStoreSupplier (which implements WindowBytesStoreSupplier):


```java
// Proposed addition to org.apache.kafka.streams.state.Stores
// (uses java.time.Duration and java.util.Objects)
public static WindowBytesStoreSupplier inMemoryWindowStore(final String name,
                                                           final Duration retentionPeriod,
                                                           final Duration windowSize,
                                                           final Duration gracePeriod) throws IllegalArgumentException {
    Objects.requireNonNull(name, "name cannot be null");

    // Convert each Duration to milliseconds, rejecting values that cannot be represented
    final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
    final long retentionMs = ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
    final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
    final long windowSizeMs = ApiUtils.validateMillisecondDuration(windowSize, wsMsgPrefix);
    final String gpMsgPrefix = prepareMillisCheckFailMsgPrefix(gracePeriod, "gracePeriod");
    final long gracePeriodMs = ApiUtils.validateMillisecondDuration(gracePeriod, gpMsgPrefix);

    // Delegate to the overload that takes plain millisecond values
    return inMemoryWindowStore(name, retentionMs, windowSizeMs, gracePeriodMs);
}
```
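The method above delegates Duration validation to Kafka-internal helpers (prepareMillisCheckFailMsgPrefix, ApiUtils.validateMillisecondDuration) whose exact messages are not spelled out in this KIP. As a rough, hypothetical sketch of the idea only, the JDK-only helper below converts a Duration to milliseconds and reports a null or overflowing value as an IllegalArgumentException, matching the throws clause of the proposed method. DurationCheck and toMillisChecked are illustrative names, not Kafka API.

```java
import java.time.Duration;

// Hypothetical stand-in for the Duration-to-millis validation used above.
// This is NOT Kafka's ApiUtils; it only sketches the same kind of check.
final class DurationCheck {

    static long toMillisChecked(final Duration duration, final String name) {
        if (duration == null) {
            throw new IllegalArgumentException(name + " cannot be null");
        }
        try {
            // Duration.toMillis() throws ArithmeticException if the result overflows a long
            return duration.toMillis();
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(name + " is too large to express in milliseconds", e);
        }
    }

    public static void main(final String[] args) {
        System.out.println(toMillisChecked(Duration.ofMinutes(5), "retentionPeriod")); // prints 300000
        try {
            toMillisChecked(Duration.ofSeconds(Long.MAX_VALUE), "windowSize");
        } catch (final IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Surfacing these failures as IllegalArgumentException at store-creation time means a misconfigured duration fails fast, rather than overflowing silently inside the store.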



Proposed Changes

The design of the in-memory window store is as follows. The store is "segmented" by windowStartTimestamp, and each segment is a key-value store that maps a record's key to its value. Each segment spans only 1 ms (the granularity of the timestamps) and therefore holds at most one record per key. Records are inserted, updated, and fetched by first looking up the segment for the record's timestamp and then performing the appropriate action on the key-value store inside it. Both the segments themselves and the key-value pairs inside them are stored in a TreeMap, allowing efficient range queries both by timestamp and by key. Before each action (put/fetch), any segments that have expired since the previous action are removed.
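The two-level layout described above can be sketched with nested TreeMaps: an outer map from windowStartTimestamp to a segment, and an inner map from key to value. This is a minimal illustration under stated assumptions, not the KIP's implementation: keys and values are Strings rather than bytes, "current time" is taken to be the largest windowStartTimestamp seen by put(), and expiry uses a retention period. SegmentedWindowStoreSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of the segmented design. Assumptions: String keys/values,
// and "current time" = the largest windowStartTimestamp seen by put().
final class SegmentedWindowStoreSketch {
    // windowStartTimestamp -> (key -> value); each segment covers exactly 1 ms
    private final NavigableMap<Long, NavigableMap<String, String>> segments = new TreeMap<>();
    private final long retentionMs;
    private long observedTime = Long.MIN_VALUE;

    SegmentedWindowStoreSketch(final long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Drop every segment with windowStartTimestamp <= observedTime - retentionMs
    private void removeExpired() {
        if (observedTime == Long.MIN_VALUE) {
            return; // nothing has been written yet
        }
        segments.headMap(observedTime - retentionMs, true).clear();
    }

    void put(final String key, final String value, final long windowStart) {
        observedTime = Math.max(observedTime, windowStart);
        removeExpired();
        if (windowStart <= observedTime - retentionMs) {
            return; // record belongs to an already-expired segment
        }
        segments.computeIfAbsent(windowStart, ts -> new TreeMap<>()).put(key, value);
    }

    // Single fetch: one key in one segment
    String fetch(final String key, final long windowStart) {
        removeExpired();
        final NavigableMap<String, String> segment = segments.get(windowStart);
        return segment == null ? null : segment.get(key);
    }

    // Time-range fetch: one key across all segments in [timeFrom, timeTo]
    List<String> fetch(final String key, final long timeFrom, final long timeTo) {
        removeExpired();
        final List<String> result = new ArrayList<>();
        for (final NavigableMap<String, String> segment
                : segments.subMap(timeFrom, true, timeTo, true).values()) {
            final String value = segment.get(key);
            if (value != null) {
                result.add(value);
            }
        }
        return result;
    }

    // Key-range fetch: keys in [keyFrom, keyTo] within one segment
    NavigableMap<String, String> fetchKeyRange(final String keyFrom, final String keyTo, final long windowStart) {
        removeExpired();
        final NavigableMap<String, String> segment = segments.get(windowStart);
        if (segment == null) {
            return new TreeMap<>();
        }
        return new TreeMap<>(segment.subMap(keyFrom, true, keyTo, true));
    }

    public static void main(final String[] args) {
        final SegmentedWindowStoreSketch store = new SegmentedWindowStoreSketch(100);
        store.put("a", "v1", 10);
        store.put("b", "v2", 10);
        store.put("a", "v3", 50);
        System.out.println(store.fetch("a", 0, 60));          // [v1, v3]
        System.out.println(store.fetchKeyRange("a", "b", 10)); // {a=v1, b=v2}
        store.put("c", "v4", 120); // observedTime = 120, so segments at <= 20 expire
        System.out.println(store.fetch("a", 0, 200));         // [v3]
    }
}
```

Because the outer map is sorted by timestamp, expiring old segments is a single headMap(...).clear() on a prefix of the map, and time-range fetches are a subMap view over contiguous segments.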

Two important parameters are the retention period and the grace period. The retention period defines how long records are kept: only records with windowStartTimestamp > current time - retention period can be fetched, and all others are deleted as soon as possible to free up resources. The grace period defines a window of time over which the store may still be updated by late-arriving records: only records with windowStartTimestamp > current time - grace period are inserted, updated, or deleted in the underlying store.
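A small worked example may make the two inequalities concrete. The helpers below are hypothetical and simply restate the rules above; "now" stands in for the store's notion of current time, which this section does not define further. With now = 1000, a retention period of 600 ms, and a grace period of 100 ms, a window is fetchable if its start is after 400 and writable if its start is after 900.

```java
// Hypothetical helpers restating the retention and grace-period rules.
// "now" is an assumed stand-in for the store's current time.
final class RetentionAndGrace {

    // Fetchable: windowStartTimestamp > now - retentionPeriod
    static boolean isRetained(final long windowStart, final long now, final long retentionMs) {
        return windowStart > now - retentionMs;
    }

    // Writable (insert/update/delete): windowStartTimestamp > now - gracePeriod
    static boolean isWithinGrace(final long windowStart, final long now, final long graceMs) {
        return windowStart > now - graceMs;
    }

    public static void main(final String[] args) {
        final long now = 1_000;
        final long retentionMs = 600;
        final long graceMs = 100;
        // Window starting at 950: still writable and fetchable
        System.out.println(isWithinGrace(950, now, graceMs) + " " + isRetained(950, now, retentionMs)); // true true
        // Window starting at 500: too late to update, but still fetchable
        System.out.println(isWithinGrace(500, now, graceMs) + " " + isRetained(500, now, retentionMs)); // false true
        // Window starting at 300: expired entirely
        System.out.println(isWithinGrace(300, now, graceMs) + " " + isRetained(300, now, retentionMs)); // false false
    }
}
```

The middle case shows why the two parameters differ: a window can be closed to late updates while its final result is still available to readers.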

This design was chosen to be as memory-efficient as possible while still allowing quick (log N) put and fetch operations and prompt deletion of expired records. The store supports both single and range fetches, over a range of timestamps as well as a range of keys, and because the segments and their contents are held in a NavigableMap, operations such as put and fetch take log N time. Users can expect the overall memory footprint to include the space required for all live records, plus some additional space proportional to the number of fetched records whose iterators have not yet been closed. It is therefore important that users read these results and close the iterator as soon as possible.
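One way an open iterator can cost memory proportional to the records it returned is if each fetch copies its matches into a snapshot that lives until the iterator is closed. The sketch below assumes that snapshot approach purely for illustration; the KIP does not state how iterators are implemented, and SnapshotIterator is a hypothetical name. It implements AutoCloseable so callers can release the copy with try-with-resources.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical fetch result that holds its own copy of the matched records
// until close() is called (an assumed design, not the KIP's implementation).
final class SnapshotIterator<V> implements Iterator<V>, AutoCloseable {
    private List<V> snapshot;
    private int position = 0;

    SnapshotIterator(final List<V> matched) {
        this.snapshot = new ArrayList<>(matched); // the copy costs memory while open
    }

    @Override
    public boolean hasNext() {
        if (snapshot == null) {
            throw new IllegalStateException("iterator is closed");
        }
        return position < snapshot.size();
    }

    @Override
    public V next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return snapshot.get(position++);
    }

    @Override
    public void close() {
        snapshot = null; // drop the copy so it can be garbage-collected
    }

    boolean isClosed() {
        return snapshot == null;
    }

    public static void main(final String[] args) {
        // try-with-resources closes the iterator even if reading throws
        try (SnapshotIterator<String> it = new SnapshotIterator<>(Arrays.asList("v1", "v3"))) {
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }
}
```

Kafka Streams' own store iterators are Closeable as well, so the same try-with-resources pattern is the natural way to make sure fetched results are released promptly.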

Compatibility, Deprecation, and Migration Plan

...