Status
Current state: "Under Discussion"
Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Provide an implementation of the Query
interface, introduced in KIP-796: Interactive Query v2 , to support range and scan queries
Proposed Changes
The RangeQuery
class will be used for both range and scan queries. A scan is performed when no lower and no upper bound is specified
@Evolving public class RangeQuery<K, V> implements Query<V> { private final Optional<K> lower; private final Optional<K> upper; private RangeQuery(final Optional<K> lower, final Optional<K> upper) { this.lower = lower; this.upper = upper; } public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) { return new RangeQuery<>(Optional.of(lower), Optional.of(upper)); } public static <K, V> RangeQuery<K, V> withUpperBound(final K upper) { return new RangeQuery<>(Optional.empty(), Optional.of(upper)); } public static <K, V> RangeQuery<K, V> withLowerBound(final K lower) { return new RangeQuery<>(Optional.of(lower), Optional.empty()); } public static <K, V> RangeQuery<K, V> withNoBounds() { return new RangeQuery<>(Optional.empty(), Optional.empty()); } public Optional<K> getLowerBound() { return lower; } public Optional<K> getUpperBound() { return upper; } } // ====================================== // Range query example usage in IQv2: Integer key1 = 1; Integer key2 = 2; // create the query parameters final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes = kafkaStreams.serdesForStore("mystore") StateQueryRequest<KeyValueIterator<Integer, Integer>> query = inStore("mystore") .withQuery(RangeQuery.withRange(key1, key2)); // run the query StateQueryResult<KeyValueIterator<Integer, Integer>> result = kafkaStreams.query(query); // Get the results from all partitions. final Map<Integer, QueryResult<KeyValueIterator<Integer, Integer>>> partitionResults = rangeResult.getPartitionResults(); for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, Integer>>> entry : partitionResults.entrySet()) { try (final KeyValueIterator<Integer, Integer> keyValueIterator = entry.getValue().getResult()) { while (keyValueIterator.hasNext()) { final KeyValue<Integer, Integer> next = keyValueIterator.next(); Integer key = next.key.get; Integer value = next.value; } } } // ====================================== // Scan query example usage in IQv2: // create the query parameters StateQueryRequest<KeyValueIterator<Integer, Integer>> query = inStore("mystore") .withQuery(RangeQuery.withNoBounds()); // run the query StateQueryResult<KeyValueIterator<Integer, Integer>> result = kafkaStreams.query(query);
There will also be an implementation for a "raw" version of the RangeQuery, which simply takes the key as a byte array and returns the value as a byte array.
public class RawRangeQuery implements Query<KeyValueIterator<Bytes, byte[]>> { private final Optional<Bytes> lower; private final Optional<Bytes> upper; private RawRangeQuery(final Optional<Bytes> lower, final Optional<Bytes> upper) { this.lower = lower; this.upper = upper; } public static RawRangeQuery withRange(final Bytes lower, final Bytes upper) { return new RawRangeQuery(Optional.of(lower), Optional.of(upper)); } public static RawRangeQuery withUpperBound(final Bytes upper) { return new RawRangeQuery(Optional.empty(), Optional.of(upper)); } public static RawRangeQuery withLowerBound(final Bytes lower) { return new RawRangeQuery(Optional.of(lower), Optional.empty()); } public static RawRangeQuery withNoBounds() { return new RawRangeQuery(Optional.empty(), Optional.empty()); } public Optional<Bytes> getLowerBound() { return lower; } public Optional<Bytes> getUpperBound() { return upper; }
Compatibility, Deprecation, and Migration Plan
- Since this is a completely new set of APIs, no backward compatibility concerns are anticipated.
- Since nothing is deprecated in this KIP, users have no need to migrate unless they want to.