Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
/**
  * The request object for Interactive Queries.
  * This is an immutable builder class for passing all required and
  * optional arguments for querying a state store in Kafka Streams.
  * <p>
  * @param <R> The type of the query result.
  */
@Evolving
public class InteractiveQueryRequest<R> {

  /**
    * First required argument to specify the name of the store to query
    */
  public static InStore inStore(final String name);

  public static class InStore {

    /**
      * Second required argument to provide the query to execute.
      */
    public <R> InteractiveQueryRequest<R> withQuery(final Query<R> query);
  } 

  /**
    * Optionally bound the current position of the state store
    * with respect to the input topics that feed it. In conjunction
    * with {@link InteractiveQueryResult#getPosition}, this can be
    * used to achieve a good balance between consistency and
    * availability in which repeated queries are guaranteed to
    * advance in time while allowing reads to be served from any
    * replica that is caught up to that caller's prior observations.
    * <p>
    * Note that the set of offsets provided in the bound does not determine
    * the partitions to query. For that, see {@link withPartitionsToQuery}.
    * Unrelated offsets will be ignored, and missing offsets will be treated
    * as indicating "no bound". 
    */
  public InteractiveQueryRequest<R> withPositionBound(OffsetsPosition positionBound);

  

  /**
    * Optionally boundspecify that the query towill only executerun on active running tasks.any store,
    * Thiswhether isactive, thestandby, freshestor andrestoring. mostThis consistentdata possible result, but doesmay be
    * notarbitrarily providestale, highbut availability. This is displays the defaultbest if no bound is
    * specifiedpossible availability.
    */
  public InteractiveQueryRequest<R> withLatestPositionBoundwithNoPositionBound(); 

  /**
    * Optionally specify that the querypartitions willto runinclude onin anythe store,query.
    * whetherIf activeomitted, standby,the ordefault restoring.is Thisto dataquery mayall be
locally    * arbitrarily stale, but displays the best possible availability.available partitions
    */
  public public InteractiveQueryRequest<R> withNoPositionBoundwithPartitions(Set<Integer> partitions);

  /**
    * OptionallyQuery all specifylocally theavailable partitions
 to include in the query.
    * If omitted, the default is to query all locally available partitions
    * */
  public InteractiveQueryRequest<R> withPartitionsToQuerywithAllPartitions(Set<Integer> partitions); 


   // Getters are also proposed to retrieve the request parameters

  public String getStoreName();     // not nullable
  public Query<R> getQuery();       // not nullable
  public booleanPosition isLatestBoundedgetPositionBound();

  public boolean isUnbounded(); 
  public Offsets getBound();        // nullable
}

InteractiveQueryResult

/**
    * Optional.of(empty set) indicates that no partitions will be fetched
    * Optional.of(non-empty set) indicate the specific partitions that will be fetched (if locally available)
    */
  public Set<Integer> getPartitions();

  /**
    * indicates that all locally available partitions will be fetched
    */
  public boolean isAllPartitions();
}

InteractiveQueryResult

This is the main response object for IQv2. It wraps the individual results, as well as providing a vehicle to deliver metadata relating to the result as a whole.

Code Block
/**
  * The response object for interactive queries.
  * It wraps the individual results, as well as providing a
  * vehicle to deliver metadata relating to the result as a whole.
  * <p>
  * @param <R> The type of the query result. 
  */
@Evolving 
public class InteractiveQueryResult<R> {
  /**
    * Constructor. Used by Kafka Streams, and may be useful for
    * tests as well.
    */
  public InteractiveQueryResult(Map<Integer /*partition*/, QueryResult<R>> partitionResults);

  /**
    * The query's result for each partition that executed the query.
    */
  public Map<Integer /*partition*/, QueryResult<R>> getPartitionResults();

  /**
    * TheAsserts positionthat ofonly theone statepartition storereturns ata theresult momentand itextract executedthe theresult.
    * query.Useful Inwith conjunction
queries that expect a single result.
    */
  public QueryResult<R> getOnlyPartitionResult()


