This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.
Status
Current state: "Under Discussion"
Discussion thread: here
JIRA: KAFKA-13494
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Provide an implementation of the Query
interface, introduced in KIP-796: Interactive Query v2 , to support session and window queries.
Public Interfaces
In this KIP we propose two new public classes:
- WindowKeyQuery: used to search occurrences of keys in window stores in combination with IQv2
- WindowRangeQuery: used to search per window aggregates of keys in window and session stores in combination with IQv2
Proposed Changes
The WindowKeyQuery class:
public class WindowKeyQuery<K, V> implements Query<V> { private final K key; private final Optional<Long> windowLower; private WindowKeyQuery(final K key, final Optional<Long> windowLower) { this.key = key; this.windowLower = windowLower; } public static <K, V> WindowKeyQuery<K, V> withKey(final K key) { return new WindowKeyQuery<>(key, Optional.empty()); } public static <K, V> WindowKeyQuery<K, V> withKeyAndWindowLowerBound(final K key, final long time) { return new WindowKeyQuery<>(key, Optional.of(time)); } public K getKey() { return key; } public Optional<Long> getWindowLower() { return windowLower; } }
The WindowRangeQuery class:
public class WindowRangeQuery<K, V> implements Query<KeyValueIterator<K, V>> { private final Optional<K> lower; private final Optional<K> upper; private final Optional<Long> windowLower; private final Optional<Long> windowUpper; private WindowRangeQuery(final Optional<K> lower, final Optional<K> upper, final Optional<Long> windowLower, final Optional<Long> windowUpper) { this.lower = lower; this.upper = upper; this.windowLower = windowLower; this.windowUpper = windowUpper; } public static <K, V> WindowRangeQuery<K, V> withKeyRange(final K lower, final K upper, final Optional<Long> windowLower, final Optional<Long> windowUpper) { return new WindowRangeQuery<>(Optional.of(lower), Optional.of(upper), windowLower, windowUpper); } public static <K, V> WindowRangeQuery<K, V> withKeyUpperBound(final K upper, final Optional<Long> windowLower, final Optional<Long> windowUpper) { return new WindowRangeQuery<>(Optional.empty(), Optional.of(upper), windowLower, windowUpper); } public static <K, V> WindowRangeQuery<K, V> withKeyLowerBound(final K lower, final Optional<Long> windowLower, final Optional<Long> windowUpper) { return new WindowRangeQuery<>(Optional.of(lower), Optional.empty(), windowLower, windowUpper); } public static <K, V> WindowRangeQuery<K, V> withNoKeyBounds(final Optional<Long> windowLower, final Optional<Long> windowUpper) { return new WindowRangeQuery<>(Optional.empty(), Optional.empty(), windowLower, windowUpper); } public Optional<K> getKeyLowerBound() { return lower; } public Optional<K> getKeyUpperBound() { return upper; } public Optional<Long> getWindowLowerBound() { return windowLower; } public Optional<Long> getWindowUpperBound() { return windowUpper; } }
Examples
The following example illustrates the use of the WindowKeyQuey class.
Integer key = 1; long lowerBound = beginningOfDay(); StateQueryRequest<byte[]> query = inStore("rocksdbwindowstore") .withQuery(WindowKeyQuery.withKey(key, lowerBound)); // run the query StateQueryResult<byte[]> result = kafkaStreams.query(query);
The following example illustrates the use of the WindowQuery class.
Integer key1 = 1; Integer key2 = 2; long lowerBound = beginningOfDay(); long upperBound = now(); StateQueryRequest<KeyValueIterator<Windowed<Bytes>, byte[]>>, byte[]> query = inStore("rocksdbsessionstore") .withQuery(WindowQuery.withRange(key1, key2, lowerBound, upperBound)); // run the query StateQueryResult<KeyValueIterator<Windowed<Bytes>>, byte[]> result = kafkaStreams.query(query); // Get the results from all partitions. final Map<Integer, QueryResult<KeyValueIterator<Windowed<Bytes>>, byte[]> partitionResults = rangeResult.getPartitionResults(); for (final Entry<Integer, QueryResult<KeyValueIterator<Windowed<Bytes>>, byte[]> entry : partitionResults.entrySet()) { try (final KeyValueIterator<Windowed<Bytes>>, byte[] keyValueIterator = entry.getValue().getResult()) { while (keyValueIterator.hasNext()) { final KeyValue<Bytes, byte[]> next = keyValueIterator.next(); Bytes key = next.key.get; byte[] value = next.value; } } }
Compatibility, Deprecation, and Migration Plan
- Since this is a completely new set of APIs, no backward compatibility concerns are anticipated.
- Since nothing is deprecated in this KIP, users have no need to migrate unless they want to.