Status
Current state: "Under Discussion"
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Provide an implementation of the Query interface, introduced in KIP-796: Interactive Query v2, to support range and scan queries.
Proposed Changes
The RangeQuery class will be used for both range and scan queries. A scan is performed when neither a lower nor an upper bound is specified.
@Evolving
public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {

    private final Optional<K> lower;
    private final Optional<K> upper;

    private RangeQuery(final Optional<K> lower, final Optional<K> upper) {
        this.lower = lower;
        this.upper = upper;
    }

    // Range query: both bounds are set.
    public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
        return new RangeQuery<>(Optional.of(lower), Optional.of(upper));
    }

    // Only the upper bound is set.
    public static <K, V> RangeQuery<K, V> withUpperBound(final K upper) {
        return new RangeQuery<>(Optional.empty(), Optional.of(upper));
    }

    // Only the lower bound is set.
    public static <K, V> RangeQuery<K, V> withLowerBound(final K lower) {
        return new RangeQuery<>(Optional.of(lower), Optional.empty());
    }

    // Scan: no bounds are set.
    public static <K, V> RangeQuery<K, V> withNoBounds() {
        return new RangeQuery<>(Optional.empty(), Optional.empty());
    }

    public Optional<K> getLowerBound() {
        return lower;
    }

    public Optional<K> getUpperBound() {
        return upper;
    }
}

// ======================================
// Range query example usage in IQv2:

Integer key1 = 1;
Integer key2 = 2;

// create the query parameters
final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes = kafkaStreams.serdesForStore("mystore");
StateQueryRequest<KeyValueIterator<Bytes, byte[]>> query =
    inStore("mystore")
        .withQuery(RangeQuery.withRange(Bytes.wrap(serdes.rawKey(key1)), Bytes.wrap(serdes.rawKey(key2))));

// run the query
StateQueryResult<KeyValueIterator<Bytes, byte[]>> result = kafkaStreams.query(query);

// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults = result.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> entry : partitionResults.entrySet()) {
    // The iterator must be closed, hence try-with-resources.
    try (final KeyValueIterator<Bytes, byte[]> keyValueIterator = entry.getValue().getResult()) {
        while (keyValueIterator.hasNext()) {
            final KeyValue<Bytes, byte[]> next = keyValueIterator.next();
            Integer key = serdes.keyFrom(next.key.get());
            // The serdes yield a ValueAndTimestamp, so unwrap the plain value.
            Integer value = serdes.valueFrom(next.value).value();
        }
    }
}

// ======================================
// Scan query example usage in IQv2:

// create the query parameters
StateQueryRequest<KeyValueIterator<Bytes, byte[]>> query =
    inStore("mystore")
        .withQuery(RangeQuery.withNoBounds());

// run the query
StateQueryResult<KeyValueIterator<Bytes, byte[]>> result = kafkaStreams.query(query);
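To illustrate how the two optional bounds distinguish a range from a scan, here is a minimal, self-contained sketch that resolves them against a sorted in-memory map. The class RangeScanSketch and its select method are hypothetical stand-ins for a state store and are not part of this KIP or of Kafka Streams. The sketch also assumes that both bounds are inclusive, which this KIP does not state.

```java
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical stand-in for a sorted key-value store; not part of the KIP.
final class RangeScanSketch {

    // Resolves the optional bounds to a view over the sorted map.
    // Assumption: both bounds are inclusive.
    static <K, V> NavigableMap<K, V> select(final NavigableMap<K, V> store,
                                            final Optional<K> lower,
                                            final Optional<K> upper) {
        if (lower.isPresent() && upper.isPresent()) {
            // corresponds to withRange(lower, upper)
            return store.subMap(lower.get(), true, upper.get(), true);
        } else if (lower.isPresent()) {
            // corresponds to withLowerBound(lower)
            return store.tailMap(lower.get(), true);
        } else if (upper.isPresent()) {
            // corresponds to withUpperBound(upper)
            return store.headMap(upper.get(), true);
        }
        // corresponds to withNoBounds(): a full scan
        return store;
    }

    public static void main(final String[] args) {
        final NavigableMap<Integer, String> store = new TreeMap<>();
        for (int i = 1; i <= 5; i++) {
            store.put(i, "v" + i);
        }
        System.out.println(select(store, Optional.of(2), Optional.of(4)).keySet());       // [2, 3, 4]
        System.out.println(select(store, Optional.empty(), Optional.of(2)).keySet());     // [1, 2]
        System.out.println(select(store, Optional.of(4), Optional.empty()).keySet());     // [4, 5]
        System.out.println(select(store, Optional.empty(), Optional.empty()).keySet());   // [1, 2, 3, 4, 5]
    }
}
```

Keeping a single query class with two optional bounds means a scan is just the degenerate case of a range, so a store would only need one code path to handle both.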
There will also be a "raw" version of the RangeQuery, RawRangeQuery, which simply takes the key bounds as raw bytes and returns the values as byte arrays.
public class RawRangeQuery implements Query<KeyValueIterator<Bytes, byte[]>> {

    private final Optional<Bytes> lower;
    private final Optional<Bytes> upper;

    private RawRangeQuery(final Optional<Bytes> lower, final Optional<Bytes> upper) {
        this.lower = lower;
        this.upper = upper;
    }

    // Range query: both bounds are set.
    public static RawRangeQuery withRange(final Bytes lower, final Bytes upper) {
        return new RawRangeQuery(Optional.of(lower), Optional.of(upper));
    }

    // Only the upper bound is set.
    public static RawRangeQuery withUpperBound(final Bytes upper) {
        return new RawRangeQuery(Optional.empty(), Optional.of(upper));
    }

    // Only the lower bound is set.
    public static RawRangeQuery withLowerBound(final Bytes lower) {
        return new RawRangeQuery(Optional.of(lower), Optional.empty());
    }

    // Scan: no bounds are set.
    public static RawRangeQuery withNoBounds() {
        return new RawRangeQuery(Optional.empty(), Optional.empty());
    }

    public Optional<Bytes> getLowerBound() {
        return lower;
    }

    public Optional<Bytes> getUpperBound() {
        return upper;
    }
}
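One practical consequence of querying with raw bounds is that a range is evaluated over serialized keys rather than typed ones. The sketch below is only an illustration under two assumptions that this KIP does not spell out: that raw keys are ordered by unsigned lexicographic byte comparison, and that integer keys use a 4-byte big-endian encoding. The class RawKeyOrderSketch and its methods are hypothetical names introduced for this example. It requires Java 9 or later for Arrays.compareUnsigned.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper for illustration; not part of the KIP or of Kafka Streams.
final class RawKeyOrderSketch {

    // Assumed encoding: 4-byte big-endian (ByteBuffer's default byte order).
    static byte[] encode(final int key) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(key).array();
    }

    // Assumed ordering: unsigned lexicographic comparison of the raw bytes.
    static int compareRaw(final byte[] a, final byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }

    // True if lower <= key <= upper under the raw byte ordering (inclusive bounds assumed).
    static boolean inRange(final byte[] key, final byte[] lower, final byte[] upper) {
        return compareRaw(lower, key) <= 0 && compareRaw(key, upper) <= 0;
    }

    public static void main(final String[] args) {
        // Non-negative integers keep their numeric order once encoded.
        System.out.println(inRange(encode(5), encode(1), encode(10)));   // true
        // Negative integers encode with a leading 0xFF byte and sort after positives,
        // so -1 falls outside the raw range [-5, 5] even though it is inside numerically.
        System.out.println(inRange(encode(-1), encode(-5), encode(5)));  // false
    }
}
```

Under these assumptions, a raw range query returns the keys a caller expects only when the key serialization preserves the intended order.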
Compatibility, Deprecation, and Migration Plan
- Since this is a completely new set of APIs, no backward compatibility concerns are anticipated.
- Since nothing is deprecated in this KIP, users have no need to migrate unless they want to.