Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-4730

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Streams currently has both an in-memory and a persistent implementation of its key-value store, but only a persistent window store. Adding an in-memory implementation of the window store would allow for a significant performance increase, and it has been requested by a number of users.

Public Interfaces


This KIP adds one public method to the Stores class that returns an InMemoryWindowBytesStoreSupplier (which implements WindowBytesStoreSupplier):


public static WindowBytesStoreSupplier inMemoryWindowStore(final String name,
                                                           final Duration retentionPeriod,
                                                           final Duration windowSize,
                                                           final Duration gracePeriod) throws IllegalArgumentException {
    Objects.requireNonNull(name, "name cannot be null");
    // Convert each Duration to milliseconds; the prefix customizes the message of the
    // IllegalArgumentException thrown if a Duration fails validation.
    final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
    final long retentionMs = ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
    final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
    final long windowSizeMs = ApiUtils.validateMillisecondDuration(windowSize, wsMsgPrefix);
    final String gpMsgPrefix = prepareMillisCheckFailMsgPrefix(gracePeriod, "gracePeriod");
    final long gracePeriodMs = ApiUtils.validateMillisecondDuration(gracePeriod, gpMsgPrefix);

    // Delegate range checks and construction to the long-based overload.
    return inMemoryWindowStore(name, retentionMs, windowSizeMs, gracePeriodMs);
}

which in turn calls the following private overload, where the range checks are performed:

private static WindowBytesStoreSupplier inMemoryWindowStore(final String name,
                                                            final long retentionPeriod,
                                                            final long windowSize,
                                                            final long gracePeriod) {
    Objects.requireNonNull(name, "name cannot be null");
    // None of the time parameters may be negative.
    if (retentionPeriod < 0L) {
        throw new IllegalArgumentException("retentionPeriod cannot be negative");
    }
    if (windowSize < 0L) {
        throw new IllegalArgumentException("windowSize cannot be negative");
    }
    if (gracePeriod < 0L) {
        throw new IllegalArgumentException("gracePeriod cannot be negative");
    }
    // The store must retain at least one full window.
    if (windowSize > retentionPeriod) {
        throw new IllegalArgumentException("The retention period of the window store "
            + name + " must be no smaller than its window size. Got size=["
            + windowSize + "], retention=[" + retentionPeriod + "]");
    }
    // Late records can only be applied to windows that are still retained.
    if (gracePeriod > retentionPeriod) {
        throw new IllegalArgumentException("The grace period of the window store "
            + name + " must not exceed its retention period. Got grace period=["
            + gracePeriod + "], retention=[" + retentionPeriod + "]");
    }

    return new InMemoryWindowBytesStoreSupplier(name, retentionPeriod, windowSize, gracePeriod);
}


Proposed Changes

The in-memory window store is designed as follows. The store is "segmented" by windowStartTimestamp, with each segment consisting of a key-value store that maps each record's key to its value. Each segment spans only 1 ms, the granularity of the timestamps, and therefore contains at most one value per key. Records are inserted, updated, or fetched by first looking up the segment for their timestamp and then performing the corresponding operation on the key-value store inside it. Both the segments themselves and the key-value pairs inside each segment are stored in a TreeMap, which allows efficient range queries both by timestamp and by key. Before each operation (put or fetch), any segments that have expired since the previous operation are removed.
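
The segment-per-timestamp layout described above can be sketched roughly as follows. This is a minimal illustration, not the proposed implementation: the class SegmentedWindowStoreSketch and its methods are hypothetical, keys are Strings rather than serialized bytes, a null value is treated as a delete (an assumption of this sketch), and expiration is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not the Kafka Streams implementation. String keys and
// byte[] values keep the example self-contained; expiration is omitted.
class SegmentedWindowStoreSketch {
    // Outer map: windowStartTimestamp (1 ms granularity) -> segment.
    // Inner map: record key -> value, so a segment holds at most one value per key.
    private final TreeMap<Long, TreeMap<String, byte[]>> segments = new TreeMap<>();

    void put(final String key, final byte[] value, final long windowStartTimestamp) {
        if (value == null) {
            // Assumption of this sketch: a null value deletes the key from its segment,
            // and a segment that becomes empty is dropped entirely.
            final TreeMap<String, byte[]> segment = segments.get(windowStartTimestamp);
            if (segment != null) {
                segment.remove(key);
                if (segment.isEmpty()) {
                    segments.remove(windowStartTimestamp);
                }
            }
        } else {
            // Look up (or create) the segment for this timestamp, then insert/update the key.
            segments.computeIfAbsent(windowStartTimestamp, t -> new TreeMap<>()).put(key, value);
        }
    }

    // Point lookup: one TreeMap lookup for the segment, one for the key.
    byte[] fetch(final String key, final long windowStartTimestamp) {
        final TreeMap<String, byte[]> segment = segments.get(windowStartTimestamp);
        return segment == null ? null : segment.get(key);
    }

    // Time-range query for one key: subMap visits only segments in [timeFrom, timeTo].
    List<Long> fetchWindowStarts(final String key, final long timeFrom, final long timeTo) {
        final List<Long> result = new ArrayList<>();
        for (final Map.Entry<Long, TreeMap<String, byte[]>> e :
                segments.subMap(timeFrom, true, timeTo, true).entrySet()) {
            if (e.getValue().containsKey(key)) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```

Because the outer map is keyed by timestamp, a time-range fetch touches only the segments inside the range, and each lookup within a segment is a further O(log N) TreeMap operation.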

Two important parameters are the retention period and the grace period. The retention period defines how long records are kept: only records with windowStartTimestamp > current time - retention period can be fetched, and all others are deleted as soon as possible to free up resources. The grace period defines how long the store may still be updated by late-arriving records: only records with windowStartTimestamp > current time - grace period are inserted, updated, or deleted in the underlying store.
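
A minimal sketch of how the two periods could be enforced, assuming "current time" means the largest window start timestamp observed so far and that timestamps are non-negative. The class RetentionGraceSketch and its methods are hypothetical. The late-record check follows the strict inequality above, with one added assumption: a record whose timestamp is not behind the current time is never treated as late, so a grace period of zero still accepts in-order records.

```java
import java.util.TreeMap;

// Hypothetical sketch, not the Kafka Streams implementation.
// Assumptions: "current time" = largest window start seen so far; timestamps >= 0.
class RetentionGraceSketch {
    private static final long NO_TIME = Long.MIN_VALUE;

    private final long retentionPeriod;
    private final long gracePeriod;
    private long currentTime = NO_TIME;
    // windowStartTimestamp -> (key -> value), one segment per millisecond
    private final TreeMap<Long, TreeMap<String, String>> segments = new TreeMap<>();

    RetentionGraceSketch(final long retentionPeriod, final long gracePeriod) {
        this.retentionPeriod = retentionPeriod;
        this.gracePeriod = gracePeriod;
    }

    // Returns true if the record was applied, false if it arrived after the grace period.
    boolean put(final String key, final String value, final long windowStartTimestamp) {
        final boolean firstRecord = currentTime == NO_TIME;
        // Late check against the time observed before this record.
        final boolean late = !firstRecord
            && windowStartTimestamp < currentTime
            && windowStartTimestamp <= currentTime - gracePeriod;
        if (late) {
            return false; // dropped; the store is left unchanged
        }
        currentTime = firstRecord ? windowStartTimestamp : Math.max(currentTime, windowStartTimestamp);
        removeExpiredSegments();
        segments.computeIfAbsent(windowStartTimestamp, t -> new TreeMap<>()).put(key, value);
        return true;
    }

    String fetch(final String key, final long windowStartTimestamp) {
        removeExpiredSegments();
        final TreeMap<String, String> segment = segments.get(windowStartTimestamp);
        return segment == null ? null : segment.get(key);
    }

    // Keep only segments with windowStartTimestamp > currentTime - retentionPeriod.
    private void removeExpiredSegments() {
        if (currentTime == NO_TIME) {
            return;
        }
        segments.headMap(currentTime - retentionPeriod, true).clear();
    }

    int segmentCount() {
        return segments.size();
    }
}
```

Since the grace period may not exceed the retention period, any record accepted by the late check also falls inside the retention window, so it is never inserted into a segment that would be expired immediately.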

This design was chosen as the most memory-efficient of the options considered, since expired records are removed right away, while still allowing O(log N) put and fetch operations as well as efficient deletion of expired records.

Compatibility, Deprecation, and Migration Plan

N/A

Rejected Alternatives

Several alternative designs for the in-memory window store were considered. One was to follow the segmented approach of the persistent window store, which groups records into larger time blocks for efficient batch deletion. There, an entire segment is deleted only after the last record in it has expired, so some older records may linger for a while after they have technically expired, depending on the segment size. This time/space tradeoff makes sense for RocksDB, but it was ultimately rejected for the in-memory implementation because we prefer to reclaim resources as soon as possible after expiration.
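
To make the tradeoff concrete, the arithmetic below (hypothetical helper names; non-negative timestamps assumed) computes how long a record survives past its own expiration when records are grouped into segments of segmentIntervalMs and a segment is dropped only once its newest possible timestamp has expired. A record expires once current time - retention period >= its timestamp, while its segment is dropped only once current time - retention period >= the segment's last timestamp, so the extra lifetime is the gap between the two.

```java
// Hypothetical helper illustrating the rejected coarse-segment design.
// Assumes non-negative timestamps and a positive segment interval.
final class SegmentLingerSketch {
    private SegmentLingerSketch() {
    }

    // Segment a timestamp falls into when segments are segmentIntervalMs wide.
    static long segmentId(final long timestamp, final long segmentIntervalMs) {
        return timestamp / segmentIntervalMs;
    }

    // Milliseconds a record survives after it has itself expired, because its
    // segment is only deleted once the segment's last timestamp has expired.
    static long lingerMs(final long timestamp, final long segmentIntervalMs) {
        final long lastTimestampInSegment =
            (segmentId(timestamp, segmentIntervalMs) + 1) * segmentIntervalMs - 1;
        return lastTimestampInSegment - timestamp;
    }
}
```

For example, with a 60,000 ms segment interval a record stamped at the start of a segment can linger for 59,999 ms after expiring, whereas with 1 ms segments, as in the proposed design, it lingers for 0 ms.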
