...
This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.
Status
Current state: "Under Discussion"Adopted
Discussion thread: here
JIRA: KAFKA-13494
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Provide an implementation of the Query
interface, introduced in KIP-796: Interactive Query v2 , to support session and window queries.
Public Interfaces
In this KIP we propose two new public classes:
...
WindowKeyQuery
This type used to
...
search occurrences of keys in window stores in combination with IQv2
...
. In particular the following mappings are established:
- WindowStore.fetch(K key, Instant timeFrom, Instant timeTo) ← WindowKeyQuery.withKeyAndWindowStartRange(key, from, to)
WindowRangeQuery
The query type is used to search per window aggregates of keys in window and session stores in combination with IQv2. In particular the following mappings are established:
- WindowStore.fetchAll(Instant timeFrom, Instant timeTo) ← WindowRangeQuery.withWindowStartRange(from, to)
- SessionStore.fetch(Bytes key) ← WindowRangeQuery.withKey(key)
There are multiple discussion points here:
- One reason we cannot use WindowKeyQuery to support SessionStore.fetch(key) is because SessionStore.fetch(key) returns a windowed key value iterator, KeyValueIterator<Windowed<Bytes>, byte[]>, whereas WindowKeyQuery is bound to Query<WindowStoreIterator<V>>.
- WindowRangeQuery does not currently support key ranges. We could add more cases here to get better coverage of WindowStore and SessionStore API. In this KIP, however, we focus on a subset of queries and defer support for additional cases to future KIPs.
Proposed Changes
The SessionQuery
class WindowKeyQuery class:
Code Block |
---|
@Evolving public class SessionQuery<KWindowKeyQuery<K, V> implements Query<V>Query<WindowStoreIterator<V>> { private final Optional<K> lower; private final Optional<K> upper; private SessionQuery(final Optional<K> lower, final Optional<K> upper) { this.lower = lower; this.upper = upper; } public static <K, V> SessionQuery<KWindowKeyQuery<K, V> withRangewithKeyAndWindowStartRange(final K lowerkey, final K upper) { return new SessionQuery<>(Optional.of(lower), Optional.of(upper)); } public static <K, V> SessionQuery<K, V> withUpperBound(final K upper) { return new SessionQuery<>(Optional.empty(), Optional.of(upper)); } public static <K, V> SessionQuery<K, V> withLowerBound(final K lower) { return new SessionQuery<>(Optional.of(lower), Optional.empty()); } Instant timeFrom, final Instant timeTo); public K getKey(); public static <K, V> SessionQuery<K, V> withNoBounds() { return new SessionQuery<>(Optional.empty(), Optional.empty()); } Optional<Instant> getTimeFrom(); public Optional<K>Optional<Instant> getLowerBound() { return lower; } public Optional<K> getUpperBound() { return upper; }getTimeTo(); } |
The WindowQuery WindowRangeQuery class:
Code Block |
---|
@Evolving public class WindowQuery<KWindowRangeQuery<K, V> implements Query<V> { private final Optional<K> lower; private final Optional<K> upper; private WindowQuery(final Optional<K> lower, final Optional<K> upper) { this.lower = lower; this.upper = upper; } Query<KeyValueIterator<Windowed<K>, V>> { public static <K, V> WindowQuery<KWindowRangeQuery<K, V> withRangewithKey(final K lower, final K upper) { return new WindowQuery<>(Optional.of(lower), Optional.of(upper)); } key); public static <K, V> WindowQuery<KWindowRangeQuery<K, V> withUpperBoundwithWindowStartRange(final KInstant upper) { return new WindowQuery<>(Optional.empty(), Optional.of(upper)); } timeFrom, final Instant timeTo); public static <K, V> WindowQuery<K, V> withLowerBound(final K lower) { return new WindowQuery<>(Optional.of(lower), Optional.empty()); } public static <K, V> WindowQuery<K, V> withNoBounds() { return new WindowQuery<>(Optional.empty(), Optional.empty()); } K getKey(); public Optional<Instant> getTimeFrom(); public Optional<K>Optional<Instant> getLowerBoundgetTimeTo() { return lower; } public Optional<K> getUpperBound() { return upper; } } |
Examples
The following example illustrates the use of the SessionQuery WindowKeyQuey class.
Code Block |
---|
Integerfinal key1 = 1; Integer key2Instant timeTo = 2; // create the query parameters final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes = kafkaStreams.serdesForStore("mystore") StateQueryRequest<KeyValueIterator<Integer, Integer>>Instant.now(); final Instant timeFrom = timeTo.minusSeconds(60); final WindowKeyQuery<GenericKey, ValueAndTimestamp<GenericRow>> query = inStore("mystore") .withQuery(SessoinQuery.withRange(key1, key2)); // run the query StateQueryResult<KeyValueIterator<Integer, Integer>> result = kafkaStreams.query(query); // Get the results from all partitions. final Map<Integer, QueryResult<KeyValueIterator<Integer, Integer>>> partitionResults = rangeResult.getPartitionResults(); for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, Integer>>> entry : partitionResults.entrySet()) { try (final KeyValueIterator<Integer, Integer> keyValueIterator = entry.getValue().getResult()) { while (keyValueIterator.hasNext()) { final KeyValue<Integer, Integer> next = keyValueIterator.next(); Integer key = next.key.get; Integer value = next.value; } } } WindowKeyQuery.withKeyAndWindowStartRange(key, timeFrom, timeTo); final StateQueryRequest<WindowStoreIterator<ValueAndTimestamp<GenericRow>>> request = inStore("rocksdb-window-store").withQuery(query); final StateQueryResult<WindowStoreIterator<ValueAndTimestamp<GenericRow>>> result = kafkaStream.query(request); final WindowStoreIterator<ValueAndTimestamp<GenericRow>> iterator = result.getGlobalResult().getResult(); |
The following example illustrates the use of the WindowQuery class to query a window store.
Code Block |
---|
Integerfinal key1 = 1; Integer key2Instant timeTo = 2; // create the query parametersInstant.now(); final StateSerdes<Integer,Instant ValueAndTimestamp<Integer>>timeFrom serdes = kafkaStreams.serdesForStore("mystore") StateQueryRequest<KeyValueIterator<Integer, Integer>> timeTo.minusSeconds(60); final WindowRangeQuery<GenericKey, ValueAndTimestamp<GenericRow>> query = inStore("mystore") .withQuery(WindowQuery.withRange(key1, key2)); // run the query StateQueryResult<KeyValueIterator<Integer, Integer>> result = kafkaStreams.query(query); // Get the results from all partitions. final Map<Integer, QueryResult<KeyValueIterator<Integer, Integer>>> partitionResults = rangeResult.getPartitionResults(); for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, Integer>>> entry : partitionResults.entrySet()) { try (final KeyValueIterator<Integer, Integer> keyValueIterator = entry.getValue().getResult()) { while (keyValueIterator.hasNext()) { final KeyValue<Integer, Integer> next = keyValueIterator.next(); Integer key = next.key.get; Integer value = next.value; } } } WindowRangeQuery.withWindowStartRange(timeFrom, timeTo); final StateQueryRequest<KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>>> request = inStore("inmemory-window-store").withQuery(query); final StateQueryResult<KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>>> result = kafkaStreams.query(request); final KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>> iterator = result.getGlobalResult().getResult(); |
Compatibility, Deprecation, and Migration Plan
...