...

Code Block
/**
  * The request object for Interactive Queries.
  * This is an immutable builder class for passing all required and
  * optional arguments for querying a state store in Kafka Streams.
  * <p>
  * @param <R> The type of the query result.
  */
@Evolving
public class InteractiveQueryRequest<R> {

  /**
    * First required argument to specify the name of the store to query
    */
  public static InStore inStore(final String name);

  public static class InStore {

    /**
      * Second required argument to provide the query to execute.
      */
    public <R> InteractiveQueryRequest<R> withQuery(final Query<R> query);
  } 

  /**
    * Optionally bound the current position of the state store
    * with respect to the input topics that feed it. In conjunction
    * with {@link InteractiveQueryResult#getPosition}, this can be
    * used to achieve a good balance between consistency and
    * availability in which repeated queries are guaranteed to
    * advance in time while allowing reads to be served from any
    * replica that is caught up to that caller's prior observations.
    * <p>
    * Note that the set of offsets provided in the bound does not determine
    * the partitions to query. For that, see {@link #withPartitions}.
    * Unrelated offsets will be ignored, and missing offsets will be treated
    * as indicating "no bound". 
    */
  public InteractiveQueryRequest<R> withPositionBound(PositionBound positionBound);

  

  /**
    * Optionally specify the partitions to include in the query.
    * If omitted, the default is to query all locally available partitions
    */
  public InteractiveQueryRequest<R> withPartitions(Set<Integer> partitions);

  /**
    * Query all locally available partitions
    */
  public InteractiveQueryRequest<R> withAllPartitions();


  /**
    * Instruct Streams to collect detailed information during query
    * execution, for example, which stores handled the query, how
    * long they took, etc.
    */
  public InteractiveQueryRequest<R> enableExecutionInfo();

  // Getters are also proposed to retrieve the request parameters

  public String getStoreName();
  public Query<R> getQuery();
  public PositionBound getPositionBound();
  public boolean executionInfoEnabled();

  /**
    * An empty set indicates that no partitions will be fetched.
    * A non-empty set indicates the specific partitions that will be fetched (if locally available).
    */
  public Set<Integer> getPartitions();

  /**
    * Indicates that all locally available partitions will be fetched.
    */
  public boolean isAllPartitions();

}

...

This is the essence of the proposal. Rather than modifying each store interface to allow new kinds of queries, we introduce a generic capability of stores to execute query objects. This lets stores choose whether to accept or reject queries of a given type, and lets new query types be introduced without changing the store interfaces.

Note that we are proposing a method with multiple arguments instead of a "parameters" object because this is an interface that users implement to provide their own state stores. By adding and deprecating method overloads, we can evolve this API over time: each new method gets a default implementation, so Streams controls what happens in the default case (falling back to another method or returning an error). With a "parameters" object, Streams would have no control over, or knowledge of, whether stores use the new members, which would make API evolution more difficult to manage.
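
To make the overload strategy concrete, here is a minimal sketch of how a later release might add an overload whose default implementation falls back to the older one. All names here (SketchStore, SketchQuery, SketchResult, LegacyStore, EmptyStore, and the timeoutMs argument) are hypothetical stand-ins for illustration, not part of this proposal.

```java
// Hypothetical stand-ins; none of these are the real Kafka Streams types.
interface SketchQuery<R> { }

final class SketchResult<R> {
    private final R value;        // null when the query failed
    private final String failure; // null when the query succeeded

    private SketchResult(R value, String failure) {
        this.value = value;
        this.failure = failure;
    }

    static <R> SketchResult<R> forResult(R value) {
        return new SketchResult<>(value, null);
    }

    static <R> SketchResult<R> forUnknownQueryType() {
        return new SketchResult<>(null, "UNKNOWN_QUERY_TYPE");
    }

    boolean isSuccess() { return failure == null; }
    R getResult() { return value; }
    String getFailure() { return failure; }
}

interface SketchStore {
    // Original overload: stores that don't override it reject every query.
    default <R> SketchResult<R> query(SketchQuery<R> query, boolean collectExecutionInfo) {
        return SketchResult.forUnknownQueryType();
    }

    // Overload added later (the "timeoutMs" argument is invented for this sketch).
    // The framework chooses the default behavior: fall back to the older overload.
    default <R> SketchResult<R> query(SketchQuery<R> query,
                                      boolean collectExecutionInfo,
                                      long timeoutMs) {
        return query(query, collectExecutionInfo);
    }
}

// A store written against the original interface; it only knows the old overload.
final class LegacyStore implements SketchStore {
    @Override
    public <R> SketchResult<R> query(SketchQuery<R> query, boolean collectExecutionInfo) {
        @SuppressWarnings("unchecked")
        R answer = (R) "legacy-answer"; // placeholder payload for the sketch
        return SketchResult.forResult(answer);
    }
}

// A store that overrides nothing, so every query is an unknown query type.
final class EmptyStore implements SketchStore { }
```

In this sketch, calling the three-argument overload on LegacyStore still reaches the store's two-argument implementation, while EmptyStore reports an unknown query type. The framework, not the store author, decides both outcomes.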

Code Block
public interface StateStore {
  ...

  /**
    * Execute a query. Returns a QueryResult containing either result data or
    * a failure.
    * <p>
    * If the store doesn't know how to handle the given query, the result
    * will be a {@link FailureReason#UNKNOWN_QUERY_TYPE}.
    * If the store couldn't satisfy the given position bound, the result
    * will be a {@link FailureReason#NOT_UP_TO_BOUND}.
    * @param query The query to execute
    * @param positionBound The position the store must be at or past
    * @param collectExecutionInfo Whether the store should collect detailed execution info for the query
    * @param <R> The result type
    */
  @Evolving
  default <R> QueryResult<R> query(Query<R> query,
                                   PositionBound positionBound,
                                   boolean collectExecutionInfo) {
    // If a store doesn't implement a query handler, then all queries are unknown.
    return QueryResult.forUnknownQueryType(query, this);
  }
 
  ...
}
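
As an illustration of how a store might implement this method, the following sketch handles one query type it recognizes and reports every other type as unknown. It uses simplified stand-in types (DemoQuery, DemoKeyQuery, DemoScanQuery, DemoQueryResult, MapBackedStore), all hypothetical, and it omits the position bound for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for Query, KeyQuery, and QueryResult.
interface DemoQuery<R> { }

final class DemoKeyQuery<K, V> implements DemoQuery<V> {
    private final K key;
    private DemoKeyQuery(K key) { this.key = key; }
    static <K, V> DemoKeyQuery<K, V> withKey(K key) { return new DemoKeyQuery<>(key); }
    K getKey() { return key; }
}

// A query type that the store below does not know how to handle.
final class DemoScanQuery implements DemoQuery<Integer> { }

final class DemoQueryResult<R> {
    private final R result;
    private final String failureReason; // null on success
    private final List<String> executionInfo = new ArrayList<>();

    private DemoQueryResult(R result, String failureReason) {
        this.result = result;
        this.failureReason = failureReason;
    }

    static <R> DemoQueryResult<R> forResult(R result) {
        return new DemoQueryResult<>(result, null);
    }

    static <R> DemoQueryResult<R> forUnknownQueryType() {
        return new DemoQueryResult<>(null, "UNKNOWN_QUERY_TYPE");
    }

    void addExecutionInfo(String info) { executionInfo.add(info); }
    boolean isSuccess() { return failureReason == null; }
    R getResult() { return result; }
    String getFailureReason() { return failureReason; }
    List<String> getExecutionInfo() { return executionInfo; }
}

// A toy store backed by a HashMap of String keys to Integer values.
final class MapBackedStore {
    private final Map<String, Integer> data = new HashMap<>();

    void put(String key, Integer value) { data.put(key, value); }

    @SuppressWarnings("unchecked")
    <R> DemoQueryResult<R> query(DemoQuery<R> query, boolean collectExecutionInfo) {
        if (!(query instanceof DemoKeyQuery)) {
            // The store chooses to reject any query type it doesn't recognize.
            return DemoQueryResult.forUnknownQueryType();
        }
        Object key = ((DemoKeyQuery<?, ?>) query).getKey();
        // Unchecked: the sketch trusts the caller to ask for the store's value type.
        DemoQueryResult<R> result = DemoQueryResult.forResult((R) data.get(key));
        if (collectExecutionInfo) {
            result.addExecutionInfo("Handled by " + getClass().getSimpleName());
        }
        return result;
    }
}
```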

...

Code Block
@Evolving
public class KeyQuery<K, V> implements Query<V> {
  // static factory to create a new KeyQuery, given a key
  public static <K, V> KeyQuery<K, V> withKey(final K key);

  // getter for the key
  public K getKey();
}

// example usage in IQ:

Integer key = 1;

// note that "mystore" is a KeyValueStore<Integer, ValueAndTimestamp<Integer>>,
// hence the result type        
InteractiveQueryRequest<ValueAndTimestamp<Integer>> query =
  inStore("mystore")
    .withQuery(KeyQuery.withKey(key));
     
// run the query
InteractiveQueryResult<ValueAndTimestamp<Integer>> result = kafkaStreams.query(query);

// In this example, it doesn't matter which partition ran the query, so we just
// grab the result from the partition that returned something
// (this is what IQ currently does under the covers).

Integer value = result.getOnlyPartitionResult().getResult().value();

We propose to add KeyQuery as part of the implementation of this KIP, in order to have at least one query available to flesh out tests, etc., for the framework. Other queries are deferred to later KIPs.

Example query: RawScanQuery

This example demonstrates two variations on the first example:

  1. The ability to define queries handling "raw" binary data for keys and values
  2. The ability for a query to return multiple results (in this case, an iterator)

I'm only bundling those for brevity. We can also have typed, iterable queries and raw single-record queries.

Note this query is purely an example of what is possible. It will not be added as part of this KIP.

Code Block
public class RawScanQuery implements Query<KeyValueIterator<Bytes, byte[]>> {
    private RawScanQuery() { }

    public static RawScanQuery scan() {
        return new RawScanQuery();
    }
}

// example usage

// since we're handling raw data, we're going to need the serdes
InteractiveQuerySerdes<Integer, ValueAndTimestamp<Integer>> serdes =
  kafkaStreams.serdesForStore("mystore");

// run the "scan" query
InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
  kafkaStreams.query(inStore("mystore").withQuery(RawScanQuery.scan()));

// This time, we'll get results from all locally available partitions.
Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults =
  scanResult.getPartitionResults();


// for this example, we'll just collate all the partitions' iterators
// together and print their data

List<KeyValueIterator<Bytes, byte[]>> iterators =
  partitionResults
    .values()
    .stream()
    .map(QueryResult::getResult)
    .collect(Collectors.toList());

// Using an example convenience method we could add to collate iterators' data
try (CloseableIterator<KeyValue<Bytes, byte[]>> collated = Iterators.collate(iterators)) {
  while (collated.hasNext()) {
    KeyValue<Bytes, byte[]> next = collated.next();
    System.out.println(
      "|||" +
        " " + serdes.keyFrom(next.key.get()) +
        " " + serdes.valueFrom(next.value)
    );
  }
}

QueryResult

This is a container for a single partition's query result.

Code Block
@Evolving
public class QueryResult<R> {
  // wraps a successful result
  public static <R> QueryResult<R> forResult(R result);

  // returns a failed query result because the store didn't know how to handle the query.
  public static <R> QueryResult<R> forUnknownQueryType(Query<R> query, StateStore store);

  // returns a failed query result because the partition wasn't caught up to the desired bound.
  public static <R> QueryResult<R> notUpToBound(Position currentPosition, Position bound);

  // returns a failed query result because caller requested a "latest" bound, but the task was
  // not active and running.
  public static <R> QueryResult<R> notActive(String currentState);

  // Used by state stores that need to delegate to another store to run a query and then
  // translate the results. Does not change the execution info or any other metadata.
  public <NewR> QueryResult<NewR> swapResult(NewR newTypedResult);

  // If requested, stores should record
  // helpful information, such as their own class, how they executed the query,
  // and the time they took.
  public void addExecutionInfo(String executionInfo);

  ...
}
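
To illustrate the intent of swapResult and addExecutionInfo, here is a minimal sketch of a typed store that delegates to an inner bytes store and then translates the result while keeping the execution info collected so far. The types WrappedResult, InnerBytesStore, and TypedFacadeStore are hypothetical stand-ins, and sharing one list of execution info between the raw and typed results is an assumption of this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for QueryResult, reduced to the two methods of interest.
final class WrappedResult<R> {
    private final R result;
    private final List<String> executionInfo;

    private WrappedResult(R result, List<String> executionInfo) {
        this.result = result;
        this.executionInfo = executionInfo;
    }

    static <R> WrappedResult<R> forResult(R result) {
        return new WrappedResult<>(result, new ArrayList<>());
    }

    // Replace the payload but carry the existing execution info along unchanged.
    <NewR> WrappedResult<NewR> swapResult(NewR newTypedResult) {
        return new WrappedResult<>(newTypedResult, executionInfo);
    }

    void addExecutionInfo(String info) { executionInfo.add(info); }
    R getResult() { return result; }
    List<String> getExecutionInfo() { return executionInfo; }
}

// Inner store that only deals in raw bytes.
final class InnerBytesStore {
    WrappedResult<byte[]> get(String key) {
        byte[] raw = ("value-of-" + key).getBytes(StandardCharsets.UTF_8);
        WrappedResult<byte[]> result = WrappedResult.forResult(raw);
        result.addExecutionInfo("InnerBytesStore handled the lookup");
        return result;
    }
}

// Outer store that delegates to the inner store and deserializes the answer.
final class TypedFacadeStore {
    private final InnerBytesStore inner = new InnerBytesStore();

    WrappedResult<String> get(String key) {
        WrappedResult<byte[]> raw = inner.get(key);
        String typed = new String(raw.getResult(), StandardCharsets.UTF_8);
        WrappedResult<String> result = raw.swapResult(typed);
        result.addExecutionInfo("TypedFacadeStore deserialized the value");
        return result;
    }
}
```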

Position

A class representing a processing state position in terms of its inputs: a vector of (topic, partition, offset) components.

Code Block
@Evolving
public interface Position {

  // Create a new Position from a map of topic -> partition -> offset
  static Position fromMap(Map<String, Map<Integer, Long>> map);

  // Create a new, empty Position
  static Position emptyPosition();

  // Return a new position based on the current one, with the given component added
  Position withComponent(String topic, int partition, long offset);

  // Merge all the components of this position with all the components of the other
  // position and return the result in a new Position
  Position merge(Position other);

  // Get the set of topics included in this Position
  Set<String> getTopics();

  // Given a topic, get the partition -> offset pairs included in this Position
  Map<Integer, Long> getBound(String topic);
}

PositionBound

A class bounding the processing state Position during queries. This can be used to specify that a query should fail if the locally available partition isn't caught up to the specified bound. "Unbounded" places no restrictions on the current location of the partition, and "latest" indicates that only a running active replica should reply to the query.

Code Block
@Evolving
public interface PositionBound {

  // Create a new Position bound representing only the latest state
  static PositionBound latest();

  // Create a new Position bound representing "no bound"
  static PositionBound unbounded();

  // Create a new Position bound requiring the store to be at or past the given Position
  static PositionBound at(Position position);

  // Check whether this is a "latest" Position bound
  boolean isLatest();

  // Check whether this is an "unbounded" Position bound
  boolean isUnbounded();

  // Get the Position (if it's not unbounded or latest)
  Position position();
}
}
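
The three kinds of bound described above could be checked along the following lines. This is only a sketch under stated assumptions: DemoBound and BoundCheckSketch are hypothetical names, positions are modeled as plain topic -> partition -> offset maps, and a topic-partition that appears in the bound but is not tracked by the store is treated as unrelated and ignored.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for PositionBound, using plain maps for the Position.
final class DemoBound {
    enum Kind { UNBOUNDED, LATEST, AT }

    final Kind kind;
    final Map<String, Map<Integer, Long>> offsets; // only meaningful for Kind.AT

    private DemoBound(Kind kind, Map<String, Map<Integer, Long>> offsets) {
        this.kind = kind;
        this.offsets = offsets;
    }

    static DemoBound unbounded() { return new DemoBound(Kind.UNBOUNDED, Collections.emptyMap()); }
    static DemoBound latest() { return new DemoBound(Kind.LATEST, Collections.emptyMap()); }
    static DemoBound at(Map<String, Map<Integer, Long>> offsets) { return new DemoBound(Kind.AT, offsets); }
}

final class BoundCheckSketch {
    // Decide whether a local partition may answer a query under the given bound.
    static boolean canServe(DemoBound bound,
                            Map<String, Map<Integer, Long>> current,
                            boolean activeAndRunning) {
        switch (bound.kind) {
            case UNBOUNDED:
                return true;             // no restriction on the store's position
            case LATEST:
                return activeAndRunning; // only a running active replica may reply
            default:
                return isUpToBound(current, bound.offsets);
        }
    }

    // True if `current` is at or past every offset in `bound` that the store tracks.
    static boolean isUpToBound(Map<String, Map<Integer, Long>> current,
                               Map<String, Map<Integer, Long>> bound) {
        for (Map.Entry<String, Map<Integer, Long>> topic : bound.entrySet()) {
            Map<Integer, Long> seenOffsets =
                current.getOrDefault(topic.getKey(), Collections.emptyMap());
            for (Map.Entry<Integer, Long> partition : topic.getValue().entrySet()) {
                Long seen = seenOffsets.get(partition.getKey());
                if (seen == null) {
                    continue; // assumption: untracked components are unrelated, so ignore them
                }
                if (seen < partition.getValue()) {
                    return false; // the store has not caught up to the bound
                }
            }
        }
        return true;
    }
}
```

A caller that wants repeated queries to advance in time could, under the same assumptions, pass the position returned with one result as the "at" bound of its next request.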



Compatibility, Deprecation, and Migration Plan

...