Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: KAFKA-13479

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-13479

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
titleRangeQuery.java
@Evolving
public class RangeQuery<KRangeQuery<KeyValueIterator<K, V>V>> implements Query<KeyValueIterator<K, Query<V>V>> {      
    private final Optional<K> lower;
    private final Optional<K> upper;

    private RangeQuery(final Optional<K> lower, final Optional<K> upper) {
        this.lower = lower;
        this.upper = upper;
    }

    public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
        return new RangeQuery<>(Optional.of(lower), Optional.of(upper));
    }

    public static <K, V> RangeQuery<K, V> withUpperBound(final K upper) {
        return new RangeQuery<>(Optional.empty(), Optional.of(upper));
    }

    public static <K, V> RangeQuery<K, V> withLowerBound(final K lower) {
        return new RangeQuery<>(Optional.of(lower), Optional.empty());
    }

    public static <K, V> RangeQuery<K, V> withNoBounds() {
        return new RangeQuery<>(Optional.empty(), Optional.empty());
    }

    public Optional<K> getLowerBound() {
        return lower;
    }

    public Optional<K> getUpperBound() {
        return upper;
    }
}
 
// ======================================
// Range query example usage in IQv2:
 
Integer key1 = 1;
Integer key2 = 2;

// create the query parameters
final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
    kafkaStreams.serdesForStore("mystore");

// The query runs on serialized data, so both bounds are serialized with the store's key serde.
StateQueryRequest<KeyValueIterator<Bytes, byte[]>> query =
    inStore("mystore")
        .withQuery(RangeQuery.withRange(Bytes.wrap(serdes.rawKey(key1)),
                                        Bytes.wrap(serdes.rawKey(key2))));

// run the query
StateQueryResult<KeyValueIterator<Bytes, byte[]>> result = kafkaStreams.query(query);

// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults =
    result.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> entry : partitionResults.entrySet()) {
    // try-with-resources closes the iterator once the partition's results are consumed
    try (final KeyValueIterator<Bytes, byte[]> keyValueIterator = entry.getValue().getResult()) {
        while (keyValueIterator.hasNext()) {
            final KeyValue<Bytes, byte[]> next = keyValueIterator.next();
            Integer key = serdes.keyFrom(next.key.get());
            // serdes deserializes values to ValueAndTimestamp<Integer>; unwrap the plain value
            Integer value = serdes.valueFrom(next.value).value();
        }
    }
}


// ======================================
// Scan query example usage in IQv2:

// A range query with neither bound set serves as the scan query.
final RangeQuery<Bytes, byte[]> unboundedRange = RangeQuery.withNoBounds();

// Wrap it in a request that targets the store by name.
StateQueryRequest<KeyValueIterator<Bytes, byte[]>> query = inStore("mystore").withQuery(unboundedRange);

// Execute the request against the running Kafka Streams instance.
StateQueryResult<KeyValueIterator<Bytes, byte[]>> result =
    kafkaStreams.query(query);
  

...