
...

Code Block
public interface StateStore {
  ...

  /**
    * Execute a query. Returns a QueryResult containing either result data or
    * a failure.
    * <p>
    * If the store doesn't know how to handle the given query, the result
    * will be a {@link FailureReason#UNKNOWN_QUERY_TYPE}.
    * If the store couldn't satisfy the given position bound, the result
    * will be a {@link FailureReason#NOT_UP_TO_BOUND}.
    * @param query The query to execute
    * @param positionBound The position the store must be at or past
    * @param <R> The result type
    */
  @Evolving
  default <R> QueryResult<R> query(Query<R> query, Position positionBound) {
    // If a store doesn't implement a query handler, then all queries are unknown.
    return QueryResult.forUnknownQueryType(query, this);
  }
 
  ...
}
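To make the failure semantics in the JavaDoc concrete, here is a minimal sketch of a store that overrides `query`. It answers `KeyQuery`, reports `NOT_UP_TO_BOUND` when it has not caught up, and falls back to the default method (and therefore `UNKNOWN_QUERY_TYPE`) for anything else. All types are simplified, hypothetical stand-ins so the sketch compiles on its own. In particular, `Position` is reduced to a single `long` offset, and `forResult`, `forFailure`, and `InMemoryKeyValueStore` are names invented for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins for the KIP's types.
interface Query<R> { }

enum FailureReason { UNKNOWN_QUERY_TYPE, NOT_UP_TO_BOUND }

final class QueryResult<R> {
    private final R result;
    private final FailureReason failure;

    private QueryResult(R result, FailureReason failure) {
        this.result = result;
        this.failure = failure;
    }

    static <R> QueryResult<R> forResult(R result) { return new QueryResult<>(result, null); }
    static <R> QueryResult<R> forFailure(FailureReason reason) { return new QueryResult<>(null, reason); }

    boolean isSuccess() { return failure == null; }
    R getResult() { return result; }
    FailureReason getFailureReason() { return failure; }
}

final class KeyQuery<K, V> implements Query<V> {
    private final K key;
    private KeyQuery(K key) { this.key = key; }
    static <K, V> KeyQuery<K, V> withKey(K key) { return new KeyQuery<>(key); }
    K getKey() { return key; }
}

interface StateStore {
    // Mirrors the default method above: a store that doesn't override query() knows no query types.
    // Assumption: the position bound is simplified to a single offset.
    default <R> QueryResult<R> query(Query<R> query, long positionBound) {
        return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE);
    }
}

final class InMemoryKeyValueStore implements StateStore {
    private final Map<String, Integer> data = new HashMap<>();
    private long position = 0L; // hypothetical: count of input records applied so far

    void put(String key, Integer value) {
        data.put(key, value);
        position++;
    }

    @Override
    @SuppressWarnings("unchecked")
    public <R> QueryResult<R> query(Query<R> query, long positionBound) {
        // Refuse to answer if the store hasn't caught up to the requested bound.
        if (position < positionBound) {
            return QueryResult.forFailure(FailureReason.NOT_UP_TO_BOUND);
        }
        if (query instanceof KeyQuery) {
            // Unchecked, but safe by convention: KeyQuery<K, V> is a Query<V>,
            // and this store's values are Integers.
            KeyQuery<String, Integer> keyQuery = (KeyQuery<String, Integer>) query;
            return (QueryResult<R>) QueryResult.forResult(data.get(keyQuery.getKey()));
        }
        // Any other query type falls back to the interface's default behavior.
        return StateStore.super.query(query, positionBound);
    }
}
```

The casts inside the store are where the design pays for its flexibility: the caller gets a fully typed `QueryResult<R>`, while the store is responsible for returning a value that actually matches the query's declared result type.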

...

Query

This is the interface that all queries must implement. Part of what I'm going for here is to place as few restrictions as possible on what a "query" or its "result" is, so the only thing in this interface is a generic type indicating the result type, which lets Streams return a fully typed response without predetermining the response type itself.

Code Block
public interface Query<R> { }

Using this interface, store implementers can create any query they can think of and return any type they like, from a single value to an iterator, even a future. While writing the POC, I implemented three queries for KeyValue stores, which I'll include here as examples:
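As an illustration of the claim that a query can return "any type", here is a minimal sketch of three queries with very different result types: a single value, a list, and a future. It assumes a toy store whose `query` method returns the bare result `R` rather than a `QueryResult<R>`. `CountQuery`, `TopKeysQuery`, `AsyncCountQuery`, and `ToyStore` are hypothetical names invented for this example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Query<R> carries nothing but its result type, as in the interface above.
interface Query<R> { }

// Hypothetical query returning a single value.
final class CountQuery implements Query<Long> { }

// Hypothetical query returning a list of keys.
final class TopKeysQuery implements Query<List<String>> {
    final int limit;
    TopKeysQuery(int limit) { this.limit = limit; }
}

// Hypothetical query whose result is a future, e.g. for a store that answers asynchronously.
final class AsyncCountQuery implements Query<CompletableFuture<Long>> { }

final class ToyStore {
    private final List<String> keys = List.of("a", "b", "c");

    // The generic signature ties the return type to the query's R,
    // so callers receive a typed result without casting.
    @SuppressWarnings("unchecked")
    <R> R query(Query<R> query) {
        if (query instanceof CountQuery) {
            return (R) Long.valueOf(keys.size());
        } else if (query instanceof TopKeysQuery) {
            int limit = ((TopKeysQuery) query).limit;
            return (R) keys.subList(0, Math.min(limit, keys.size()));
        } else if (query instanceof AsyncCountQuery) {
            return (R) CompletableFuture.completedFuture((long) keys.size());
        }
        throw new IllegalArgumentException("Unknown query type: " + query.getClass());
    }
}
```

At the call site, `Long count = store.query(new CountQuery());` and `List<String> top = store.query(new TopKeysQuery(2));` both compile without casts, because the compiler infers `R` from the query object.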

Example Query: KeyQuery

This query implements the functionality of the current KeyValueStore#get(key) method:

Code Block
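public class KeyQuery<K, V> implements Query<V> {
  private final K key;

  private KeyQuery(final K key) {
    this.key = key;
  }

  // static factory to create a new KeyQuery, given a key
  public static <K, V> KeyQuery<K, V> withKey(final K key) {
    return new KeyQuery<>(key);
  }

  // getter for the key
  public K getKey() {
    return key;
  }
}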

// example usage in IQ:

Integer key = 1;

// note that "mystore" is a KeyValueStore<Integer, ValueAndTimestamp<Integer>>,
// hence the result type
InteractiveQueryRequest<ValueAndTimestamp<Integer>> query =
  inStore("mystore")
    .withQuery(KeyQuery.withKey(key));

// run the query
InteractiveQueryResult<ValueAndTimestamp<Integer>> result = kafkaStreams.query(query);

// In this example, it doesn't matter which partition ran the query, so we just
// grab the result from the first partition that returned something
// (this is what IQ currently does under the covers).

Integer value = result.getFirstPartitionResult().getResult().value();
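The usage above relies on `getFirstPartitionResult()` picking "the first partition that returned something". A minimal sketch of how that selection could work follows. It assumes a simplified `QueryResult` (a value or a failure message only) and an `InteractiveQueryResult` backed by a partition-ordered map. `addResult` is a hypothetical helper, and returning an `Optional` rather than a bare result is a choice made for this sketch only.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical, simplified per-partition result: either a value or a failure message.
final class QueryResult<R> {
    private final R result;
    private final String failure;

    private QueryResult(R result, String failure) {
        this.result = result;
        this.failure = failure;
    }

    static <R> QueryResult<R> success(R result) { return new QueryResult<>(result, null); }
    static <R> QueryResult<R> failure(String message) { return new QueryResult<>(null, message); }

    boolean isSuccess() { return failure == null; }
    R getResult() { return result; }
}

// Hypothetical stand-in for InteractiveQueryResult: one entry per locally queried partition.
final class InteractiveQueryResult<R> {
    // TreeMap keeps partitions in ascending order, so "first" is well defined.
    private final Map<Integer, QueryResult<R>> partitionResults = new TreeMap<>();

    void addResult(int partition, QueryResult<R> result) {
        partitionResults.put(partition, result);
    }

    Map<Integer, QueryResult<R>> getPartitionResults() { return partitionResults; }

    // Skip failed partitions and partitions that found nothing (null result),
    // then return the first remaining one, if any.
    Optional<QueryResult<R>> getFirstPartitionResult() {
        return partitionResults.values().stream()
            .filter(r -> r.isSuccess() && r.getResult() != null)
            .findFirst();
    }
}
```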

We propose to add KeyQuery as part of the implementation of this KIP, in order to have at least one query available to flesh out tests, etc., for the framework. Other queries are deferred to later KIPs.

Example Query: RawScanQuery

This example demonstrates two variations on the first example:

  1. The ability to define queries handling "raw" binary data for keys and values
  2. The ability for a query to return multiple results (in this case, an iterator)

I'm only bundling those for brevity. We can also have typed, iterable queries and raw single-record queries.

Note this query is purely an example of what is possible. It will not be added as part of this KIP.

Code Block
public class RawScanQuery implements Query<KeyValueIterator<Bytes, byte[]>> {
    private RawScanQuery() { }

    public static RawScanQuery scan() {
        return new RawScanQuery();
    }
}

// example usage

// since we're handling raw data, we're going to need the serdes
InteractiveQuerySerdes<Integer, ValueAndTimestamp<Integer>> serdes =
  kafkaStreams.serdesForStore("mystore");

// run the "scan" query
InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
  kafkaStreams.query(inStore("mystore").withQuery(RawScanQuery.scan()));

// This time, we'll get results from all locally available partitions.
Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults =
  scanResult.getPartitionResults();


// for this example, we'll just collate all the partitions' iterators
// together and print their data

List<KeyValueIterator<Bytes, byte[]>> iterators =
  partitionResults
    .values()
    .stream()
    .map(QueryResult::getResult)
    .collect(Collectors.toList());

// Using an example convenience method we could add to collate iterators' data
try (CloseableIterator<KeyValue<Bytes, byte[]>> collated = Iterators.collate(iterators)) {
  while (collated.hasNext()) {
    KeyValue<Bytes, byte[]> next = collated.next();
    System.out.println(
      "|||" +
        " " + serdes.keyFrom(next.key.get()) +
        " " + serdes.valueFrom(next.value)
    );
  }
}
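`Iterators.collate` is described above only as a convenience method we could add. Here is a minimal sketch of one possible version. It assumes "collate" means chaining the partitions' iterators one after another, with no ordering across partitions, and closing all of them together. `CloseableIterator`, `Iterators`, and `InMemoryIterator` are hypothetical types for this example. In the real API, Kafka Streams' `KeyValueIterator` (an `Iterator` that is also `Closeable`) would play the role of the per-partition iterator.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical iterator type that holds resources and must be closed.
interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
    @Override
    void close(); // narrowed: no checked exception
}

final class Iterators {
    private Iterators() { }

    // Hypothetical helper: drains each iterator in list order and closes
    // every underlying iterator when the combined iterator is closed.
    static <T> CloseableIterator<T> collate(List<? extends CloseableIterator<T>> iterators) {
        List<CloseableIterator<T>> sources = new ArrayList<>(iterators);
        return new CloseableIterator<T>() {
            private int current = 0;

            @Override
            public boolean hasNext() {
                // Advance past exhausted iterators until one has data or none remain.
                while (current < sources.size() && !sources.get(current).hasNext()) {
                    current++;
                }
                return current < sources.size();
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return sources.get(current).next();
            }

            @Override
            public void close() {
                // Close all sources, including ones never read, so no store resources leak.
                for (CloseableIterator<T> source : sources) {
                    source.close();
                }
            }
        };
    }
}

// Simple in-memory CloseableIterator that records whether it was closed.
final class InMemoryIterator<T> implements CloseableIterator<T> {
    private final Iterator<T> delegate;
    private boolean closed = false;

    InMemoryIterator(List<T> items) { this.delegate = items.iterator(); }

    @Override public boolean hasNext() { return delegate.hasNext(); }
    @Override public T next() { return delegate.next(); }
    @Override public void close() { closed = true; }

    boolean isClosed() { return closed; }
}
```

Chaining is the simplest choice when, as in the example above, output order does not matter. A key-ordered merge would instead keep a priority queue over the heads of the partitions' iterators.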
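StoreQuery

This is a simple container for executing a query on a specific partition of a state store. As we are now getting into the minor details, I'll omit JavaDocs that seem obvious.

Code Block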
public class StoreQuery<R> {
  public static <R> StoreQuery<R> unbounded(Query<R> query);
  public static <R> StoreQuery<R> latestBounded(Query<R> query); 
  public static <R> StoreQuery<R> bounded(Query<R> query, Offsets bound);  

  // allow collection of detailed execution information
  public StoreQuery<R> enableExecutionInfo();

  public Query<R> getQuery();       // not nullable
  public boolean  isLatestBounded();
  public boolean  isUnbounded(); 
  public Offsets  getBound();        // nullable 
  public boolean  isExecutionInfoEnabled();
}

QueryResult

This is a container for a single partition's query result.

...