THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
The RangeQuery
class will be used for both range and scan queries. A scan is performed when no lower and no upper bound is specified, i.e. if the Optional is empty.
Code Block | ||
---|---|---|
| ||
@Evolving public class RangeQuery<K, V> implements Query<V> { private final Optional<K> lower; private final Optional<K> upper; private RangeQuery(final Optional<K> lower, final Optional<K> upper) { this.lower = lower; this.upper = upper; } public static <K, V> RangeQuery<K, V> withRange(final Optional<K>K lower, final Optional<K>K upper) { return new RangeQuery<>(Optional.of(lower), Optional.of(upper)); } public static <K, V> RangeQuery<K, V> withUpperBound(final K upper) { return new RangeQuery<>(Optional.empty(), Optional.of(upper)); } public static <K, V> RangeQuery<K, V> withLowerBound(final K lower) { return new RangeQuery<>(Optional.of(lower), Optional.empty()); } public static <K, V> RangeQuery<K, V> withNoBounds() { return new RangeQuery<>(Optional.empty(), Optional.empty()); } public Optional<K> getLowerBound() { return lower; } public Optional<K> getUpperBound() { return upper; } } // ====================================== // Range query example usage in IQv2: Integer key1 = 1; Integer key2 = 2; // create the query parameters final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes = kafkaStreams.serdesForStore("mystore") StateQueryRequest<KeyValueIterator<Bytes, byte[]>> query = inStore("mystore") .withQuery(RangeQuery.withRange(Optional.of(Bytes.wrap(serdes.rawKey(key1))), Optional.of(Bytes.wrap(serdes.rawKey(key2))))); // run the query StateQueryResult<KeyValueIterator<Bytes, byte[]>> result = kafkaStreams.query(query); // Get the results from all partitions. final Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults = rangeResult.getPartitionResults(); for (final Entry<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> entry : partitionResults.entrySet()) { try (final KeyValueIterator<Bytes, byte[]> keyValueIterator = entry.getValue().getResult()) { while (keyValueIterator.hasNext()) { final KeyValue<Bytes, byte[]> next = keyValueIterator.next(); Integer key = serdes.keyFrom(next.key.get()); Integer value = serdes.valueFrom(next.value)); } } } // ====================================== // Scan query example usage in IQv2: // create the query parameters StateQueryRequest<KeyValueIterator<Bytes, byte[]>> query = inStore("mystore") .withQuery(RangeQuery.with NoBounds()); // run the query StateQueryResult<KeyValueIterator<Bytes, byte[]>> result = kafkaStreams.query(query); |
There will also be an implementation for a "raw" version of the RangeQuery, which simply takes the key as a byte array and returns the value as a byte array.
Code Block | ||
---|---|---|
| ||
public class RawRangeQuery implements Query<KeyValueIterator<Bytes, byte[]>> { private final Optional<Bytes> lower; private final Optional<Bytes> upper; private RawRangeQuery(final Optional<Bytes> lower, final Optional<Bytes> upper) { this.lower = lower; this.upper = upper; } public static RawRangeQuery withRange(final Bytes lower, final Bytes upper) { return new RawRangeQuery(Optional.of(lower), Optional.of(upper)); } public static RawRangeQuery withUpperBound(final Bytes upper) { return new RawRangeQuery(Optional.empty(), Optional.of(upper)),; } public static RawRangeQuery withLowerBound(final Bytes lower) { return new RawRangeQuery(Optional.of(lower), Optional.empty()); } public static RawRangeQuery withNoBounds() { return new RawRangeQuery(Optional.empty(), Optional.empty()));; } public Optional<Bytes> getLowerBound() { // run the query StateQueryResult<KeyValueIterator<Bytes, byte[]>> result = kafkaStreams.query(query); return lower; } public Optional<Bytes> getUpperBound() { return upper; } |
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
Rejected Alternatives
...
- Since this is a completely new set of APIs, no backward compatibility concerns are anticipated.
- Since nothing is deprecated in this KIP, users have no need to migrate unless they want to.