...

Code Block
/**
  * The request object for Interactive Queries.
  * This is an immutable builder class for passing all required and
  * optional arguments for querying a state store in Kafka Streams.
  * <p>
  * @param <R> The type of the query result.
  */
@Evolving
public class InteractiveQueryRequest<R> {

  /**
    * First required argument: the name of the store to query.
    */
  public static InStore inStore(final String name);

  public static class InStore {

    /**
      * Second required argument to provide the query to execute.
      */
    public <R> InteractiveQueryRequest<R> withQuery(final Query<R> query);
  } 

  /**
    * Optionally bound the current position of the state store
    * with respect to the input topics that feed it. In conjunction
    * with {@link InteractiveQueryResult#getPosition}, this can be
    * used to achieve a good balance between consistency and
    * availability in which repeated queries are guaranteed to
    * advance in time while allowing reads to be served from any
    * replica that is caught up to that caller's prior observations.
    * <p>
    * Note that the set of offsets provided in the bound does not determine
    * the partitions to query. For that, see {@link #withPartitionsToQuery}.
    * Unrelated offsets will be ignored, and missing offsets will be treated
    * as indicating "no bound". 
    */
  public InteractiveQueryRequest<R> withPositionBound(Offsets positionBound);

  /**
    * Optionally bound the query to only execute on active running tasks.
    * This is the freshest and most consistent possible result, but does
    * not provide high availability. This is the default if no bound is
    * specified.
    */
  public InteractiveQueryRequest<R> withLatestPositionBound(); 

  /**
    * Optionally specify that the query will run on any store,
    * whether active, standby, or restoring. This data may be
    * arbitrarily stale, but provides the best possible availability.
    */
  public InteractiveQueryRequest<R> withNoPositionBound();

  /**
    * Optionally specify the partitions to include in the query.
    * If omitted, the default is to query all locally available partitions.
    */
  public InteractiveQueryRequest<R> withPartitionsToQuery(Set<Integer> partitions);
}

...


  // Getters are also proposed to retrieve the request parameters

  public String getStoreName();     // not nullable
  public Query<R> getQuery();       // not nullable
  public boolean isLatestBounded();
  public boolean isUnbounded(); 
  public Offsets getBound();        // nullable
}
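
The staged, immutable builder described above can be sketched as follows. This is a minimal illustration, not the proposed implementation: `SketchQuery`, `SketchKeyQuery`, and `SketchRequest` are hypothetical stand-ins, the explicit `Offsets` bound is left out, and the `getPartitions` getter is an assumption added only so the sketch can be inspected.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the proposed Query<R> type.
interface SketchQuery<R> { }

// Hypothetical query, used only for illustration.
final class SketchKeyQuery implements SketchQuery<String> {
    final String key;
    SketchKeyQuery(final String key) { this.key = key; }
}

// Simplified stand-in for InteractiveQueryRequest<R>. The explicit Offsets
// bound is omitted, so the bound is either "latest" or "none".
final class SketchRequest<R> {
    private final String storeName;
    private final SketchQuery<R> query;
    private final boolean latestBounded;
    private final Set<Integer> partitions; // empty = all locally available

    private SketchRequest(final String storeName, final SketchQuery<R> query,
                          final boolean latestBounded, final Set<Integer> partitions) {
        this.storeName = storeName;
        this.query = query;
        this.latestBounded = latestBounded;
        this.partitions = partitions;
    }

    // First required argument: the store name.
    static InStore inStore(final String name) { return new InStore(name); }

    static final class InStore {
        private final String name;
        private InStore(final String name) { this.name = name; }

        // Second required argument: the query. Only now does a request exist.
        <R> SketchRequest<R> withQuery(final SketchQuery<R> query) {
            // "Latest" is the default bound, as the JavaDoc above states.
            return new SketchRequest<>(name, query, true, Collections.emptySet());
        }
    }

    // Each optional setter returns a new instance; the receiver is unchanged.
    SketchRequest<R> withLatestPositionBound() {
        return new SketchRequest<>(storeName, query, true, partitions);
    }

    SketchRequest<R> withNoPositionBound() {
        return new SketchRequest<>(storeName, query, false, partitions);
    }

    SketchRequest<R> withPartitionsToQuery(final Set<Integer> toQuery) {
        final Set<Integer> copy = Collections.unmodifiableSet(new HashSet<>(toQuery));
        return new SketchRequest<>(storeName, query, latestBounded, copy);
    }

    String getStoreName() { return storeName; }
    SketchQuery<R> getQuery() { return query; }
    boolean isLatestBounded() { return latestBounded; }
    boolean isUnbounded() { return !latestBounded; }
    Set<Integer> getPartitions() { return partitions; } // assumed getter
}
```

With this shape, `SketchRequest.inStore("word-counts").withQuery(new SketchKeyQuery("kafka"))` is the shortest valid request, and calling `withNoPositionBound()` on it yields a second request while leaving the first one latest-bounded.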

InteractiveQueryResult

This is the main response object for IQv2. It wraps the individual per-partition results and provides a vehicle for delivering metadata about the result as a whole.

Code Block
/**
  * The response object for interactive queries.
  * It wraps the individual results and provides a
  * vehicle for delivering metadata about the result as a whole.
  * <p>
  * @param <R> The type of the query result. 
  */
@Evolving 
public class InteractiveQueryResult<R> {
  /**
    * Constructor. Used by Kafka Streams, and may be useful for
    * tests as well.
    */
  public InteractiveQueryResult(Map<Integer /*partition*/, QueryResult<R>> partitionResults);

  /**
    * The query's result for each partition that executed the query.
    */
  public Map<Integer /*partition*/, QueryResult<R>> getPartitionResults();

  /**
    * The position of the state store at the moment it executed the
    * query. In conjunction
    * with {@link InteractiveQueryRequest#withPositionBound}, this can be
    * used to achieve a good balance between consistency and
    * availability in which repeated queries are guaranteed to
    * advance in time while allowing reads to be served from any
    * replica that is caught up to that caller's prior observations.
    */ 
  public Offsets getPosition();
}
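
The interplay between `getPosition` and `withPositionBound` can be illustrated with a small simulation. This is a sketch under stated assumptions: `SketchOffsets` reduces the proposed `Offsets` type to a map from input partition to offset, and `SketchReplica` is a hypothetical model of an active or standby copy of a store. Neither is part of the proposal.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for Offsets: input partition -> offset.
final class SketchOffsets {
    private final Map<Integer, Long> byPartition;

    SketchOffsets(final Map<Integer, Long> byPartition) {
        this.byPartition = new HashMap<>(byPartition);
    }

    static SketchOffsets of(final int partition, final long offset) {
        final Map<Integer, Long> single = new HashMap<>();
        single.put(partition, offset);
        return new SketchOffsets(single);
    }

    // True if this position is at or past the bound on every partition that
    // both sides know about. Partitions absent from the bound are treated as
    // "no bound"; partitions present only in the bound are ignored.
    boolean isAtOrPast(final SketchOffsets bound) {
        for (final Map.Entry<Integer, Long> mine : byPartition.entrySet()) {
            final Long required = bound.byPartition.get(mine.getKey());
            if (required != null && mine.getValue() < required) {
                return false;
            }
        }
        return true;
    }
}

// A replica (active or standby) that serves a query only if it has caught up
// to the caller's bound.
final class SketchReplica {
    private SketchOffsets position;

    SketchReplica(final SketchOffsets position) { this.position = position; }

    void advanceTo(final SketchOffsets newPosition) { this.position = newPosition; }

    // Returns the position at which the query ran, or null if the bound
    // could not be satisfied (the NOT_UP_TO_BOUND case).
    SketchOffsets tryQuery(final SketchOffsets bound) {
        return position.isAtOrPast(bound) ? position : null;
    }
}
```

A caller that feeds each returned position into the next request as its bound never observes a replica that is behind what it has already seen. In this model, a standby at offset 7 refuses a query bounded at offset 10 and accepts it once it has advanced past that offset.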

StateStore modifications

This is the essence of the proposal. Rather than modifying each store interface to allow new kinds of queries, we introduce a generic capability of stores to execute query objects. This lets each store choose which query types it accepts or rejects, and it lets new query types be introduced without changing the store interfaces.

Code Block
public interface StateStore {
  ...

  /**
    * Execute a query. Returns a QueryResult containing either result data or
    * a failure.
    * <p>
    * If the store doesn't know how to handle the given query, the result
    * will be a {@link FailureReason#UNKNOWN_QUERY_TYPE}.
    * If the store couldn't satisfy the given position bound, the result
    * will be a {@link FailureReason#NOT_UP_TO_BOUND}.
    * @param query The query to execute, together with the position bound
    *              that the store must be at or past
    * @param <R> The result type
    */
  @Evolving
  default <R> QueryResult<R> query(StoreQuery<R> query) {
    // If a store doesn't implement a query handler, then all queries are unknown.
    return QueryResult.forUnknownQueryType(query.getQuery(), this);
  }
 
  ...
}
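
To show how a store might opt in to one query type while rejecting the rest, here is a minimal sketch. All `Toy*` names are hypothetical stand-ins for the proposed types, and the `StoreQuery` wrapper is dropped so that the store receives the query directly. The default method plays the role of the proposed `StateStore#query` default.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the proposed Query<R> type.
interface ToyQuery<R> { }

// A query type that the example store understands.
final class ToyKeyQuery implements ToyQuery<Long> {
    final String key;
    ToyKeyQuery(final String key) { this.key = key; }
}

// A query type that the example store does not understand.
final class ToyRangeQuery implements ToyQuery<Long> { }

// Simplified stand-in for QueryResult<R>: either a value or a failure reason.
final class ToyResult<R> {
    private final R result;
    private final String failureReason; // null on success

    private ToyResult(final R result, final String failureReason) {
        this.result = result;
        this.failureReason = failureReason;
    }

    static <R> ToyResult<R> forResult(final R result) {
        return new ToyResult<>(result, null);
    }

    static <R> ToyResult<R> forUnknownQueryType(final ToyQuery<R> query, final ToyStore store) {
        return new ToyResult<>(null, "UNKNOWN_QUERY_TYPE");
    }

    boolean isSuccess() { return failureReason == null; }
    R getResult() { return result; }
    String getFailureReason() { return failureReason; }
}

// Stand-in for StateStore: by default, every query is unknown.
interface ToyStore {
    default <R> ToyResult<R> query(final ToyQuery<R> query) {
        return ToyResult.forUnknownQueryType(query, this);
    }
}

// A store that handles ToyKeyQuery and defers everything else to the default.
final class ToyCountStore implements ToyStore {
    private final Map<String, Long> counts = new HashMap<>();

    void put(final String key, final long count) { counts.put(key, count); }

    @Override
    @SuppressWarnings("unchecked")
    public <R> ToyResult<R> query(final ToyQuery<R> query) {
        if (query instanceof ToyKeyQuery) {
            final Long value = counts.get(((ToyKeyQuery) query).key);
            final ToyResult<Long> typed = ToyResult.forResult(value);
            // Unchecked but safe here: ToyKeyQuery fixes R to Long.
            return (ToyResult<R>) typed;
        }
        return ToyStore.super.query(query);
    }
}
```

The design choice this illustrates is that an unsupported query produces a failed result rather than an exception or a compile-time error, so a caller can probe a store with a new query type and fall back if it is rejected.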

StoreQuery

This is a simple container for executing a query on a specific partition of a state store. As we are now getting into the minor details, I'll omit JavaDocs that seem obvious.

Code Block
public class StoreQuery<R> {
  public static <R> StoreQuery<R> unbounded(Query<R> query);
  public static <R> StoreQuery<R> latestBounded(Query<R> query); 
  public static <R> StoreQuery<R> bounded(Query<R> query, Offsets bound);  

  // allow collection of detailed execution information
  public StoreQuery<R> enableExecutionInfo();

  public Query<R> getQuery();       // not nullable
  public boolean  isLatestBounded();
  public boolean  isUnbounded(); 
  public Offsets  getBound();        // nullable 
  public boolean  isExecutionInfoEnabled();
}
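
One way to read the three factory methods is as three admission rules that a task applies before running a query. The sketch below is an assumption about how those rules might be evaluated, not the proposed implementation: `BoundSketch` drops the query payload and reduces the `Offsets` bound to a single offset, and `BoundOutcome` and `TaskSketch` are hypothetical names.

```java
// Hypothetical outcome categories mirroring the cases named in this proposal.
enum BoundOutcome { EXECUTED, NOT_ACTIVE, NOT_UP_TO_BOUND }

// Simplified stand-in for StoreQuery<R>: no query payload, and the Offsets
// bound is reduced to a single offset.
final class BoundSketch {
    private final boolean latest;
    private final Long bound; // null unless explicitly bounded

    private BoundSketch(final boolean latest, final Long bound) {
        this.latest = latest;
        this.bound = bound;
    }

    static BoundSketch unbounded() { return new BoundSketch(false, null); }
    static BoundSketch latestBounded() { return new BoundSketch(true, null); }
    static BoundSketch bounded(final long offset) { return new BoundSketch(false, offset); }

    boolean isLatestBounded() { return latest; }
    boolean isUnbounded() { return !latest && bound == null; }
    Long getBound() { return bound; } // nullable
}

// Hypothetical model of a task hosting one store partition.
final class TaskSketch {
    private final boolean activeAndRunning;
    private final long position;

    TaskSketch(final boolean activeAndRunning, final long position) {
        this.activeAndRunning = activeAndRunning;
        this.position = position;
    }

    BoundOutcome admit(final BoundSketch query) {
        if (query.isLatestBounded()) {
            // "Latest" is only satisfiable by an active, running task.
            return activeAndRunning ? BoundOutcome.EXECUTED : BoundOutcome.NOT_ACTIVE;
        }
        if (query.isUnbounded()) {
            // Any task may answer, however stale its data.
            return BoundOutcome.EXECUTED;
        }
        // Explicit bound: the store must be at or past the requested offset.
        return position >= query.getBound()
            ? BoundOutcome.EXECUTED
            : BoundOutcome.NOT_UP_TO_BOUND;
    }
}
```

Under these rules a standby at offset 40 rejects a latest-bounded query as not active, accepts an unbounded one, and accepts an explicitly bounded one only when the bound is 40 or lower.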

QueryResult

This is a container for a single partition's query result.

Code Block
public class QueryResult<R> {
  // returns a failed query result because the store didn't know how to handle the query.
  public static <R> QueryResult<R> forUnknownQueryType(Query<R> query, StateStore store);

  // returns a failed query result because the partition wasn't caught up to the desired bound.
  public static <R> QueryResult<R> notUpToBound(Offsets currentPosition, Offsets bound);

  // returns a failed query result because the caller requested a "latest" bound, but the task was
  // not active and running.
  public static <R> QueryResult<R> notActive(String currentState);

  // Used by state stores that need to delegate to another store to run a query and then
  // translate the results. Does not change the execution info or any other metadata.
  public <NewR> QueryResult<NewR> swapResult(NewR newTypedResult);

  // If requested by StoreQuery#isExecutionInfoEnabled, stores should record
  // helpful information, such as their own class, how they executed the query,
  // and the time they took.
  public void addExecutionInfo(String executionInfo);

...
}
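
The delegation pattern that `swapResult` is meant to support can be sketched as a typed store wrapping a bytes store. This is an illustration only: `ResultSketch`, `BytesStoreSketch`, and `StringStoreSketch` are hypothetical names, the `forResult` factory and the getters are assumptions, and execution info is modeled as a plain list of strings.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for QueryResult<R>.
final class ResultSketch<R> {
    private final R result;
    private final List<String> executionInfo;

    private ResultSketch(final R result, final List<String> executionInfo) {
        this.result = result;
        this.executionInfo = executionInfo;
    }

    static <R> ResultSketch<R> forResult(final R result) {
        return new ResultSketch<>(result, new ArrayList<>());
    }

    // Replaces the payload and carries the execution info over unchanged.
    <NewR> ResultSketch<NewR> swapResult(final NewR newTypedResult) {
        return new ResultSketch<>(newTypedResult, executionInfo);
    }

    void addExecutionInfo(final String info) { executionInfo.add(info); }

    R getResult() { return result; }
    List<String> getExecutionInfo() { return executionInfo; }
}

// Inner store that only knows about bytes.
final class BytesStoreSketch {
    ResultSketch<byte[]> get(final String key, final boolean executionInfoEnabled) {
        final byte[] raw = ("value-of-" + key).getBytes(StandardCharsets.UTF_8);
        final ResultSketch<byte[]> result = ResultSketch.forResult(raw);
        if (executionInfoEnabled) {
            result.addExecutionInfo("Handled in BytesStoreSketch");
        }
        return result;
    }
}

// Outer store that delegates to the inner store and translates the result.
final class StringStoreSketch {
    private final BytesStoreSketch inner = new BytesStoreSketch();

    ResultSketch<String> get(final String key, final boolean executionInfoEnabled) {
        final ResultSketch<byte[]> raw = inner.get(key, executionInfoEnabled);
        final String decoded = new String(raw.getResult(), StandardCharsets.UTF_8);
        final ResultSketch<String> typed = raw.swapResult(decoded);
        if (executionInfoEnabled) {
            typed.addExecutionInfo("Deserialized in StringStoreSketch");
        }
        return typed;
    }
}
```

Because the swapped result keeps the inner store's execution info, each layer can append its own entry and the caller sees the whole chain, innermost layer first.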


Compatibility, Deprecation, and Migration Plan

...