Status
Current state: Drafting
Discussion thread: TBD
JIRA: TBD
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Kafka Streams supports an interesting and innovative API for "peeking" into the internal state of running stateful stream processors from outside of the application, called Interactive Query (IQ). This functionality has proven invaluable to users over the years for everything from debugging running applications to serving low latency queries straight from the Streams runtime.
However, the actual interfaces for IQ were designed in the very early days of Kafka Streams, before the project had gained significant adoption, and in the absence of much precedent for this kind of API in peer projects. With the benefit of hindsight, we can observe several problems with the original design that we hope to address in a revised framework that will serve Streams users well for many years to come.
Problems
- IQ serves queries by presenting callers with a composite store interface, which encapsulates the fact that stores will in general be partitioned, and that a given instance may only host a subset of those partitions (if any).
- The cost of constructing this store interface is non-trivial, though many real-world use cases will only use the store to run one query and then discard it.
- The creation of the store is subject to any number of error conditions, so callers need to handle exceptions when calling KafkaStreams.store().
- Once you have an IQ store reference, it is still subject to any number of transient and permanent conditions (such as rebalances having moved partitions off the local instance, Streams changing state, etc.), so callers also need to handle exceptions while running queries on their store, and be prepared to rebuild the store if it becomes invalid.
- Users desiring to query custom state stores need to produce a fairly detailed implementation of QueryableStoreType that details how to compose results from multiple partitions into one.
- In particular, if you want to plug a store with special query capabilities into the Streams DSL (for example, as the materialization of a KTable), the store must extend the appropriate store interface, for example KeyValueStore<Bytes,byte[]>. However, to expose it in IQ, you will have to implement QueryableStoreType<YourStore>, which requires you to produce an instance of YourStore that delegates to the list of ReadOnlyKeyValueStore<Bytes,byte[]> returned by storeProvider.stores. The catch is that those delegate stores are not instances of your store type! They will be instances of the internal class MeteredTimestampedKeyValueStore, which wraps other internal store types, which after a few more layers of wrapping contain YourStore at the bottom. Your only choice is to create a completely separate implementation of the YourStore interface that delegates all the ReadOnlyKeyValueStore methods to those wrapper layers, and then, for the methods that are special to YourStore, repeatedly cast and type-check the wrappers until you get all the way down to the actual YourStore that can serve the query.
- In other words, it is so complicated that it might as well be impossible, which makes it unsurprising that no one has actually tried. I suspect that if we made it easier to extend the storage framework, we would see a number of new use cases pop up building on IQ in the future.
- IQ composes all locally present partitions into a unified response. For example, for queries that return an iterator, it builds a composite iterator that collates all locally available partitions' iterators into one.
- While this is useful for trivial use cases, it destroys useful information about the response:
- Callers don't know which partitions were included in the response.
- After iterating for some time, callers can't tell when individual partitions' iterations are complete. This is important if we experience a failure: partitions that are already complete don't need to repeat the query.
- In practice, partitions' responses can succeed or fail independently, but the composite response couples all responses to any individual partition's failure.
- Because IQ responses are simply the result type of whatever store method you invoke, it is not possible to attach extra metadata that is useful in the IQ context, specifically.
- E.g., we might want to add detailed trace information about which store layers or segments participated in the query, including execution time, cache hits/misses, etc. This kind of feature would be particularly useful when debugging performance, or when IQ is backing a service that uses distributed tracing.
- E.g., we might want to add information about the precise state of the view we served: What was the input offset we last wrote into the store? What is the "current stream time" of the view we served? What was the state of the StreamTask when we served the query?
- These are just examples from various conversations about potentially useful IQ response metadata. The point is to illustrate the fact that we are missing opportunities by restricting the IQ response to be the simple value returned by the store methods that serve the query.
- Supporting new types of queries to the "standard" store interfaces is onerous, since it requires adding new methods to the store interfaces, which need to be overridden in dozens of utility implementations throughout the Streams codebase.
- Example: KIP-617: Allow Kafka Streams State Stores to be iterated backwards. This change involved four PRs (https://github.com/apache/kafka/pull/9137, https://github.com/apache/kafka/pull/9138, https://github.com/apache/kafka/pull/9139/files, https://github.com/apache/kafka/pull/9239), totaling 108 files and 6,000+ lines of code changed.
- Another example: KIP-614: Add Prefix Scan support for State Stores (which only edits the KeyValueStore). This change took two PRs (https://github.com/apache/kafka/pull/9508 and https://github.com/apache/kafka/pull/10052), totaling 19 files and 600+ lines of code changed.
- IQ forces all queries to compose the contents of the Record Cache (the write buffer for Processors) with the underlying bytes stores.
- Despite its name, the Record Cache is a write buffer, not a traditional read cache. Perhaps not surprisingly, its performance is not very good for arbitrary queries, since its primary purpose is to ensure that Processors always read their own writes while delaying operations like sending writes to the changelog and the underlying stores.
- We could invest in optimizing the Record Cache for queries, but we would probably find that the better approach is to separate the read and write workloads.
- Regardless of potential future optimizations in the Record Cache, merging the buffered writes with the underlying stores (as in the case of a table scan) will always require extra work, and it would be good to have an option to skip the record cache for IQ users.
- In contrast to Processors, IQ interactions can never write, so do not need any concept of "read your writes".
Unintended Consequences
In a nutshell, all those issues result in a system that isn't ideal from anyone's perspective:
- People using IQ:
- Have to call two methods to do one query (KafkaStreams#store and then the actual query method) and have to deal with exceptions from both of those methods
- Lose important information about which partitions were included in the response, and when individual partitions complete during the process of consuming results
- Get worse performance than necessary due to the overhead of building the intermediate store abstraction
- People adding new stores:
- Have to implement prohibitively complex logic to expose their store's capabilities through IQ (see Problem 1d).
- People contributing to existing store interfaces:
- Have to jump through a bunch of hoops to add a new method to the store interfaces.
- Have no way to know they did everything right unless they test every combination of store configurations with both the PAPI and IQ
- People maintaining Streams:
- Have a significant burden reviewing KIPs and PRs because there are so many complexities involved in properly changing store interfaces
- Have to deal with a long-tail of bug reports that trickle in when some contribution inevitably overlooks some "minor" point like verifying a new method works via IQ or is properly handled in the cache, etc.
In conclusion, and to clarify: IQ itself has been extremely successful and valuable to many people. I only harp on the above points to demonstrate a number of flaws that I think we can improve on to make it even more valuable and successful in the future.
Goals
To address the above pain points while preserving the positive aspects of IQ, I'd propose the following goals for this KIP:
- We should continue to offer a mechanism to peek into the internal state of Kafka Streams's stateful operations.
- We should recognize that querying state via IQ is a different use case from using a store in a Processor, and a different interface should therefore be on the table.
- Simple use cases should continue to be easy.
- Particularly, it should continue to be easy to just "query the store" and not worry about individual partitions, etc.
- More complex use cases should be possible and not too hard.
- Particularly, it should be possible to pick and choose partitions to query, and to handle independent partitions' responses independently.
- It should also be possible to define new store types and queries with a manageable level of complexity.
- It should be possible to tune queries for maximum performance.
- Contributing to and maintaining the code base should be relatively straightforward.
Proposed Changes
This KIP proposes a new framework for IQ, which we will refer to as "IQv2". It is outside the scope of this KIP to propose new mechanisms for every query that IQ supports today, as part of the purpose of this design is to be easily extensible (and we want to keep the KIP focused on the framework). However, we do propose to add a few specific queries, to flesh out the proposal and to give some purpose to the initial batch of implementation work.
The basic design of IQv2 is to add a mechanism for KafkaStreams (and TopologyTestDriver, which we'll omit for brevity in the discussion) to execute a "query" on the caller's behalf (as opposed to constructing a store for the caller to query).
- This addresses Problem 1 (and Unintended Consequence 1a) because each time a user wants to query the store, they just call one method and have no store lifecycle to maintain.
The query itself will be (almost) completely opaque to KafkaStreams, and will effectively be a protocol between the IQv2 caller and the underlying state store.
- This is the key to addressing Problem 4, and it resolves Unintended Consequence 2 (because new stores don't need to do anything except handle queries to be integrated with IQ) and Unintended Consequence 3 (because the scope of a new capability is only limited to adding a new Query type and adding handlers for it in the desired store). It also resolves Unintended Consequence 4 for the same reason as 3, since the scope of adding a new query is so much smaller.
- This design also addresses Problem 5 because the Caching state store layers will have the opportunity to handle known queries before passing them down to lower store layers. So, if desired, we can define a well-known KeyQuery that has a flag controlling whether the cache should handle it or not, while a custom query type would naturally bypass the cache, since the cache doesn't have knowledge of the query type.
IQv2 will include "container" request and response objects, enabling refinements and controls to be added onto queries and also enabling additional metadata to be accompany results.
- This addresses Problem 3 because we can attach all the extra information we need "around" the core query and result.
- It also creates a mechanism for future extensions to IQ.
The response object will not attempt to compose individual partitions' responses. Instead, the response object will provide an API to get the responses for each partition. Additionally, we will provide some utilities to compose partition responses.
- This addresses Problem 2 because responses aren't required to be composable
- It also creates room for partitions to report successful or failure responses independently, which addresses Unintended Consequence 1b
- It also addresses Unintended Consequence 1c because we don't do extra work to combine partitions' results unless we need to.
Special interface stability protocol
All of the new methods and classes will be marked as @Evolving ("Compatibility may be broken at a minor release") to allow for more rapid iteration as we flesh out the first generation of queries. For such changes, we will simply notify the KIP discussion thread and update this document. Our goal is to stabilize the interface as quickly as possible, ideally in the next major release.
Public Interfaces
This KIP will add several new methods, classes, and interfaces.
KafkaStreams modifications
IQv2 will continue to present its primary API via the KafkaStreams interface. The query method itself is this API.
As the JavaDoc indicates, we also provide a mechanism to get a handle on the serdes, in case some queries want to handle binary data directly.
```java
public class KafkaStreams implements AutoCloseable {
    ...

    /**
     * Run an interactive query against a state store.
     * <p>
     * This method allows callers outside of the Streams runtime to
     * access the internal state of stateful processors. See
     * https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
     * for more information.
     */
    @Evolving
    public <R> InteractiveQueryResult<R> query(InteractiveQueryRequest<R> request);

    /**
     * Get a reference to the serdes used internally in a state store
     * for use with interactive queries. While many queries already
     * accept Java-typed keys and also return typed results, some
     * queries may need to handle the raw binary data stored in StateStores,
     * in which case, this method can provide the serdes needed to interpret
     * that data.
     */
    @Evolving
    public <K, V> InteractiveQuerySerdes<K, V> serdesForStore(String storeName);

    ...
}
```
InteractiveQueryRequest
This is the main request object for IQv2. It contains all of the information required to execute the query. Note that, although this class is similar to the IQv1 request object StoreQueryParameters, we are proposing a new class to avoid unnecessary coupling between the old and new APIs' design goals.
This class implements a progressive builder pattern in an attempt to avoid the pitfalls we identified in Kafka Streams DSL Grammar. The progressive builder pattern allows us to supply arguments via semantically meaningful static and instance methods AND check required arguments at compile time.
The way it works is that the constructor is private and the first required argument is a static method. That method returns an intermediate interface that contains the next builder method, which either returns a new intermediate interface or the final class, depending on whether the following arguments are required or not. All of the optional arguments can be instance methods in the final interface.
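To make the pattern concrete, here is a minimal, self-contained sketch of a progressive builder. The class and method names (PingRequest, NeedsTarget, forHost, etc.) are hypothetical illustrations, not part of the proposed API:

```java
// A minimal sketch of the progressive builder pattern: the constructor is
// private, the first required argument is a static method, and that method
// returns an intermediate type whose only methods supply the next required
// argument, so omitting a required argument is a compile-time error.
class PingRequest {
    private final String target;
    private final int timeoutMs; // optional argument, with a default

    private PingRequest(final String target, final int timeoutMs) {
        this.target = target;
        this.timeoutMs = timeoutMs;
    }

    // First required argument: a static method returning an intermediate type.
    static NeedsTarget forHost(final String host) {
        return new NeedsTarget(host);
    }

    // Intermediate type: callers cannot obtain a PingRequest without
    // passing through it, which enforces the second required argument.
    static class NeedsTarget {
        private final String host;

        private NeedsTarget(final String host) {
            this.host = host;
        }

        PingRequest withPort(final int port) {
            return new PingRequest(host + ":" + port, 1000);
        }
    }

    // Optional arguments are instance methods on the final class.
    PingRequest withTimeoutMs(final int timeoutMs) {
        return new PingRequest(target, timeoutMs);
    }

    String getTarget() { return target; }
    int getTimeoutMs() { return timeoutMs; }
}
```

A caller then chains the methods in order, e.g. `PingRequest.forHost("localhost").withPort(8080).withTimeoutMs(500)`, exactly as `InteractiveQueryRequest.inStore(...).withQuery(...)` would be chained in the proposal.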
Note: the "position bound" part of the proposal is an evolution of IQv1 and potentially controversial. The idea is to remove the need for the semantically loose StoreQueryParameters#enableStaleStores method. Instead of choosing between querying only the active store and querying unboundedly stale restoring and standby stores, callers can choose to query only the active store (latest), query any store (no bound), or query a store that is at least past the point of our last query/queries. If this idea proves controversial, we can separate it from this KIP and propose it independently.
```java
/**
 * The request object for Interactive Queries.
 * This is an immutable builder class for passing all required and
 * optional arguments for querying a state store in Kafka Streams.
 * <p>
 * @param <R> The type of the query result.
 */
@Evolving
public class InteractiveQueryRequest<R> {

    /**
     * First required argument to specify the name of the store to query.
     */
    public static InStore inStore(final String name);

    public static class InStore {
        /**
         * Second required argument to provide the query to execute.
         */
        public <R> InteractiveQueryRequest<R> withQuery(final Query<R> query);
    }

    /**
     * Optionally bound the current position of the state store
     * with respect to the input topics that feed it. In conjunction
     * with {@link InteractiveQueryResult#getPosition}, this can be
     * used to achieve a good balance between consistency and
     * availability in which repeated queries are guaranteed to
     * advance in time while allowing reads to be served from any
     * replica that is caught up to that caller's prior observations.
     * <p>
     * Note that the set of offsets provided in the bound does not determine
     * the partitions to query. For that, see {@link withPartitionsToQuery}.
     * Unrelated offsets will be ignored, and missing offsets will be treated
     * as indicating "no bound".
     */
    public InteractiveQueryRequest<R> withPositionBound(Offsets positionBound);

    /**
     * Optionally bound the query to only execute on active running tasks.
     * This is the freshest and most consistent possible result, but does
     * not provide high availability. This is the default if no bound is
     * specified.
     */
    public InteractiveQueryRequest<R> withLatestPositionBound();

    /**
     * Optionally specify that the query will run on any store,
     * whether active, standby, or restoring. This data may be
     * arbitrarily stale, but displays the best possible availability.
     */
    public InteractiveQueryRequest<R> withNoPositionBound();

    /**
     * Optionally specify the partitions to include in the query.
     * If omitted, the default is to query all locally available partitions.
     */
    public InteractiveQueryRequest<R> withPartitionsToQuery(Set<Integer> partitions);

    // Getters are also proposed to retrieve the request parameters

    public String getStoreName(); // not nullable

    public Query<R> getQuery(); // not nullable

    public boolean isLatestBounded();

    public boolean isUnbounded();

    public Offsets getBound(); // nullable
}
```
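To illustrate the position-bound semantics, here is a minimal sketch, assuming a position and a bound are both represented as maps from "topic-partition" to offset (the real proposal uses the Offsets class; PositionBoundCheck and isPastBound are hypothetical names for illustration only):

```java
import java.util.Map;

// A sketch of the position-bound check: a store's current position
// satisfies a caller's bound if, for every topic-partition named in the
// bound, the store has reached at least that offset. Offsets in the
// position that the bound doesn't mention are ignored.
class PositionBoundCheck {
    static boolean isPastBound(final Map<String, Long> position,
                               final Map<String, Long> bound) {
        for (final Map.Entry<String, Long> required : bound.entrySet()) {
            final Long current = position.get(required.getKey());
            if (current == null || current < required.getValue()) {
                return false; // this replica hasn't caught up to the bound yet
            }
        }
        return true;
    }
}
```

Under this scheme, a caller could take the position returned by one query and use it as the bound on the next, guaranteeing monotonic reads while still being free to query any sufficiently caught-up replica.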
InteractiveQueryResult
This is the main response object for IQv2. It wraps the individual results, as well as providing a vehicle to deliver metadata relating to the result as a whole.
```java
/**
 * The response object for interactive queries.
 * It wraps the individual results, as well as providing a
 * vehicle to deliver metadata relating to the result as a whole.
 * <p>
 * @param <R> The type of the query result.
 */
@Evolving
public class InteractiveQueryResult<R> {

    /**
     * Constructor. Used by Kafka Streams, and may be useful for
     * tests as well.
     */
    public InteractiveQueryResult(Map<Integer /*partition*/, QueryResult<R>> partitionResults);

    /**
     * The query's result for each partition that executed the query.
     */
    public Map<Integer /*partition*/, QueryResult<R>> getPartitionResults();

    /**
     * The position of the state store at the moment it executed the
     * query. In conjunction with
     * {@link InteractiveQueryRequest#withPositionBound}, this can be
     * used to achieve a good balance between consistency and
     * availability in which repeated queries are guaranteed to
     * advance in time while allowing reads to be served from any
     * replica that is caught up to that caller's prior observations.
     */
    public Offsets getPosition();
}
```
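To sketch how a caller might take advantage of per-partition results, here is a minimal example of retrying only the failed partitions. The classes here (PartitionResult, PartitionRetryDemo) are simplified stand-ins for illustration, not the proposed types:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// A stand-in for a single partition's result: each partition either
// succeeded with a value or failed with a reason, independently of the
// other partitions in the same response.
class PartitionResult<R> {
    final R value;          // null on failure
    final String failure;   // null on success

    PartitionResult(final R value, final String failure) {
        this.value = value;
        this.failure = failure;
    }

    boolean isSuccess() { return failure == null; }
}

class PartitionRetryDemo {
    // Because partitions report success or failure independently, a caller
    // can keep the successful partitions' data and retry only the failed
    // ones (for example, against a different replica).
    static <R> List<Integer> failedPartitions(
            final Map<Integer, PartitionResult<R>> partitionResults) {
        final List<Integer> failed = new ArrayList<>();
        for (final Map.Entry<Integer, PartitionResult<R>> entry : partitionResults.entrySet()) {
            if (!entry.getValue().isSuccess()) {
                failed.add(entry.getKey());
            }
        }
        return failed;
    }
}
```

This is exactly the information that IQv1's composite responses discard (Problem 2 and Unintended Consequence 1b).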
StateStore modifications
This is the essence of the proposal. Rather than modifying each store interface to allow new kinds of queries, we introduce a generic capability of stores to execute query objects. This allows stores to choose whether they accept or reject queries of a given type, the introduction of new queries, etc.
```java
public interface StateStore {
    ...

    /**
     * Execute a query. Returns a QueryResult containing either result data or
     * a failure.
     * <p>
     * If the store doesn't know how to handle the given query, the result
     * will be a {@link FailureReason#UNKNOWN_QUERY_TYPE}.
     * If the store couldn't satisfy the given position bound, the result
     * will be a {@link FailureReason#NOT_UP_TO_BOUND}.
     *
     * @param query The query to execute, including any position bound
     * @param <R> The result type
     */
    @Evolving
    default <R> QueryResult<R> query(StoreQuery<R> query) {
        // If a store doesn't implement a query handler, then all queries are unknown.
        return QueryResult.forUnknownQueryType(query, this);
    }

    ...
}
```
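To make the dispatch idea concrete, here is a minimal, runnable sketch of the same pattern using simplified stand-in types (Query, KeyQuery, SimpleResult, SketchStore, and InMemoryStore are hypothetical illustrations, not the proposed interfaces):

```java
import java.util.HashMap;
import java.util.Map;

// A marker interface for queries, parameterized by result type.
interface Query<R> {}

// One concrete query type a store may choose to handle.
class KeyQuery implements Query<String> {
    final String key;
    KeyQuery(final String key) { this.key = key; }
}

// A single result that is either a value or a failure reason.
class SimpleResult<R> {
    final R result;         // null on failure
    final String failure;   // null on success

    private SimpleResult(final R result, final String failure) {
        this.result = result;
        this.failure = failure;
    }

    static <R> SimpleResult<R> success(final R result) { return new SimpleResult<>(result, null); }
    static <R> SimpleResult<R> failure(final String reason) { return new SimpleResult<>(null, reason); }
    boolean isSuccess() { return failure == null; }
}

interface SketchStore {
    // Mirrors the proposed StateStore#query default: stores that don't
    // override this method reject every query as unknown.
    default <R> SimpleResult<R> query(final Query<R> query) {
        return SimpleResult.failure("UNKNOWN_QUERY_TYPE");
    }
}

class InMemoryStore implements SketchStore {
    private final Map<String, String> data = new HashMap<>();

    void put(final String key, final String value) { data.put(key, value); }

    @Override
    @SuppressWarnings("unchecked")
    public <R> SimpleResult<R> query(final Query<R> query) {
        if (query instanceof KeyQuery) {
            // This store knows how to handle KeyQuery. A new query type
            // needs only a new branch here, not a new store interface method.
            final String value = data.get(((KeyQuery) query).key);
            return (SimpleResult<R>) SimpleResult.success(value);
        }
        // Fall back to the default "unknown query type" failure.
        return SketchStore.super.query(query);
    }
}
```

The key property: the store framework never needs to know the query vocabulary, so adding a new query type touches only the query class and the stores that opt in to handling it.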
StoreQuery
This is a simple container for executing a query on a specific partition of a state store. As we are now getting into the minor details, I'll omit JavaDocs that seem obvious.
```java
public class StoreQuery<R> {
    public static <R> StoreQuery<R> unbounded(Query<R> query);

    public static <R> StoreQuery<R> latestBounded(Query<R> query);

    public static <R> StoreQuery<R> bounded(Query<R> query, Offsets bound);

    // allow collection of detailed execution information
    public StoreQuery<R> enableExecutionInfo();

    public Query<R> getQuery(); // not nullable

    public boolean isLatestBounded();

    public boolean isUnbounded();

    public Offsets getBound(); // nullable

    public boolean isExecutionInfoEnabled();
}
```
QueryResult
This is a container for a single partition's query result.
```java
public class QueryResult<R> {
    // returns a failed query result because the store didn't know how to handle the query
    public static <R> QueryResult<R> forUnknownQueryType(Query<R> query, StateStore store);

    // returns a failed query result because the partition wasn't caught up to the desired bound
    public static <R> QueryResult<R> notUpToBound(Offsets currentPosition, Offsets bound);

    // returns a failed query result because the caller requested a "latest" bound, but the
    // task was not active and running
    public static <R> QueryResult<R> notActive(String currentState);

    // Used by state stores that need to delegate to another store to run a query and then
    // translate the results. Does not change the execution info or any other metadata.
    public <NewR> QueryResult<NewR> swapResult(NewR newTypedResult);

    // If requested by StoreQuery#isExecutionInfoEnabled, stores should record
    // helpful information, such as their own class, how they executed the query,
    // and the time they took.
    public void addExecutionInfo(String executionInfo);

    ...
}
```
Compatibility, Deprecation, and Migration Plan
...
Rejected Alternatives
...