Status
Current state: Under Discussion
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Unlike for key-value state stores, Kafka Streams currently does not provide a way to query the range of keys available in a windowed state store.
The only available alternatives are to aggregate all the distinct keys into a single key for a given window, or to implement a custom windowed state store.
Similarly, session stores currently support querying only by a single key, requiring the user to maintain the set of queryable keys separately.
Public Interfaces
This KIP would add the following methods:
ReadOnlyWindowStore interface
WindowStoreIterator<KeyValue<K, V>> fetch(K from, K to, long timeFrom, long timeTo)
The time range would follow the behavior of the existing ReadOnlyWindowStore.fetch(K key, long timeFrom, long timeTo).
Key range behavior would be consistent with the existing ReadOnlyKeyValueStore.range(K from, K to) behavior.
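The combined filtering can be sketched with a minimal in-memory stand-in (this is a hypothetical illustration of the proposed semantics, not the actual RocksDB-backed implementation; the entry layout and the assumption of inclusive bounds are mine): an entry is returned when its key falls within [from, to] and its window start falls within [timeFrom, timeTo].

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a windowed store: a flat list of
// (windowStart, key, value) entries.
public class WindowRangeSketch {

    public static final class Entry {
        public final long windowStart;
        public final String key;
        public final String value;

        public Entry(long windowStart, String key, String value) {
            this.windowStart = windowStart;
            this.key = key;
            this.value = value;
        }
    }

    // Sketch of fetch(from, to, timeFrom, timeTo): keep entries whose key
    // is in [from, to] and whose window start is in [timeFrom, timeTo].
    // Inclusive bounds are assumed here, mirroring range(from, to) and the
    // existing single-key fetch(key, timeFrom, timeTo).
    public static List<Entry> fetch(List<Entry> store,
                                    String from, String to,
                                    long timeFrom, long timeTo) {
        List<Entry> result = new ArrayList<>();
        for (Entry e : store) {
            boolean keyInRange = e.key.compareTo(from) >= 0 && e.key.compareTo(to) <= 0;
            boolean timeInRange = e.windowStart >= timeFrom && e.windowStart <= timeTo;
            if (keyInRange && timeInRange) {
                result.add(e);
            }
        }
        return result;
    }
}
```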
ReadOnlySessionStore interface
KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to)
Key range behavior would be consistent with the existing ReadOnlyKeyValueStore.range(K from, K to) behavior.
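The session-store variant can be sketched the same way (again a hypothetical in-memory stand-in, with inclusive bounds assumed): fetch(from, to) returns every session for every key in [from, to], with no time filtering.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a session store: a flat list of
// (key, sessionStart, sessionEnd, aggregate) entries.
public class SessionRangeSketch {

    public static final class Session {
        public final String key;
        public final long sessionStart;
        public final long sessionEnd;
        public final long aggregate;

        public Session(String key, long sessionStart, long sessionEnd, long aggregate) {
            this.key = key;
            this.sessionStart = sessionStart;
            this.sessionEnd = sessionEnd;
            this.aggregate = aggregate;
        }
    }

    // Sketch of fetch(from, to): every session whose key is in [from, to],
    // regardless of the session's time bounds. Inclusive bounds are assumed,
    // matching ReadOnlyKeyValueStore.range(from, to).
    public static List<Session> fetch(List<Session> store, String from, String to) {
        List<Session> result = new ArrayList<>();
        for (Session s : store) {
            if (s.key.compareTo(from) >= 0 && s.key.compareTo(to) <= 0) {
                result.add(s);
            }
        }
        return result;
    }
}
```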
Proposed Changes
This KIP proposes to add the methods described above and to implement range scans that return all the entries in the given key range.
Compatibility, Deprecation, and Migration Plan
- Users implementing their own state stores would be affected by the interface changes and would need to implement the new methods.
Rejected Alternatives
Arguably, the return type in ReadOnlyWindowStore is not ideal: WindowStoreIterator is already a KeyValueIterator that abuses the key as a timestamp and the value as the object of interest. However, since the existing API already abuses the iterator in this way, and barring more invasive changes to the API or the introduction of a type that embeds all of timestamp, key, and value (which would probably be a much larger discussion in itself), it seemed simpler to follow the existing model.