Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
titleRangeQuery.java
@Evolving
public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {      
    private final Optional<K> lower;
    private final Optional<K> upper;

    private RangeQuery(final Optional<K> lower, final Optional<K> upper) {
        this.lower = lower;
        this.upper = upper;
    }

    public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
        return new RangeQuery<>(Optional.of(lower), Optional.of(upper));
    }

    public static <K, V> RangeQuery<K, V> withUpperBound(final K upper) {
        return new RangeQuery<>(Optional.empty(), Optional.of(upper));
    }

    public static <K, V> RangeQuery<K, V> withLowerBound(final K lower) {
        return new RangeQuery<>(Optional.of(lower), Optional.empty());
    }

    public static <K, V> RangeQuery<K, V> withNoBounds() {
        return new RangeQuery<>(Optional.empty(), Optional.empty());
    }

    public Optional<K> getLowerBound() {
        return lower;
    }

    public Optional<K> getUpperBound() {
        return upper;
    }
}
 
// ======================================
// Range query example usage in IQv2:
 
Integer key1 = 1;
Integer key2 = 2;

// create the query parameters
final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
      kafkaStreams.serdesForStore("mystore")

StateQueryRequest<KeyValueIterator<Integer, Integer>> query =
  inStore("mystore")
  .withQuery(RangeQuery.withRange(key1, key2));
      
// run the query
StateQueryResult<KeyValueIterator<Integer, Integer>> result = kafkaStreams.query(query);
 
// Get the results from all partitions.
        final Map<Integer, QueryResult<KeyValueIterator<Integer, Integer>>> partitionResults =
            rangeResult.getPartitionResults();
        for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, Integer>>> entry : partitionResults.entrySet()) {
            try (final KeyValueIterator<Integer, Integer> keyValueIterator = entry.getValue().getResult()) {
                while (keyValueIterator.hasNext()) {
                    final KeyValue<Integer, Integer> next = keyValueIterator.next();
                    Integer key = next.key.get;
					Integer value = next.value;
                }
            }
        } 


// ======================================
// Scan query example usage in IQv2:

// create the query parameters
StateQueryRequest<KeyValueIterator<Integer, Integer>> query =
  inStore("mystore")
  .withQuery(RangeQuery.withNoBounds());
      
// run the query
StateQueryResult<KeyValueIterator<Integer, Integer>> result = kafkaStreams.query(query);
  

There will also be an implementation for a "raw" version of the RangeQuery, which simply takes the key as a byte array and returns the value as a byte array. The reason why we provide also a raw version is that it simplifies internal query handling since the bytes stores only support raw queries. No type check needed for every query key. Moreover,  we want to have the option to let ksql or any other "user" to handle de/serialization and avoid multiple serialization round-trips during query handling, which is unavoidable in IQv1

Code Block
titleRawRangeQuery
public class RawRangeQuery implements Query<KeyValueIterator<Bytes, byte[]>> {

    private final Optional<Bytes> lower;
    private final Optional<Bytes> upper;

    private RawRangeQuery(final Optional<Bytes> lower, final Optional<Bytes> upper) {
        this.lower = lower;
        this.upper = upper;
    }

    public static RawRangeQuery withRange(final Bytes lower, final Bytes upper) {
        return new RawRangeQuery(Optional.of(lower), Optional.of(upper));
    }

    public static RawRangeQuery withUpperBound(final Bytes upper) {
        return new RawRangeQuery(Optional.empty(), Optional.of(upper));
    }

    public static RawRangeQuery withLowerBound(final Bytes lower) {
        return new RawRangeQuery(Optional.of(lower), Optional.empty());
    }

    public static RawRangeQuery withNoBounds() {
        return new RawRangeQuery(Optional.empty(), Optional.empty());
    }

    public Optional<Bytes> getLowerBound() {
        return lower;
    }

    public Optional<Bytes> getUpperBound() {
        return upper;
    }


Compatibility, Deprecation, and Migration Plan

  • Since this is a completely new set of APIs, no backward compatibility concerns are anticipated. 
  • Since nothing is deprecated in this KIP, users have no need to migrate unless they want to.

Rejected Alternatives

Initially, we proposed to add also a RawRangeQuery  typed with <KeyValueIterator<Bytes, byte[]> . After looking at the code, it seems that it doesn't provide us with many benefits (we save on one cast) which doesn't justify the cost of adding an extra query to the public interface.