Versions Compared


  • This line was added.
  • This line was removed.
  • Formatting was changed.


The RangeQuery class will be used for both range and scan queries. A scan is performed when no lower and no upper bound is specified, i.e. if the Optional is empty.

Code Block
public class RangeQuery<K, V> implements Query<V> {
    private final Optional<K> lower;
    private final Optional<K> upper;

    private RangeQuery(final Optional<K> lower, final Optional<K> upper) {
        this.lower = lower;
        this.upper = upper;

    public static <K, V> RangeQuery<K, V> withRange(final Optional<K>K lower, final Optional<K>K upper) {
        return new RangeQuery<>(Optional.of(lower), Optional.of(upper));

    public static <K, V> RangeQuery<K, V> withUpperBound(final K upper) {
        return new RangeQuery<>(Optional.empty(), Optional.of(upper));

    public static <K, V> RangeQuery<K, V> withLowerBound(final K lower) {
        return new RangeQuery<>(Optional.of(lower), Optional.empty());

    public static <K, V> RangeQuery<K, V> withNoBounds() {
        return new RangeQuery<>(Optional.empty(), Optional.empty());

    public Optional<K> getLowerBound() {
        return lower;

    public Optional<K> getUpperBound() {
        return upper;
// ======================================
// Range query example usage in IQv2:
Integer key1 = 1;
Integer key2 = 2;

// create the query parameters
final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =

StateQueryRequest<KeyValueIterator<Bytes, byte[]>> query =
// run the query
StateQueryResult<KeyValueIterator<Bytes, byte[]>> result = kafkaStreams.query(query);
// Get the results from all partitions.
        final Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults =
        for (final Entry<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> entry : partitionResults.entrySet()) {
            try (final KeyValueIterator<Bytes, byte[]> keyValueIterator = entry.getValue().getResult()) {
                while (keyValueIterator.hasNext()) {
                    final KeyValue<Bytes, byte[]> next =;
                    Integer key = serdes.keyFrom(next.key.get());
					Integer value = serdes.valueFrom(next.value));

// ======================================
// Scan query example usage in IQv2:

// create the query parameters
StateQueryRequest<KeyValueIterator<Bytes, byte[]>> query =
// run the query
StateQueryResult<KeyValueIterator<Bytes, byte[]>> result = kafkaStreams.query(query);

There will also be an implementation for a "raw" version of the RangeQuery, which simply takes the key as a byte array and returns the value as a byte array.

Code Block
public class RawRangeQuery implements Query<KeyValueIterator<Bytes, byte[]>> {

    private final Optional<Bytes> lower;
    private final Optional<Bytes> upper;

    private RawRangeQuery(final Optional<Bytes> lower, final Optional<Bytes> upper) {
        this.lower = lower;
        this.upper = upper;

    public static RawRangeQuery withRange(final Bytes lower, final Bytes upper) {
        return new RawRangeQuery(Optional.of(lower), Optional.of(upper));

    public static RawRangeQuery withUpperBound(final Bytes upper) {
        return new RawRangeQuery(Optional.empty(), Optional.of(upper)),;

    public static RawRangeQuery withLowerBound(final Bytes lower) {
        return new RawRangeQuery(Optional.of(lower), Optional.empty());

    public static RawRangeQuery withNoBounds() {
        return new RawRangeQuery(Optional.empty(), Optional.empty()));;

    public Optional<Bytes> getLowerBound() {
// run the query
StateQueryResult<KeyValueIterator<Bytes, byte[]>> result = kafkaStreams.query(query);

  return lower;

    public Optional<Bytes> getUpperBound() {
        return upper;

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives


  • Since this is a completely new set of APIs, no backward compatibility concerns are anticipated. 
  • Since nothing is deprecated in this KIP, users have no need to migrate unless they want to.