Status
Current state: Adopted
Discussion thread: https://lists.apache.org/thread/4clhz43yy9nk6kkggbcn0y3v61b05sp1
Voting thread: https://lists.apache.org/thread/fh9gnhk9zoqlt3fy883hwjwh47qjj2c5
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Provide an implementation of the Query interface, introduced in KIP-796: Interactive Query v2, to support range and scan queries.
Proposed Changes
The RangeQuery class will be used for both range and scan queries. A scan is performed when neither a lower nor an upper bound is specified, i.e., when both Optionals are empty. A range query retrieves the set of keys delimited by an upper and/or lower bound from the underlying KV store. A scan, on the other hand, retrieves all keys contained in the KV store.
    @Evolving
    public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {

        private final Optional<K> lower;
        private final Optional<K> upper;

        private RangeQuery(final Optional<K> lower, final Optional<K> upper) {
            this.lower = lower;
            this.upper = upper;
        }

        // Range query with both bounds specified.
        public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
            return new RangeQuery<>(Optional.of(lower), Optional.of(upper));
        }

        // Range query with only an upper bound.
        public static <K, V> RangeQuery<K, V> withUpperBound(final K upper) {
            return new RangeQuery<>(Optional.empty(), Optional.of(upper));
        }

        // Range query with only a lower bound.
        public static <K, V> RangeQuery<K, V> withLowerBound(final K lower) {
            return new RangeQuery<>(Optional.of(lower), Optional.empty());
        }

        // Scan: no bounds, so all keys are returned.
        public static <K, V> RangeQuery<K, V> withNoBounds() {
            return new RangeQuery<>(Optional.empty(), Optional.empty());
        }

        public Optional<K> getLowerBound() {
            return lower;
        }

        public Optional<K> getUpperBound() {
            return upper;
        }
    }

    // ======================================
    // Range query example usage in IQv2:

    Integer key1 = 1;
    Integer key2 = 2;

    // create the query parameters
    StateQueryRequest<KeyValueIterator<Integer, Integer>> query =
        inStore("mystore")
            .withQuery(RangeQuery.withRange(key1, key2));

    // run the query
    StateQueryResult<KeyValueIterator<Integer, Integer>> result = kafkaStreams.query(query);

    // Get the results from all partitions.
    final Map<Integer, QueryResult<KeyValueIterator<Integer, Integer>>> partitionResults = result.getPartitionResults();
    for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, Integer>>> entry : partitionResults.entrySet()) {
        // The iterator must be closed, hence try-with-resources.
        try (final KeyValueIterator<Integer, Integer> keyValueIterator = entry.getValue().getResult()) {
            while (keyValueIterator.hasNext()) {
                final KeyValue<Integer, Integer> next = keyValueIterator.next();
                Integer key = next.key;
                Integer value = next.value;
            }
        }
    }

    // ======================================
    // Scan query example usage in IQv2:

    // create the query parameters
    StateQueryRequest<KeyValueIterator<Integer, Integer>> query =
        inStore("mystore")
            .withQuery(RangeQuery.withNoBounds());

    // run the query
    StateQueryResult<KeyValueIterator<Integer, Integer>> result = kafkaStreams.query(query);
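To make the bound semantics described above concrete, the following is a minimal, self-contained sketch of how the two optional bounds could be resolved against a sorted store. It does not use Kafka Streams at all: `RangeQuerySketch`, `select`, and the `TreeMap` standing in for the KV store are hypothetical names chosen for illustration, and treating both bounds as inclusive is an assumption that this KIP does not itself state.

```java
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical illustration only; not part of the Kafka Streams API.
final class RangeQuerySketch {

    // Resolves optional lower/upper bounds against a sorted map.
    // Assumption: both bounds are inclusive.
    static <K, V> NavigableMap<K, V> select(final NavigableMap<K, V> store,
                                            final Optional<K> lower,
                                            final Optional<K> upper) {
        if (lower.isPresent() && upper.isPresent()) {
            // Both bounds given: bounded range.
            return store.subMap(lower.get(), true, upper.get(), true);
        }
        if (lower.isPresent()) {
            // Only a lower bound: everything from that key onwards.
            return store.tailMap(lower.get(), true);
        }
        if (upper.isPresent()) {
            // Only an upper bound: everything up to that key.
            return store.headMap(upper.get(), true);
        }
        // No bounds at all: a scan over every key.
        return store;
    }

    public static void main(final String[] args) {
        final NavigableMap<Integer, String> store = new TreeMap<>();
        for (int i = 1; i <= 5; i++) {
            store.put(i, "v" + i);
        }
        System.out.println(select(store, Optional.of(2), Optional.of(4)).keySet());   // [2, 3, 4]
        System.out.println(select(store, Optional.of(4), Optional.empty()).keySet()); // [4, 5]
        System.out.println(select(store, Optional.empty(), Optional.of(2)).keySet()); // [1, 2]
        System.out.println(select(store, Optional.empty(), Optional.empty()).keySet()); // [1, 2, 3, 4, 5]
    }
}
```

The four branches correspond to `withRange`, `withLowerBound`, `withUpperBound`, and `withNoBounds`, which is why a single query class can plausibly cover both range and scan queries.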
Compatibility, Deprecation, and Migration Plan
- Since this is a completely new set of APIs, no backward compatibility concerns are anticipated.
- Since nothing is deprecated in this KIP, users have no need to migrate unless they want to.
Rejected Alternatives
Initially, we also proposed adding a RawRangeQuery typed with KeyValueIterator<Bytes, byte[]>. After looking at the code, it seems that this would provide little benefit (it saves one cast), which does not justify the cost of adding an extra query to the public interface.