...
This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.
Status
Current state: "Under Discussion"Adopted
Discussion thread: here
JIRA: KAFKA-13494
...
- WindowStore.fetch(K key, Instant timeFrom, Instant timeTo) ← WindowKeyQuery.withKeyAndWindowBoundswithKeyAndWindowStartRange(key, from, to)
WindowRangeQuery
...
- WindowStore.fetchAll(Instant timeFrom, Instant timeTo) ← WindowRangeQuery.withWindowStartRange(from, to)
- SessionStore.fetch(Bytes key) ← WindowRangeQuery.withKey(key)
There are multiple discussion points here:
- One reason we cannot use WindowKeyQuery to support SessionStore.fetch(key) is because SessionStore.fetch(key) returns a windowed key value iterator, KeyValueIterator<Windowed<Bytes>, byte[]>, whereas WindowKeyQuery is bound to Query<WindowStoreIterator<V>>.
- WindowRangeQuery does not currently support key ranges. We could add more cases here to get better coverage of WindowStore and SessionStore API. In this KIP, however, we focus on a subset of queries and defer support for additional cases to future KIPs.
Proposed Changes
The WindowKeyQuery class:
Code Block |
---|
@Evolving public class WindowKeyQuery<K, V> implements Query<WindowStoreIterator<V>> { public static <K, V> WindowKeyQuery<K, V> withKeyAndWindowBoundswithKeyAndWindowStartRange(final K key, final Instant startTimetimeFrom, final Instant endTimetimeTo); public K getKey(); public Optional<Instant> getStartTimegetTimeFrom(); public Optional<Instant> getEndTimegetTimeTo(); } |
The WindowRangeQuery class:
Code Block |
---|
@Evolving public class WindowRangeQuery<K, V> implements Query<KeyValueIterator<Windowed<K>, V>> { public static <K, V> WindowRangeQuery<K, V> withKey(final K key); public static <K, V> WindowRangeQuery<K, V> withWindowStartRange(final Instant earliestWindowStartTimetimeFrom, final Instant latestWindowStartTimetimeTo); public K getKey(); public Optional<Instant> getStartTimegetTimeFrom(); public Optional<Instant> getEndTimegetTimeTo(); } |
Examples
The following example illustrates the use of the WindowKeyQuey class.
Code Block |
---|
final Instant uppertimeTo = Instant.now(); final Instant lowertimeFrom = uppertimeTo.minusSeconds(60); final WindowKeyQuery<GenericKey, ValueAndTimestamp<GenericRow>> query = WindowKeyQuery.withKeyAndWindowBoundswithKeyAndWindowStartRange(key, lowertimeFrom, uppertimeTo); final StateQueryRequest<WindowStoreIterator<ValueAndTimestamp<GenericRow>>> request = inStore("rocksdb-window-store").withQuery(query); final StateQueryResult<WindowStoreIterator<ValueAndTimestamp<GenericRow>>> result = stateStore.getKafkaStreams()kafkaStream.query(request); final WindowStoreIterator<ValueAndTimestamp<GenericRow>> iterator = result.getGlobalResult().getResult(); |
The following example illustrates the use of the WindowQuery class to query a window store.
Code Block |
---|
final Instant uppertimeTo = Instant.now(); final Instant lowertimeFrom = uppertimeTo.minusSeconds(60); final WindowRangeQuery<GenericKey, ValueAndTimestamp<GenericRow>> query = WindowRangeQuery.withWindowRangewithWindowStartRange(lowertimeFrom, uppertimeTo); final StateQueryRequest<KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>>> request = inStore("inmemory-window-store").withQuery(query); final StateQueryResult<KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>>> result = stateStore.getKafkaStreams()kafkaStreams.query(request); final KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>> iterator = result.getGlobalResult().getResult(); |
...