Comment: change InteractiveQueryX to StateQueryX

...

Code Block: KafkaStreams (Java)
public class KafkaStreams implements AutoCloseable {
  ...

  /**
    * Run an interactive query against a state store.
    * <p>
    * This method allows callers outside of the Streams runtime to
    * access the internal state of stateful processors. See
    * https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
    * for more information.
    * @throws StreamsNotStartedException If Streams has not yet been started. Just call {@link KafkaStreams#start()}
    *                                    and then retry this call.
    * @throws UnknownStateStoreException If the specified store name does not exist in the topology.
    * @throws InvalidStateStorePartitionException If the specified partition does not exist.
    */
  @Evolving
  public <R> StateQueryResult<R> query(StateQueryRequest<R> request);

  ...
}

...


StateQueryRequest

This is the main request object for IQv2. It contains all of the information required to execute the query. Although this class is similar to the IQv1 request object StoreQueryParameters, we are proposing a new class to avoid unnecessary coupling between the design goals of the old and new APIs.

...

Code Block
/**
  * The request object for Interactive Queries.
  * This is an immutable builder class for passing all required and
  * optional arguments for querying a state store in Kafka Streams.
  * <p>
  * @param <R> The type of the query result.
  */
@Evolving
public class StateQueryRequest<R> {

  /**
    * First required argument to specify the name of the store to query
    */
  public static InStore inStore(final String name);

  public static class InStore {

    /**
      * Second required argument to provide the query to execute.
      */
    public <R> StateQueryRequest<R> withQuery(final Query<R> query);
  } 

  /**
    * Optionally bound the current position of the state store
    * with respect to the input topics that feed it. In conjunction
    * with {@link StateQueryResult#getPosition}, this can be
    * used to achieve a good balance between consistency and
    * availability in which repeated queries are guaranteed to
    * advance in time while allowing reads to be served from any
    * replica that is caught up to that caller's prior observations.
    * <p>
    * Note that the set of offsets provided in the bound does not determine
    * the partitions to query. For that, see {@link #withPartitions}.
    * Unrelated offsets will be ignored, and missing offsets will be treated
    * as indicating "no bound". 
    */
  public StateQueryRequest<R> withPositionBound(PositionBound positionBound);

  /**
    * Optionally specify the partitions to include in the query.
    * If omitted, the default is to query all locally available partitions.
    */
  public StateQueryRequest<R> withPartitions(Set<Integer> partitions);

  /**
    * Query all locally available partitions.
    */
  public StateQueryRequest<R> withAllPartitions();


  /**
    * Instruct Streams to collect detailed information during query
    * execution, for example, which stores handled the query, how
    * long they took, etc.
    */
  public StateQueryRequest<R> enableExecutionInfo();

  // Getters are also proposed to retrieve the request parameters

  public String getStoreName();
  public Query<R> getQuery();
  public PositionBound getPositionBound();
  public boolean executionInfoEnabled();

  /**
    * An empty set indicates that no partitions will be fetched.
    * A non-empty set indicates the specific partitions that will be fetched (if locally available).
    * @throws UnsupportedOperationException if the request is for all partitions (isAllPartitions() == true)
    */
  public Set<Integer> getPartitions();

  /**
    * Indicates that all locally available partitions will be fetched.
    */
  public boolean isAllPartitions();

}
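
The partition getters above have slightly unusual semantics: an empty set means "fetch nothing", while "all partitions" is a separate state in which getPartitions() throws. The following is a minimal sketch of only that part of the contract, assuming an immutable builder as the Javadoc describes. PartitionSelectionSketch and its create() factory are hypothetical stand-ins for illustration and are not part of the proposed API.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for the partition-selection part of StateQueryRequest. */
final class PartitionSelectionSketch {
    // Assumption of this sketch: null encodes "all locally available partitions".
    private final Set<Integer> partitions;

    private PartitionSelectionSketch(Set<Integer> partitions) {
        this.partitions = partitions;
    }

    /** Starts with the documented default: query all locally available partitions. */
    static PartitionSelectionSketch create() {
        return new PartitionSelectionSketch(null);
    }

    /** Returns a new instance and leaves the receiver unchanged (immutable builder). */
    PartitionSelectionSketch withPartitions(Set<Integer> partitions) {
        return new PartitionSelectionSketch(
            Collections.unmodifiableSet(new HashSet<>(partitions)));
    }

    PartitionSelectionSketch withAllPartitions() {
        return new PartitionSelectionSketch(null);
    }

    boolean isAllPartitions() {
        return partitions == null;
    }

    /** An empty set means "fetch nothing"; throws if all partitions are requested. */
    Set<Integer> getPartitions() {
        if (isAllPartitions()) {
            throw new UnsupportedOperationException(
                "Request is for all partitions; check isAllPartitions() first");
        }
        return partitions;
    }

    public static void main(String[] args) {
        PartitionSelectionSketch all = PartitionSelectionSketch.create();
        PartitionSelectionSketch some = all.withPartitions(Set.of(0, 2));
        System.out.println(all.isAllPartitions());  // true
        System.out.println(some.isAllPartitions()); // false
    }
}
```

Under this reading, callers would check isAllPartitions() before calling getPartitions(), which keeps "no partitions" and "all partitions" from being confused with each other.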

...

StateQueryResult

This is the main response object for IQv2. It wraps the individual results, as well as providing a vehicle to deliver metadata relating to the result as a whole.

Code Block
/**
  * The response object for interactive queries.
  * It wraps the individual results, as well as providing a
  * vehicle to deliver metadata relating to the result as a whole.
  * <p>
  * @param <R> The type of the query result. 
  */
@Evolving 
public class StateQueryResult<R> {
  /**
    * Constructor. Used by Kafka Streams, and may be useful for
    * tests as well.
    */
  public StateQueryResult(Map<Integer /*partition*/, QueryResult<R>> partitionResults);

  /**
    * The query's result for each partition that executed the query.
    */
  public Map<Integer /*partition*/, QueryResult<R>> getPartitionResults();

  /**
    * Asserts that only one partition returns a result and extracts that result.
    * Useful with queries that expect a single result.
    */
  public QueryResult<R> getOnlyPartitionResult();


  /**
    * The position of the state store at the moment it executed the
    * query. In conjunction
    * with {@link StateQueryRequest#withPositionBound}, this can be
    * used to achieve a good balance between consistency and
    * availability in which repeated queries are guaranteed to
    * advance in time while allowing reads to be served from any
    * replica that is caught up to that caller's prior observations.
    */ 
  public Position getPosition();
}
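
The Javadoc on withPositionBound and getPosition describes a pattern in which a caller passes the position observed in one query as the bound for the next one, so that any replica caught up to that point may serve the read. The sketch below shows one possible reading of the rule "unrelated offsets will be ignored, and missing offsets will be treated as no bound". It is an illustration under stated assumptions, not the proposed implementation: positions are modeled as plain maps keyed by a "topic-partition" string, and PositionBoundSketch, satisfies, and the storeInputs parameter are all hypothetical names.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of checking a store's position against a caller's bound. */
final class PositionBoundSketch {

    /**
     * @param storeInputs   input topic-partitions that feed the store partition (assumed known)
     * @param storePosition offsets the store has processed so far, per input
     * @param bound         offsets the caller observed in a prior query result
     * @return true if the store is caught up to every bounded input that is related to it
     */
    static boolean satisfies(Set<String> storeInputs,
                             Map<String, Long> storePosition,
                             Map<String, Long> bound) {
        for (String input : storeInputs) {
            Long required = bound.get(input);
            if (required == null) {
                continue; // missing offset: treated as "no bound" for this input
            }
            // Assumption: an input the store has not read yet counts as offset -1.
            long current = storePosition.getOrDefault(input, -1L);
            if (current < required) {
                return false; // this replica lags behind the caller's prior observation
            }
        }
        // Bound entries for inputs outside storeInputs were never consulted,
        // which is how "unrelated offsets will be ignored" is modeled here.
        return true;
    }

    public static void main(String[] args) {
        Set<String> inputs = Set.of("orders-0");
        Map<String, Long> observed = Map.of("orders-0", 7L, "payments-3", 99L);
        System.out.println(satisfies(inputs, Map.of("orders-0", 10L), observed)); // true
        System.out.println(satisfies(inputs, Map.of("orders-0", 5L), observed));  // false
    }
}
```

In this model a lagging replica would decline the query rather than return older data, which is what lets repeated queries advance in time while still allowing reads from any sufficiently caught-up replica.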

...

Code Block
@Evolving
public class KeyQuery<K, V> implements Query<V> {
  // static factory to create a new KeyQuery, given a key
  public static <K, V> KeyQuery<K, V> withKey(final K key);

  // getter for the key
  public K getKey();
}

// ======================================
// example usage in IQv2:

Integer key = 1;

// note that "mystore" is a KeyValueStore<Integer, ValueAndTimestamp<Integer>>,
// hence the result type        
StateQueryRequest<ValueAndTimestamp<Integer>> query =
  inStore("mystore")
    .withQuery(KeyQuery.withKey(key));
     
// run the query
StateQueryResult<ValueAndTimestamp<Integer>> result = kafkaStreams.query(query);

// In this example, it doesn't matter which partition ran the query, so we just
// grab the result from the partition that returned something
// (this is what IQ currently does under the covers).

Integer value = result.getOnlyPartitionResult().getResult().value();

// ======================================
// For comparison, here is how the query would look in current IQ:
// (note, this glosses over many of the problems described above.
//  the example is here to illustrate the different ergonomics
//  of the two APIs.)

// create the query parameters
StoreQueryParameters<ReadOnlyKeyValueStore<Integer, ValueAndTimestamp<Integer>>> storeQueryParameters =
  StoreQueryParameters.fromNameAndType(
    "mystore", 
    QueryableStoreTypes.timestampedKeyValueStore()
  );

// get a store handle to run the query
ReadOnlyKeyValueStore<Integer, ValueAndTimestamp<Integer>> store =
  kafkaStreams.store(storeQueryParameters);

// query the store
Integer value1 = store.get(key).value(); 
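
The IQv2 example above relies on getOnlyPartitionResult() to pick the result from whichever partition returned something. As a rough sketch of what such a helper could do, the code below works on a plain map of per-partition results. It assumes that a null value means "this partition had nothing for the key" and that more than one non-null result is an error; both choices, along with the names OnlyPartitionResultSketch and onlyResult, are assumptions made for illustration rather than behavior stated in the proposal.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

/** Hypothetical sketch of extracting the single result from per-partition results. */
final class OnlyPartitionResultSketch {

    /**
     * Returns the one non-null result, or empty if no partition returned anything.
     * Throws if more than one partition returned a result.
     */
    static <R> Optional<R> onlyResult(Map<Integer, R> partitionResults) {
        List<R> nonNull = partitionResults.values().stream()
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
        if (nonNull.size() > 1) {
            throw new IllegalStateException(
                "Expected at most one partition result but got " + nonNull.size());
        }
        return nonNull.stream().findFirst();
    }

    public static void main(String[] args) {
        // A HashMap is used because it permits null values.
        Map<Integer, String> results = new HashMap<>();
        results.put(0, null);    // partition 0 does not hold the key
        results.put(1, "value"); // partition 1 does
        System.out.println(onlyResult(results).orElse("<none>")); // value
    }
}
```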

...

Code Block
public class RawScanQuery implements Query<KeyValueIterator<Bytes, byte[]>> {
    private RawScanQuery() { }

    public static RawScanQuery scan() {
        return new RawScanQuery();
    }
}

// example usage

// since we're handling raw data, we're going to need the serdes
StateQuerySerdes<Integer, ValueAndTimestamp<Integer>> serdes =
  kafkaStreams.serdesForStore("mystore");

// run the "scan" query
StateQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
  kafkaStreams.query(inStore("mystore").withQuery(RawScanQuery.scan()));

// This time, we'll get results from all locally available partitions.
Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults =
  scanResult.getPartitionResults();


// for this example, we'll just collate all the partitions' iterators
// together and print their data

List<KeyValueIterator<Bytes, byte[]>> iterators =
  partitionResults
    .values()
    .stream()
    .map(QueryResult::getResult)
    .collect(Collectors.toList());

// Using an example convenience method we could add to collate iterators' data
try (CloseableIterator<KeyValue<Bytes, byte[]>> collated = Iterators.collate(iterators)) {
  while (collated.hasNext()) {
    KeyValue<Bytes, byte[]> next = collated.next();
    System.out.println(
      "|||" +
        " " + serdes.keyFrom(next.key.get()) +
        " " + serdes.valueFrom(next.value)
    );
  }
}
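
The example above uses Iterators.collate only as a convenience method that could be added. One way such a helper might behave is sketched below: it concatenates the source iterators in the order given and closes each one, either when it is exhausted or when the collated iterator itself is closed. CollatedIterator is a hypothetical name, plain java.util.Iterator stands in for KeyValueIterator, and simple concatenation is just one interpretation of "collate" (a key-ordered merge would be another).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical sketch: presents several iterators as one and closes them all. */
final class CollatedIterator<T> implements Iterator<T>, AutoCloseable {
    private final Deque<Iterator<T>> remaining;

    CollatedIterator(List<? extends Iterator<T>> sources) {
        this.remaining = new ArrayDeque<>(sources);
    }

    @Override
    public boolean hasNext() {
        // Drop (and close) exhausted sources until one with data is at the front.
        while (!remaining.isEmpty() && !remaining.peekFirst().hasNext()) {
            closeQuietly(remaining.pollFirst());
        }
        return !remaining.isEmpty();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return remaining.peekFirst().next();
    }

    /** Closes every source that has not been fully consumed yet. */
    @Override
    public void close() {
        while (!remaining.isEmpty()) {
            closeQuietly(remaining.pollFirst());
        }
    }

    private static void closeQuietly(Iterator<?> source) {
        if (source instanceof AutoCloseable) {
            try {
                ((AutoCloseable) source).close();
            } catch (Exception e) {
                // Ignored in this sketch; real code would likely log or rethrow.
            }
        }
    }

    public static void main(String[] args) {
        List<Iterator<String>> sources = List.of(
            List.of("a", "b").iterator(),
            List.of("c").iterator());
        try (CollatedIterator<String> collated = new CollatedIterator<>(sources)) {
            while (collated.hasNext()) {
                System.out.println(collated.next());
            }
        }
    }
}
```

Closing matters here because store iterators typically hold resources, so a helper of this kind would be used in a try-with-resources block as in the example above.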

...

Code Block
 
  /**
    * Get a reference to the serdes used internally in a state store
    * for use with interactive queries. While many queries already
    * accept Java-typed keys and also return typed results, some
    * queries may need to handle the raw binary data stored in StateStores,
    * in which case, this method can provide the serdes needed to interpret
    * that data.
    */
  @Evolving
  public <K, V> StateQuerySerdes<K, V> serdesForStore(String storeName);

...