...
Code Block |
---|
/**
* The request object for Interactive Queries.
* This is an immutable builder class for passing all required and
* optional arguments for querying a state store in Kafka Streams.
* <p>
* @param <R> The type of the query result.
*/
@Evolving
public class InteractiveQueryRequest<R> {
/**
* First required argument to specify the name of the store to query
*/
public static InStore inStore(final String name);
public static class InStore {
/**
* Second required argument to provide the query to execute.
*/
public <R> InteractiveQueryRequest<R> withQuery(final Query<R> query);
}
/**
* Optionally bound the current position of the state store
* with respect to the input topics that feed it. In conjunction
* with {@link InteractiveQueryResult#getPosition}, this can be
* used to achieve a good balance between consistency and
* availability in which repeated queries are guaranteed to
* advance in time while allowing reads to be served from any
* replica that is caught up to that caller's prior observations.
* <p>
* Note that the set of offsets provided in the bound does not determine
* the partitions to query. For that, see {@link #withPartitionsToQuery}.
* Unrelated offsets will be ignored, and missing offsets will be treated
* as indicating "no bound".
*/
public InteractiveQueryRequest<R> withPositionBound(Offsets positionBound);
/**
* Optionally bound the query to only execute on active running tasks.
* This is the freshest and most consistent possible result, but does
* not provide high availability. This is the default if no bound is
* specified.
*/
public InteractiveQueryRequest<R> withLatestPositionBound();
/**
* Optionally specify that the query will run on any store,
* whether active, standby, or restoring. This data may be
* arbitrarily stale, but displays the best possible availability.
*/
public InteractiveQueryRequest<R> withNoPositionBound();
/**
* Optionally specify the partitions to include in the query.
* If omitted, the default is to query all locally available partitions
*/
public InteractiveQueryRequest<R> withPartitionsToQuery(Set<Integer> partitions);
} |
InteractiveQueryResult
...
// Getters are also proposed to retrieve the request parameters
public String getStoreName(); // not nullable
public Query<R> getQuery(); // not nullable
public boolean isLatestBounded();
public boolean isUnbounded();
public Offsets getBound(); // nullable
} |
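To make the "immutable builder with two required arguments" shape concrete, here is a minimal, self-contained sketch of how such a staged builder could be written. It is not the proposed implementation: `RequestBuilderSketch`, `Request`, `KeyQuery`, and `getPartitions` are hypothetical stand-ins invented for illustration, and only the store name, query, and partition set are modeled.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-ins for illustration; these are not the Kafka Streams classes.
class RequestBuilderSketch {

    /** A query that produces a result of type R. */
    interface Query<R> { }

    /** Made-up query type for this sketch. */
    static class KeyQuery implements Query<Long> {
        final String key;
        KeyQuery(String key) { this.key = key; }
    }

    static final class Request<R> {
        private final String storeName;
        private final Query<R> query;
        // Empty means "all locally available partitions".
        private final Optional<Set<Integer>> partitions;

        private Request(String storeName, Query<R> query, Optional<Set<Integer>> partitions) {
            this.storeName = storeName;
            this.query = query;
            this.partitions = partitions;
        }

        /** Step 1 (required): name the store. */
        static InStore inStore(String name) { return new InStore(name); }

        /** Intermediate stage: all a caller can do here is supply the query. */
        static final class InStore {
            private final String name;
            private InStore(String name) { this.name = name; }

            /** Step 2 (required): supply the query, which also fixes the result type R. */
            <R> Request<R> withQuery(Query<R> query) {
                return new Request<>(name, query, Optional.empty());
            }
        }

        /** Optional step: returns a new request and leaves this one unchanged. */
        Request<R> withPartitionsToQuery(Set<Integer> partitions) {
            Set<Integer> copy = Collections.unmodifiableSet(new HashSet<>(partitions));
            return new Request<>(storeName, query, Optional.of(copy));
        }

        String getStoreName() { return storeName; }
        Query<R> getQuery() { return query; }
        Optional<Set<Integer>> getPartitions() { return partitions; }
    }

    public static void main(String[] args) {
        Request<Long> all = Request.<Long>inStore("word-counts").withQuery(new KeyQuery("kafka"));
        Set<Integer> wanted = new HashSet<>();
        wanted.add(0);
        wanted.add(2);
        Request<Long> some = all.withPartitionsToQuery(wanted);

        System.out.println(all.getStoreName());
        System.out.println(all.getPartitions().isPresent());
        System.out.println(some.getPartitions().isPresent());
    }
}
```

Because `inStore` returns an intermediate `InStore` object whose only method is `withQuery`, a caller cannot obtain a request that lacks either required argument, and the result type is fixed at the moment the query is supplied.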
InteractiveQueryResult
This is the main response object for IQv2. It wraps the per-partition results and provides a vehicle for delivering metadata about the result as a whole.
Code Block |
---|
/**
* The response object for interactive queries.
* It wraps the individual results, as well as providing a
* vehicle to deliver metadata relating to the result as a whole.
* <p>
* @param <R> The type of the query result.
*/
@Evolving
public class InteractiveQueryResult<R> {
/**
* Constructor. Used by Kafka Streams, and may be useful for
* tests as well.
*/
public InteractiveQueryResult(Map<Integer /*partition*/, QueryResult<R>> partitionResults);
/**
* The query's result for each partition that executed the query.
*/
public Map<Integer /*partition*/, QueryResult<R>> getPartitionResults();
/**
* The position of the state store at the moment it executed the
* query. In conjunction
* with {@link InteractiveQueryRequest#withPositionBound}, this can be
* used to achieve a good balance between consistency and
* availability in which repeated queries are guaranteed to
* advance in time while allowing reads to be served from any
* replica that is caught up to that caller's prior observations.
*/
public Offsets getPosition();
} |
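The interplay between a position bound on the request and the position returned with the result can be illustrated with a small simulation. This is only a sketch under simplifying assumptions: `Offsets` is reduced to a map from partition number to offset, and `Replica` and `Session` are hypothetical names that are not part of the proposal.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model: Offsets is reduced to a map from partition to offset.
class PositionBoundSketch {

    /** One copy (active or standby) of a single store partition. */
    static final class Replica {
        final int partition;
        final long offset;   // how far this copy has consumed its input partition
        final String value;  // what the store holds at that offset

        Replica(int partition, long offset, String value) {
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }

        /** Answers only if this copy has reached the caller's bound. */
        Optional<String> query(Map<Integer, Long> bound) {
            // Entries for other partitions are ignored; a missing entry means "no bound".
            Long required = bound.get(partition);
            if (required != null && offset < required) {
                return Optional.empty(); // not caught up to the bound
            }
            return Optional.of(value);
        }
    }

    /** Remembers the newest position seen so far and uses it as the next bound. */
    static final class Session {
        private final Map<Integer, Long> observed = new HashMap<>();

        Optional<String> read(Replica replica) {
            Optional<String> result = replica.query(observed);
            if (result.isPresent()) {
                observed.merge(replica.partition, replica.offset, Long::max);
            }
            return result;
        }
    }

    public static void main(String[] args) {
        Replica standby = new Replica(0, 7L, "count=7");
        Replica active = new Replica(0, 10L, "count=10");
        Session session = new Session();

        System.out.println(session.read(standby)); // served: nothing observed yet
        System.out.println(session.read(active));  // served: active is ahead of the bound
        System.out.println(session.read(standby)); // refused: standby is behind what was seen
    }
}
```

Once the session has seen offset 10 from the active copy, the lagging standby declines to answer rather than return older data, while any copy that has caught up to offset 10 remains eligible to serve the read.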
StateStore modifications
This is the essence of the proposal. Rather than modifying each store interface to allow new kinds of queries, we introduce a generic capability of stores to execute query objects. This lets each store choose whether to accept or reject queries of a given type, and lets new query types be introduced without changing the store interfaces.
Code Block |
---|
public interface StateStore {
...
/**
* Execute a query. Returns a QueryResult containing either result data or
* a failure.
* <p>
* If the store doesn't know how to handle the given query, the result
* will be a {@link FailureReason#UNKNOWN_QUERY_TYPE}.
* If the store couldn't satisfy the given position bound, the result
* will be a {@link FailureReason#NOT_UP_TO_BOUND}.
* @param query The query to execute, together with its position bound
* @param <R> The result type
*/
@Evolving
default <R> QueryResult<R> query(StoreQuery<R> query) {
// If a store doesn't implement a query handler, then all queries are unknown.
return QueryResult.forUnknownQueryType(query.getQuery(), this);
}
...
} |
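As an illustration of how a store might opt in to one query type while rejecting the rest, here is a self-contained sketch. All types are pared-down, hypothetical stand-ins for the proposed ones (no position bounds, a single failure reason), and `KeyQuery`, `ScanQuery`, `CountStore`, and `forResult` are invented for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, pared-down stand-ins for the proposed types; not the Kafka Streams classes.
class QueryDispatchSketch {

    interface Query<R> { }

    /** Made-up query: look up the count stored under one key. */
    static class KeyQuery implements Query<Long> {
        final String key;
        KeyQuery(String key) { this.key = key; }
    }

    /** Made-up query that the store below does not handle. */
    static class ScanQuery implements Query<Long> { }

    enum FailureReason { UNKNOWN_QUERY_TYPE }

    static final class QueryResult<R> {
        private final R result;
        private final FailureReason failure;

        private QueryResult(R result, FailureReason failure) {
            this.result = result;
            this.failure = failure;
        }

        static <R> QueryResult<R> forResult(R result) {
            return new QueryResult<R>(result, null);
        }

        static <R> QueryResult<R> forUnknownQueryType(Query<R> query, StateStore store) {
            return new QueryResult<R>(null, FailureReason.UNKNOWN_QUERY_TYPE);
        }

        boolean isSuccess() { return failure == null; }
        R getResult() { return result; }
        FailureReason getFailure() { return failure; }
    }

    static final class StoreQuery<R> {
        private final Query<R> query;
        StoreQuery(Query<R> query) { this.query = query; }
        Query<R> getQuery() { return query; }
    }

    interface StateStore {
        /** Default: a store without a query handler treats every query as unknown. */
        default <R> QueryResult<R> query(StoreQuery<R> storeQuery) {
            return QueryResult.forUnknownQueryType(storeQuery.getQuery(), this);
        }
    }

    /** A store that understands KeyQuery and defers everything else to the default. */
    static final class CountStore implements StateStore {
        private final Map<String, Long> counts = new HashMap<>();

        void put(String key, long count) { counts.put(key, count); }

        @Override
        @SuppressWarnings("unchecked")
        public <R> QueryResult<R> query(StoreQuery<R> storeQuery) {
            Query<R> query = storeQuery.getQuery();
            if (query instanceof KeyQuery) {
                QueryResult<Long> found = QueryResult.forResult(counts.get(((KeyQuery) query).key));
                // Unchecked but safe here: a KeyQuery is a Query<Long>, so R is Long on this branch.
                return (QueryResult<R>) found;
            }
            return StateStore.super.query(storeQuery);
        }
    }

    public static void main(String[] args) {
        CountStore store = new CountStore();
        store.put("kafka", 3L);
        QueryResult<Long> hit = store.query(new StoreQuery<Long>(new KeyQuery("kafka")));
        QueryResult<Long> miss = store.query(new StoreQuery<Long>(new ScanQuery()));
        System.out.println(hit.isSuccess() + " " + hit.getResult());
        System.out.println(miss.isSuccess() + " " + miss.getFailure());
    }
}
```

The design choice this sketch highlights is that adding `ScanQuery` support later would touch only `CountStore.query`, not the `StateStore` interface or any other store.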
StoreQuery
This is a simple container for executing a query on a specific partition of a state store. As we are now getting into the minor details, I'll omit JavaDocs that seem obvious.
Code Block |
---|
public class StoreQuery<R> {
public static <R> StoreQuery<R> unbounded(Query<R> query);
public static <R> StoreQuery<R> latestBounded(Query<R> query);
public static <R> StoreQuery<R> bounded(Query<R> query, Offsets bound);
// allow collection of detailed execution information
public StoreQuery<R> enableExecutionInfo();
public Query<R> getQuery(); // not nullable
public boolean isLatestBounded();
public boolean isUnbounded();
public Offsets getBound(); // nullable
public boolean isExecutionInfoEnabled();
} |
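The three factory methods correspond to three ways a partition could decide whether it is allowed to answer. The sketch below shows one plausible way to evaluate them. It is an assumption-laden illustration: generics are dropped, `Offsets` is reduced to a map from partition to offset, and `BoundCheckSketch`, `Outcome`, and `check` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; the type parameter and the query itself are omitted for brevity.
class BoundCheckSketch {

    enum Outcome { OK, NOT_ACTIVE, NOT_UP_TO_BOUND }

    static final class StoreQuery {
        private final boolean latest;
        private final Map<Integer, Long> bound; // null unless an explicit bound was given

        private StoreQuery(boolean latest, Map<Integer, Long> bound) {
            this.latest = latest;
            this.bound = bound;
        }

        static StoreQuery unbounded() { return new StoreQuery(false, null); }
        static StoreQuery latestBounded() { return new StoreQuery(true, null); }
        static StoreQuery bounded(Map<Integer, Long> bound) {
            return new StoreQuery(false, new HashMap<>(bound));
        }

        boolean isLatestBounded() { return latest; }
        boolean isUnbounded() { return !latest && bound == null; }
        Map<Integer, Long> getBound() { return bound; } // nullable
    }

    /** Decides whether one partition of a store may answer the query. */
    static Outcome check(StoreQuery query, boolean activeAndRunning, int partition, long offset) {
        if (query.isUnbounded()) {
            return Outcome.OK; // any copy may answer, however stale
        }
        if (query.isLatestBounded()) {
            return activeAndRunning ? Outcome.OK : Outcome.NOT_ACTIVE;
        }
        // Explicit bound: a missing entry for this partition means "no bound".
        Long required = query.getBound().get(partition);
        return (required == null || offset >= required) ? Outcome.OK : Outcome.NOT_UP_TO_BOUND;
    }

    public static void main(String[] args) {
        Map<Integer, Long> bound = new HashMap<>();
        bound.put(0, 5L);
        System.out.println(check(StoreQuery.unbounded(), false, 0, 0L));
        System.out.println(check(StoreQuery.latestBounded(), false, 0, 100L));
        System.out.println(check(StoreQuery.bounded(bound), false, 0, 4L));
    }
}
```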
QueryResult
This is a container for a single partition's query result.
Code Block |
---|
public class QueryResult<R> {
// returns a failed query result because the store didn't know how to handle the query.
public static <R> QueryResult<R> forUnknownQueryType(Query<R> query, StateStore store);
// returns a failed query result because the partition wasn't caught up to the desired bound.
public static <R> QueryResult<R> notUpToBound(Offsets currentPosition, Offsets bound);
// returns a failed query result because the caller requested a "latest" bound, but the task was
// not active and running.
public static <R> QueryResult<R> notActive(String currentState);
// Used by state stores that need to delegate to another store to run a query and then
// translate the results. Does not change the execution info or any other metadata.
public <NewR> QueryResult<NewR> swapResult(NewR newTypedResult);
// If requested by StoreQuery#isExecutionInfoEnabled, stores should record
// helpful information, such as their own class, how they executed the query,
// and the time they took.
public void addExecutionInfo(String executionInfo);
...
} |
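To show what `swapResult` and `addExecutionInfo` are for, here is a minimal sketch of a typed store that delegates to an inner bytes store and translates the answer. The classes are hypothetical stand-ins: `BytesStore`, `StringStore`, `forResult`, `getResult`, and `getExecutionInfo` are invented for illustration, and a plain boolean takes the place of `StoreQuery#isExecutionInfoEnabled`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; these are not the Kafka Streams classes.
class SwapResultSketch {

    static final class QueryResult<R> {
        private final R result;
        private final List<String> executionInfo;

        private QueryResult(R result, List<String> executionInfo) {
            this.result = result;
            this.executionInfo = executionInfo;
        }

        static <R> QueryResult<R> forResult(R result) {
            return new QueryResult<R>(result, new ArrayList<String>());
        }

        /** Replaces the payload and carries the execution info over unchanged. */
        <NewR> QueryResult<NewR> swapResult(NewR newTypedResult) {
            return new QueryResult<NewR>(newTypedResult, executionInfo);
        }

        void addExecutionInfo(String info) { executionInfo.add(info); }
        R getResult() { return result; }
        List<String> getExecutionInfo() { return executionInfo; }
    }

    /** Inner store that only knows about raw bytes. */
    static final class BytesStore {
        QueryResult<byte[]> get(String key, boolean collectInfo) {
            byte[] raw = ("value-of-" + key).getBytes(StandardCharsets.UTF_8);
            QueryResult<byte[]> result = QueryResult.forResult(raw);
            if (collectInfo) {
                result.addExecutionInfo("BytesStore: point lookup");
            }
            return result;
        }
    }

    /** Outer store that delegates to the inner one and translates the result type. */
    static final class StringStore {
        private final BytesStore inner = new BytesStore();

        QueryResult<String> get(String key, boolean collectInfo) {
            QueryResult<byte[]> raw = inner.get(key, collectInfo);
            String decoded = new String(raw.getResult(), StandardCharsets.UTF_8);
            QueryResult<String> typed = raw.swapResult(decoded);
            if (collectInfo) {
                typed.addExecutionInfo("StringStore: deserialized inner result");
            }
            return typed;
        }
    }

    public static void main(String[] args) {
        QueryResult<String> result = new StringStore().get("a", true);
        System.out.println(result.getResult());
        for (String line : result.getExecutionInfo()) {
            System.out.println(line);
        }
    }
}
```

Each layer appends its own note, so the caller sees the execution trace from the innermost store outward, while only the outermost layer has to know the caller-facing result type.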
Compatibility, Deprecation, and Migration Plan
...