Current state: Drafting
Discussion thread: TBD
JIRA: TBD
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Kafka Streams supports an interesting and innovative API for "peeking" into the internal state of running stateful stream processors from outside of the application, called Interactive Query (IQ). This functionality has proven invaluable to users over the years for everything from debugging running applications to serving low latency queries straight from the Streams runtime.
However, the actual interfaces for IQ were designed in the very early days of Kafka Streams, before the project had gained significant adoption, and in the absence of much precedent for this kind of API in peer projects. With the benefit of hindsight, we can observe several problems with the original design that we hope to address in a revised framework that will serve Streams users well for many years to come.
KafkaStreams.store()
KeyValueStore<Bytes,byte[]>. However, when it comes to exposing it in IQ, you will have to implement QueryableStoreType<YourStore>, which requires you to produce an instance of YourStore that delegates to the list of ReadOnlyKeyValueStore<Bytes,byte[]> instances returned by storeProvider.stores. The catch is that those delegate stores are not instances of your store type! They will be instances of the internal class MeteredTimestampedKeyValueStore, which wraps other internal store types, which after a few more layers of wrapping contain YourStore at the bottom. Your only choice will be to create a completely separate implementation of the YourStore interface that delegates all the ReadOnlyKeyValueStore methods to those wrapper layers. For the methods that are specific to YourStore, you'll have to repeatedly cast and type-check the wrappers until you reach the actual YourStore instance that can serve the query.

In a nutshell, all those issues result in a system that isn't ideal from anyone's perspective:
In conclusion, and to clarify: IQ itself has been extremely successful and valuable to many people. I only harp on the above points to demonstrate a number of flaws that I think we can improve on to make it even more valuable and successful in the future.
To address the above pain points while preserving the positive aspects of IQ, I'd propose the following goals for this KIP:
This KIP proposes a new framework for IQ, which we will refer to as "IQv2". It is outside the scope of this KIP to propose new mechanisms for every query that IQ supports today, as part of the purpose of this design is to be easily extensible (and we want to keep the KIP focused on the framework). However, we do propose to add a few specific queries, to flesh out the proposal and to give some purpose to the initial batch of implementation work.
The basic design of IQv2 is to add a mechanism for KafkaStreams (and TopologyTestDriver, which we'll omit for brevity in the discussion) to execute a "query" on the caller's behalf (as opposed to constructing a store for the caller to query).
The query itself will be (almost) completely opaque to KafkaStreams, and will effectively be a protocol between the IQv2 caller and the underlying state store.
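To make this division of responsibilities concrete, here is a minimal, self-contained sketch of a query acting as a protocol between the caller and the store. All of the types (Query, KeyQuery, CountAllQuery, QueryResult, ExampleStore, ExampleRuntime) are hypothetical stand-ins written for illustration, not the proposed interfaces. The runtime only routes the query to the named store, and the store either answers it or reports that it does not recognize the query type.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical: a query is a typed marker; R is the type of its result.
interface Query<R> {}

// Hypothetical point-lookup query whose result is the stored String value.
final class KeyQuery implements Query<String> {
    final String key;
    KeyQuery(String key) { this.key = key; }
}

// Hypothetical query type that the example store does not understand.
final class CountAllQuery implements Query<Long> {}

// Hypothetical result wrapper: a value on success, or a failure message.
final class QueryResult<R> {
    final R value;
    final String failure;

    private QueryResult(R value, String failure) {
        this.value = value;
        this.failure = failure;
    }

    static <R> QueryResult<R> success(R value) { return new QueryResult<>(value, null); }

    static <R> QueryResult<R> unknownQuery(Query<R> query) {
        return new QueryResult<>(null, "unknown query type: " + query.getClass().getSimpleName());
    }

    boolean isSuccess() { return failure == null; }
}

// The store decides which queries it can answer; the runtime never does.
final class ExampleStore {
    private final Map<String, String> data = new HashMap<>();

    void put(String key, String value) { data.put(key, value); }

    @SuppressWarnings("unchecked")
    <R> QueryResult<R> execute(Query<R> query) {
        if (query instanceof KeyQuery) {
            QueryResult<String> result = QueryResult.success(data.get(((KeyQuery) query).key));
            // Safe because KeyQuery implements Query<String>, so R is String here.
            return (QueryResult<R>) (QueryResult<?>) result;
        }
        return QueryResult.unknownQuery(query);
    }
}

// Stand-in for KafkaStreams: it routes the opaque query to the named store.
final class ExampleRuntime {
    private final Map<String, ExampleStore> stores = new HashMap<>();

    void register(String storeName, ExampleStore store) { stores.put(storeName, store); }

    <R> QueryResult<R> query(String storeName, Query<R> query) {
        return stores.get(storeName).execute(query);
    }
}
```

Because the runtime never inspects the query, adding a new query type in this sketch only requires the caller and the store to agree on it; the routing code stays unchanged.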
IQv2 will include "container" request and response objects, enabling refinements and controls to be added to queries and also enabling additional metadata to accompany results.
The response object will not attempt to compose individual partitions' responses. Instead, the response object will provide an API to get the responses for each partition. Additionally, we will provide some utilities to compose partition responses.
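A minimal sketch of this per-partition response shape, assuming a hypothetical PartitionedResult<R> container (not the proposed class): callers read each partition's result individually, and composition is an explicit, opt-in step. The two helpers shown (a generic reduce and a single-answer lookup for point queries) are illustrative examples of the kind of composition utilities mentioned above, not a commitment to specific methods.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

// Hypothetical per-partition response container (not the proposed class).
final class PartitionedResult<R> {
    // Keyed by partition number; TreeMap keeps partitions in ascending order.
    private final Map<Integer, R> byPartition = new TreeMap<>();

    void addPartitionResult(int partition, R result) {
        byPartition.put(partition, result);
    }

    // The primary API: each partition's result, with no composition applied.
    Map<Integer, R> getPartitionResults() {
        return Collections.unmodifiableMap(byPartition);
    }

    // Opt-in utility: fold all partition results, e.g. summing per-partition counts.
    R reduce(R identity, BinaryOperator<R> combiner) {
        R acc = identity;
        for (R r : byPartition.values()) {
            acc = combiner.apply(acc, r);
        }
        return acc;
    }

    // Opt-in utility for point lookups: at most one partition should hold the key.
    R getOnlyNonNull() {
        R found = null;
        for (R r : byPartition.values()) {
            if (r == null) {
                continue;
            }
            if (found != null) {
                throw new IllegalStateException("more than one partition returned a result");
            }
            found = r;
        }
        return found;
    }
}
```

Keeping composition out of the container means a query whose partial results cannot be merged (for example, a bounded scan with per-partition ordering) still has a well-defined response.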
All of the new methods and classes will be marked as @Evolving ("Compatibility may be broken at minor release") to allow for more rapid iteration as we flesh out the first generation of queries. For such changes, we will simply notify the KIP discussion thread and update this document. Our goal is to stabilize the interface as quickly as possible, ideally in the next major release.
This KIP will add several new methods, classes, and interfaces.
IQv2 will continue to present its primary API via the KafkaStreams interface. The query method itself is this API.
As the JavaDoc indicates, we also provide a mechanism to get a handle on the serdes, in case some queries want to handle binary data directly.
public class KafkaStreams implements AutoCloseable {
  // ...

  /**
   * Run an interactive query against a state store.
   * <p>
   * This method allows callers outside of the Streams runtime to
   * access the internal state of stateful processors. See
   * https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
   * for more information.
   */
  @Evolving
  public <R> InteractiveQueryResult<R> query(InteractiveQueryRequest<R> request);

  /**
   * Get a reference to the serdes used internally in a state store
   * for use with interactive queries. While many queries already
   * accept Java-typed keys and also return typed results, some
   * queries may need to handle the raw binary data stored in StateStores,
   * in which case, this method can provide the serdes needed to interpret
   * that data.
   */
  @Evolving
  public <K, V> InteractiveQuerySerdes<K, V> serdesForStore(String storeName);

  // ...
}
This is the main request object for IQv2. It contains all of the information required to execute the query. Note that, although this class is similar to the IQv1 request object StoreQueryParameters, we are proposing a new class to avoid unnecessary coupling between the old and new APIs' design goals.
This class implements a progressive builder pattern in an attempt to avoid the pitfalls we identified in Kafka Streams DSL Grammar. The progressive builder pattern allows us to supply arguments via semantically meaningful static and instance methods AND check required arguments at compile time.
The way it works is that the constructor is private and the first required argument is supplied through a static method. That method returns an intermediate type that contains the next builder method, which returns either another intermediate type or the final class, depending on whether further arguments are required. All of the optional arguments can then be instance methods on the final class.
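The progressive builder pattern can be sketched in plain Java as follows. Request, InStore, and Query here are simplified, hypothetical stand-ins that mirror the shape of InteractiveQueryRequest; only the partition set is modelled as an optional argument, and the position-bound methods are omitted.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the proposed Query<R> type.
interface Query<R> {}

// Hypothetical, simplified mirror of InteractiveQueryRequest<R>.
final class Request<R> {
    private final String storeName;
    private final Query<R> query;
    // Empty set means "all locally available partitions".
    private final Set<Integer> partitions;

    // Private constructor: callers must start from the static inStore(...) method.
    private Request(String storeName, Query<R> query, Set<Integer> partitions) {
        this.storeName = storeName;
        this.query = query;
        this.partitions = partitions;
    }

    // First required argument: returns an intermediate type, not a Request.
    static InStore inStore(String name) {
        return new InStore(name);
    }

    static final class InStore {
        private final String name;

        private InStore(String name) {
            this.name = name;
        }

        // Second (last) required argument: only now is a complete Request produced.
        <R> Request<R> withQuery(Query<R> query) {
            return new Request<>(name, query, Collections.<Integer>emptySet());
        }
    }

    // Optional argument: an instance method returning a new immutable Request.
    Request<R> withPartitionsToQuery(Set<Integer> partitions) {
        return new Request<>(storeName, query, Collections.unmodifiableSet(new HashSet<>(partitions)));
    }

    String storeName() { return storeName; }
    Query<R> query() { return query; }
    Set<Integer> partitions() { return partitions; }
}
```

Because Request.inStore("counts") returns an InStore rather than a Request, code that forgets to call withQuery(...) has no Request to pass along and fails to compile, instead of failing at runtime with a missing-argument error.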
Note: the "position bound" part of the proposal is an evolution of IQv1 and potentially controversial. The idea is to remove the need for the semantically loose StoreQueryParameters#enableStaleStores method. Instead of choosing between querying only the active store and querying unboundedly stale restoring and standby stores, callers can choose to query only the active store (latest), query any store (no bound), or query a store that is at least past the point of the caller's last query/queries. If this idea is controversial, we can split it out into a separate KIP.
/**
 * The request object for Interactive Queries.
 * This is an immutable builder class for passing all required and
 * optional arguments for querying a state store in Kafka Streams.
 * <p>
 * @param <R> The type of the query result.
 */
public class InteractiveQueryRequest<R> {

  /**
   * First required argument to specify the name of the store to query
   */
  public static InStore inStore(final String name);

  public static class InStore {

    /**
     * Second required argument to provide the query to execute.
     */
    public <R> InteractiveQueryRequest<R> withQuery(final Query<R> query);
  }

  /**
   * Optionally bound the current position of the state store
   * with respect to the input topics that feed it. In conjunction
   * with {@link InteractiveQueryResult#getPosition}, this can be
   * used to achieve a good balance between consistency and
   * availability in which repeated queries are guaranteed to
   * advance in time while allowing reads to be served from any
   * replica that is caught up to the caller's prior observations.
   * <p>
   * Note that the set of offsets provided in the bound does not determine
   * the partitions to query. For that, see {@link #withPartitionsToQuery}.
   * Unrelated offsets will be ignored, and missing offsets will be treated
   * as indicating "no bound".
   */
  public InteractiveQueryRequest<R> withPositionBound(Offsets positionBound);

  /**
   * Optionally bound the query to only execute on active running tasks.
   * This is the freshest and most consistent possible result, but does
   * not provide high availability. This is the default if no bound is
   * specified.
   */
  public InteractiveQueryRequest<R> withLatestPositionBound();

  /**
   * Optionally specify that the query will run on any store,
   * whether active, standby, or restoring. This data may be
   * arbitrarily stale, but provides the best possible availability.
   */
  public InteractiveQueryRequest<R> withNoPositionBound();

  /**
   * Optionally specify the partitions to include in the query.
   * If omitted, the default is to query all locally available partitions.
   */
  public InteractiveQueryRequest<R> withPartitionsToQuery(Set<Integer> partitions);
}
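The three position-bound modes, together with the "unrelated offsets will be ignored, and missing offsets will be treated as no bound" rule from the JavaDoc above, can be modelled as a simple admission check that a replica would run before serving a query. This is a minimal sketch under stated assumptions: PositionBound, admits(...), and the "topic-partition" string keys are hypothetical, and "at least past the point" is interpreted as "applied offset >= bound offset" for every related partition in the bound.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the three position-bound modes; not the proposed API.
final class PositionBound {
    enum Kind { LATEST, NONE, AT_LEAST }

    private final Kind kind;
    // For AT_LEAST: minimum offset per input topic-partition, keyed like "input-0".
    private final Map<String, Long> minOffsets;

    private PositionBound(Kind kind, Map<String, Long> minOffsets) {
        this.kind = kind;
        this.minOffsets = minOffsets;
    }

    static PositionBound latest() { return new PositionBound(Kind.LATEST, Collections.emptyMap()); }

    static PositionBound none() { return new PositionBound(Kind.NONE, Collections.emptyMap()); }

    static PositionBound atLeast(Map<String, Long> offsets) {
        return new PositionBound(Kind.AT_LEAST, new HashMap<>(offsets));
    }

    /**
     * @param isActive        whether this replica is the active running task
     * @param inputPartitions the topic-partitions that feed this store
     * @param storePosition   the last offset this replica has applied, per input partition
     */
    boolean admits(boolean isActive, Set<String> inputPartitions, Map<String, Long> storePosition) {
        switch (kind) {
            case LATEST:
                return isActive;          // only the active task may serve
            case NONE:
                return true;              // active, standby, or restoring: any replica
            default:
                for (Map.Entry<String, Long> bound : minOffsets.entrySet()) {
                    if (!inputPartitions.contains(bound.getKey())) {
                        continue;         // unrelated offsets are ignored
                    }
                    long applied = storePosition.getOrDefault(bound.getKey(), -1L);
                    if (applied < bound.getValue()) {
                        return false;     // replica has not caught up to the caller's bound
                    }
                }
                return true;              // partitions absent from the bound impose no constraint
        }
    }
}
```

In this model, a caller would feed the position returned with one result into atLeast(...) for its next request, so a standby replica that has caught up can serve the follow-up query while a lagging one is skipped.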
This is the main response object for IQv2, ...
...
...