  /**
    * The position of the state store at the moment it executed the
    * query. In conjunction
    * with {@link InteractiveQueryRequest#withPartitionBound}, this can be
    * used to achieve a good balance between consistency and
    * availability in which repeated queries are guaranteed to
    * advance in time while allowing reads to be served from any
    * replica that is caught up to that caller's prior observations.
    */ 
  public OffsetsPosition getPosition();
}

StateStore modifications

...

Code Block
public interface StateStore {
  ...

  /****
    * Execute a query. Returns a QueryResult containing either result data or
    * a failure.
    * <p>
    * If the store doesn't know how to handle the given query, the result
    * will be a {@link FailureReason#UNKNOWN_QUERY_TYPE}.
    * If the store couldn't satisfy the given position bound, the result
    * will Executebe a query. Returns a QueryResult containing either result data or{@link FailureReason#NOT_UP_TO_BOUND}.
    * @param query The query to execute
    * a failure.
    * <p> @param offsetBound The position the store must be at or past
    * @param IfcollectExecutionInfo Whether the store doesn't know how to handle the given query, the resultshould collect detailed execution info for the query
    * @param <R> The result type
    * will be a {@link FailureReason#UNKNOWN_QUERY_TYPE}./
  @Evolving
  default <R> QueryResult<R> query(Query<R> query,
    *   If the store couldn't satisfy the given position bound, the result
    * will be a {@link FailureReason#NOT_UP_TO_BOUND}.
    * @param query The query toPosition executepositionBound,
    * @param offsetBound The position the store must be at or past
    * @param <R> The result type
    */
  @Evolving
  default <R> QueryResult<R> query(Query<R> query, Positionboolean positionBoundcollectExecutionInfo) {
    // If a store doesn't implement a query handler, then all queries are unknown.
    return QueryResult.forUnknownQueryType(query, this);
  }
 
  ...
}

...

This is the interface that all queries must implement. Part of what I'm going for here is to place as few restrictions as possible on what a "query" or its "result" is, so the only thing in this interface is a generic type indicating the result type, which lets Streams return a fully typed response without predetermining the response type itself.

Code Block
@Evolving
public interface Query<R> { }

...

This query implements the functionality of the current KeyValueStore#get(key)  method:

Code Block
@Evolving
public class KeyQuery<K, V> implements Query<V> {
  // static factory to create a new KeyQuery, given a key
  public static <K, V> KeyQuery<K, V> withKey(final K key);

  // getter for the key
  public K getKey();
}

// example usage in IQ:

Integer key = 1;

// note that "mystore" is a KeyValueStore<Integer, ValueAndTimestamp<Integer>>,
// hence the result type        
InteractiveQueryRequest<ValueAndTimestamp<Integer>> query =
  inStore("mystore")
    .withQuery(KeyQuery.withKey(key));
     
// run the query
InteractiveQueryResult<ValueAndTimestamp<Integer>> result = kafkaStreams.query(query);

// In this example, it doesn't matter which partition ran the query, so we just
// grab the result from the first partition that returned something
// (this is what IQ currently does under the covers).

Integer value = result.getFirstPartitionResultgetOnlyPartitionResult().getResult().value();

We propose to add KeyQuery as part of the implementation of this KIP, in order to have at least one query available to flesh out tests, etc., for the framework. Other queries are deferred to later KIPs.

...

Code Block
public class QueryResult<R> {
  // wraps a successful result
  public static <R> QueryResult<R> forResult(R result);

  // returns a failed query result because the store didn't know how to handle the query.
  public static <R> QueryResult<R> forUnknownQueryType(Query<R> query, StateStore store);

  // returns a failed query result because the partition wasn't caught up to the desired bound.
  public static <R> QueryResult<R> notUpToBound(OffsetsPosition currentPosition, OffsetsPosition bound);

  // returns a failed query result because caller requested a "latest" bound, but the task was
  // not active and running.
  public static <R> QueryResult<R> notActive(String currentState);

  // Used by state stores that need to delegate to another store to run a query and then
  // translate the results. Does not change the execution info or any other metadata.
  public <NewR> QueryResult<NewR> swapResult(NewR newTypedResult);

  // If requested by StoreQuery#isExecutionInfoEnabled, stores should record
  // helpful information, such as their own class, how they executed the query,
  // and the time they took.
  public void addExecutionInfo(String executionInfo);

...
}

...