Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Provide an implementation of the Query interface, introduced in KIP-796: Interactive Query v2, to support range and scan queries.

Proposed Changes

The RangeQuery class will be used for both range and scan queries. A scan is performed when neither a lower nor an upper bound is specified, i.e. when both Optionals are empty.

@Evolving
public class RangeQuery<K, V> implements Query<V> {

    private final Optional<K> lower;
    private final Optional<K> upper;

    private RangeQuery(final Optional<K> lower, final Optional<K> upper) {
        this.lower = lower;
        this.upper = upper;
    }

    public static <K, V> RangeQuery<K, V> withRange(final Optional<K> lower, final Optional<K> upper) {
        return new RangeQuery<>(lower, upper);
    }

    public Optional<K> getLowerBound() {
        return lower;
    }

    public Optional<K> getUpperBound() {
        return upper;
    }
}
 
// ======================================
// Range query example usage in IQv2:
 
Integer key1 = 1;
Integer key2 = 2;

// create the query parameters
final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
      kafkaStreams.serdesForStore("mystore");

StateQueryRequest<KeyValueIterator<Bytes, byte[]>> query =
  inStore("mystore")
  .withQuery(RangeQuery.withRange(Optional.of(Bytes.wrap(serdes.rawKey(key1))),
                                  Optional.of(Bytes.wrap(serdes.rawKey(key2)))));
      
// run the query
StateQueryResult<KeyValueIterator<Bytes, byte[]>> result = kafkaStreams.query(query);
 
// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults =
    result.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> entry : partitionResults.entrySet()) {
    // the iterator must be closed, hence try-with-resources
    try (final KeyValueIterator<Bytes, byte[]> keyValueIterator = entry.getValue().getResult()) {
        while (keyValueIterator.hasNext()) {
            final KeyValue<Bytes, byte[]> next = keyValueIterator.next();
            // deserialize the raw key and value with the store's serdes
            final Integer key = serdes.keyFrom(next.key.get());
            // the store holds ValueAndTimestamp<Integer>, so unwrap the plain value
            final Integer value = serdes.valueFrom(next.value).value();
        }
    }
}


// ======================================
// Scan query example usage in IQv2:

// create the query parameters
StateQueryRequest<KeyValueIterator<Bytes, byte[]>> query =
  inStore("mystore")
  .withQuery(RangeQuery.withRange(Optional.empty(), Optional.empty()));
      
// run the query
StateQueryResult<KeyValueIterator<Bytes, byte[]>> result = kafkaStreams.query(query);
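
To illustrate how the two optional bounds could be interpreted by a store, here is a minimal, self-contained sketch. It is not part of the proposed API: it uses a plain java.util.TreeMap as a stand-in for a key-value state store, the class RangeQuerySketch and the helper resolve are hypothetical names, and it assumes that both bounds are inclusive and that a single missing bound leaves that side of the range open.

```java
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

public class RangeQuerySketch {

    // Hypothetical helper: maps the optional bounds onto a view of a sorted map.
    // Assumption: bounds are inclusive; an empty Optional means "unbounded" on that side.
    public static <K, V> NavigableMap<K, V> resolve(final NavigableMap<K, V> store,
                                                    final Optional<K> lower,
                                                    final Optional<K> upper) {
        if (lower.isPresent() && upper.isPresent()) {
            // bounded range; subMap throws IllegalArgumentException if lower > upper
            return store.subMap(lower.get(), true, upper.get(), true);
        } else if (lower.isPresent()) {
            // only a lower bound: everything from lower onwards
            return store.tailMap(lower.get(), true);
        } else if (upper.isPresent()) {
            // only an upper bound: everything up to and including upper
            return store.headMap(upper.get(), true);
        }
        // no bounds at all: full scan
        return store;
    }

    public static void main(final String[] args) {
        final NavigableMap<Integer, String> store = new TreeMap<>();
        for (int i = 1; i <= 5; i++) {
            store.put(i, "v" + i);
        }
        // range query over keys 2..4
        System.out.println(resolve(store, Optional.of(2), Optional.of(4)).keySet()); // prints [2, 3, 4]
        // scan: both bounds empty
        System.out.println(resolve(store, Optional.empty(), Optional.empty()).keySet()); // prints [1, 2, 3, 4, 5]
    }
}
```

Whether half-open ranges (only one bound present) are supported, and whether bounds are inclusive, is for the proposal to define; the sketch only shows one plausible reading of the Optional-based signature.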
  


Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
