Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

KIP Tracking

To control the scope of this long-term project, we chose to defer many of the details to later KIPs. Here's a roundup of the related work:


Kafka Streams supports an interesting and innovative API for "peeking" into the internal state of running stateful stream processors from outside of the application, called Interactive Query (IQ). This functionality has proven invaluable to users over the years for everything from debugging running applications to serving low latency queries straight from the Streams runtime.


Code Block
public interface StateStore {
  ...

  /**
   * Execute a query. Returns a QueryResult containing either result data or
   * a failure.
   * <p>
   * If the store doesn't know how to handle the given query, the result
   * will be a {@link FailureReason#UNKNOWN_QUERY_TYPE}.
   * If the store couldn't satisfy the given position bound, the result
   * will be a {@link FailureReason#NOT_UP_TO_BOUND}.
   * @param query The query to execute
   * @param positionBound The position the store must be at or past
   * @param collectExecutionInfo Whether the store should collect detailed execution info for the query
   * @param <R> The result type
   */
  default <R> QueryResult<R> query(Query<R> query,
                                   PositionBound positionBound,
                                   boolean collectExecutionInfo) {
    // If a store doesn't implement a query handler, then all queries are unknown.
    return QueryResult.forUnknownQueryType(query, this);
  }

  /**
   * Returns the position the state store is at with respect to the input topic/partitions
   */
  default Position getPosition() {
    throw new UnsupportedOperationException(
      "getPosition is not implemented by this StateStore (" + getClass() + ")"
    );
  }
}


This is the interface that all queries must implement. Part of what I'm going for here is to place as few restrictions as possible on what a "query" or its "result" is, so the only thing in this interface is a generic type indicating the result type, which lets Streams return a fully typed response without predetermining the response type itself.

Code Block
public interface Query<R> { }
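To illustrate how little the marker interface constrains implementers, here is a minimal sketch of a store that recognizes one custom query type and reports every other query as unknown, in the spirit of the StateStore#query default. All types in it are simplified stand-ins declared only for this example: `PrefixCountQuery`, `UnsupportedQuery`, and `InMemoryStore` are hypothetical, and the `QueryResult` here is a cut-down placeholder rather than the proposed class.

```java
import java.util.TreeMap;

final class QueryDispatchSketch {
    public static void main(final String[] args) {
        final InMemoryStore store = new InMemoryStore();
        store.put("user-1", "a");
        store.put("user-2", "b");
        store.put("order-1", "c");

        // A query the store understands: the result is fully typed as Long.
        final QueryResult<Long> counted = store.query(PrefixCountQuery.withPrefix("user-"));
        System.out.println(counted.isSuccess() + " " + counted.getResult());

        // A query the store does not understand: it comes back as a failure.
        final QueryResult<String> rejected = store.query(new UnsupportedQuery());
        System.out.println(rejected.isSuccess() + " " + rejected.getFailureMessage());
    }
}

// Stand-in for the proposed marker interface.
interface Query<R> { }

// Cut-down stand-in for the proposed result container.
final class QueryResult<R> {
    private final R result;        // may be null, for example when nothing matched
    private final String failure;  // null when the query succeeded

    private QueryResult(final R result, final String failure) {
        this.result = result;
        this.failure = failure;
    }

    static <R> QueryResult<R> forResult(final R result) {
        return new QueryResult<>(result, null);
    }

    static <R> QueryResult<R> forUnknownQueryType(final Query<R> query) {
        return new QueryResult<>(null, "UNKNOWN_QUERY_TYPE: " + query.getClass().getSimpleName());
    }

    boolean isSuccess() { return failure == null; }
    R getResult() { return result; }
    String getFailureMessage() { return failure; }
}

// Hypothetical query: counts the keys that start with a given prefix.
final class PrefixCountQuery implements Query<Long> {
    private final String prefix;

    private PrefixCountQuery(final String prefix) { this.prefix = prefix; }

    static PrefixCountQuery withPrefix(final String prefix) { return new PrefixCountQuery(prefix); }

    String getPrefix() { return prefix; }
}

// Hypothetical query that the toy store below does not handle.
final class UnsupportedQuery implements Query<String> { }

// Toy store that handles exactly one query type and rejects everything else.
final class InMemoryStore {
    private final TreeMap<String, String> data = new TreeMap<>();

    void put(final String key, final String value) { data.put(key, value); }

    @SuppressWarnings("unchecked")
    <R> QueryResult<R> query(final Query<R> query) {
        if (query instanceof PrefixCountQuery) {
            final String prefix = ((PrefixCountQuery) query).getPrefix();
            final long count = data.keySet().stream().filter(k -> k.startsWith(prefix)).count();
            final QueryResult<Long> typed = QueryResult.forResult(count);
            // Unchecked but safe: PrefixCountQuery is a Query<Long>, so R is Long here.
            return (QueryResult<R>) typed;
        }
        return QueryResult.forUnknownQueryType(query);
    }
}
```

The unchecked cast is the price of dispatching on the runtime query class: the store knows the result type from the query it matched, but the compiler cannot prove it.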

Using this interface, store implementers can create any query they can think of and return any type they like, from a single value to an iterator, even a future. While writing the POC, I implemented three queries for KeyValue stores, which I'll include here as examples:

Proposed Query: KeyQuery

This query implements the functionality of the current KeyValueStore#get(key)  method:

Code Block
public class KeyQuery<K, V> implements Query<V> {
  // static factory to create a new KeyQuery, given a key
  public static <K, V> KeyQuery<K, V> withKey(final K key);

  // getter for the key
  public K getKey();
}

// ======================================
// example usage in IQv2:

Integer key = 1;

// note that "mystore" is a KeyValueStore<Integer, ValueAndTimestamp<Integer>>,
// hence the result type
StateQueryRequest<ValueAndTimestamp<Integer>> query =
  StateQueryRequest.inStore("mystore")
    .withQuery(KeyQuery.withKey(key));

// run the query
StateQueryResult<ValueAndTimestamp<Integer>> result = kafkaStreams.query(query);

// In this example, it doesn't matter which partition ran the query, so we just
// grab the result from the partition that returned something
// (this is what IQ currently does under the covers).

Integer value = result.getOnlyPartitionResult().getResult().value();

// ======================================
// For comparison, here is how the query would look in current IQ:
// (note, this glosses over many of the problems described above.
//  the example is here to illustrate the different ergonomics
//  of the two APIs.)

// create the query parameters
StoreQueryParameters<ReadOnlyKeyValueStore<Integer, ValueAndTimestamp<Integer>>> storeQueryParameters =
  StoreQueryParameters.fromNameAndType(
    "mystore",
    QueryableStoreTypes.timestampedKeyValueStore()
  );

// get a store handle to run the query
ReadOnlyKeyValueStore<Integer, ValueAndTimestamp<Integer>> store =
  kafkaStreams.store(storeQueryParameters);

// query the store
Integer value1 = store.get(key).value();

We propose to add KeyQuery as part of the implementation of this KIP, in order to have at least one query available to flesh out tests, etc., for the framework. Other queries are deferred to later KIPs.

Example Query: RawKeyQuery

We could later propose to add a "raw" version of the KeyQuery, which simply takes the key as a byte array and returns the value as a byte array. This could allow callers to bypass the serialization logic in the Metered stores entirely, potentially saving valuable CPU time and memory.

Code Block
public class RawKeyQuery implements Query<byte[]> {
  public static RawKeyQuery withKey(final Bytes key);
  public static RawKeyQuery withKey(final byte[] key);
  public Bytes getKey();
}

Example Query: RawScanQuery

This example demonstrates two variations on the first example:

  1. The ability to define queries handling "raw" binary data for keys and values
  2. The ability for a query to return multiple results (in this case, an iterator)

I'm only bundling those for brevity. We can also have typed, iterable queries and raw single-record queries.

Note this query is purely an example of what is possible. It will not be added as part of this KIP.

Code Block
public class RawScanQuery implements Query<KeyValueIterator<Bytes, byte[]>> {
    private RawScanQuery() { }

    public static RawScanQuery scan() {
        return new RawScanQuery();
    }
}

// example usage

// since we're handling raw data, we're going to need the serdes
// this is just an example; serdesForStore is a hypothetical method
// that is also not proposed in this KIP.
StateQuerySerdes<Integer, ValueAndTimestamp<Integer>> serdes =
  kafkaStreams.serdesForStore("mystore");

// run the "scan" query
StateQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
  kafkaStreams.query(StateQueryRequest.inStore("mystore").withQuery(RawScanQuery.scan()));

// This time, we'll get results from all locally available partitions.
Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults =
  scanResult.getPartitionResults();

// for this example, we'll just collate all the partitions' iterators
// together and print their data

List<KeyValueIterator<Bytes, byte[]>> iterators =
  partitionResults
    .values()
    .stream()
    .map(QueryResult::getResult)
    .collect(Collectors.toList());

// Using an example convenience method we could add to collate iterators' data
try (CloseableIterator<KeyValue<Bytes, byte[]>> collated = Iterators.collate(iterators)) {
  while (collated.hasNext()) {
    KeyValue<Bytes, byte[]> next = collated.next();
    System.out.println(
      "|||" +
        " " + serdes.keyFrom(next.key.get()) +
        " " + serdes.valueFrom(next.value)
    );
  }
}


This is a container for a single partition's query result.

Code Block
public class QueryResult<R> {
  // wraps a successful result
  public static <R> QueryResult<R> forResult(R result);

  // wraps a failure result
  public static <R> QueryResult<R> forFailure(FailureReason failureReason, String failureMessage);

  // returns a failed query result because the store didn't know how to handle the query.
  public static <R> QueryResult<R> forUnknownQueryType(Query<R> query, StateStore store);

  // returns a failed query result because the partition wasn't caught up to the desired bound.
  public static <R> QueryResult<R> notUpToBound(Position currentPosition, Position bound);

  // Used by state stores that need to delegate to another store to run a query and then
  // translate the results. Does not change the execution info or any other metadata.
  public <NewR> QueryResult<NewR> swapResult(NewR newTypedResult);

  // If requested, stores should record
  // helpful information, such as their own class, how they executed the query,
  // and the time they took.
  public void addExecutionInfo(String executionInfo);

  // records the point in the store's history that executed the query.
  public void setPosition(Position position);

  public boolean isSuccess();
  public boolean isFailure();
  public List<String> getExecutionInfo();
  public Position getPosition();
  public FailureReason getFailureReason();
  public String getFailureMessage();
  public R getResult();
}


An enum classifying the failures of individual partitions' queries.

Code Block
public enum FailureReason {
    /**
     * Failure indicating that the store doesn't know how to handle the given query.
     */
    UNKNOWN_QUERY_TYPE,

    /**
     * The query required to execute on an active task (via {@link StateQueryRequest#requireActive()}),
     * but while executing the query, the task was either a Standby task, or it was an Active task
     * not in the RUNNING state. The failure message will contain the reason for the failure.
     * <p>
     * The caller should either try again later or try a different replica.
     */
    NOT_ACTIVE,

    /**
     * Failure indicating that the store partition is not (yet) up to the desired bound.
     * The caller should either try again later or try a different replica.
     */
    NOT_UP_TO_BOUND,

    /**
     * Failure indicating that the requested store partition is not present on the local
     * KafkaStreams instance. It may have been migrated to another instance during a rebalance.
     * The caller is recommended to try a different replica.
     */
    NOT_PRESENT,

    /**
     * The requested store partition does not exist at all. For example, partition 4 was requested,
     * but the store in question only has 4 partitions (0 through 3).
     */
    DOES_NOT_EXIST,

    /**
     * The store that handled the query got an exception during query execution. The message
     * will contain the exception details. Depending on the nature of the exception, the caller
     * may be able to retry this instance or may need to try a different instance.
     */
    STORE_EXCEPTION;
}
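As a rough illustration of how a caller might act on these failure reasons, the sketch below maps each reason to a next step. The enum is re-declared locally so the example is self-contained. The `NextStep` classification and the `nextStepFor` helper are hypothetical and not part of this proposal. The mapping follows the guidance in the Javadoc above where it exists; treating UNKNOWN_QUERY_TYPE and DOES_NOT_EXIST as not worth retrying is my own assumption.

```java
final class FailureHandlingSketch {

    // Local copy of the failure reasons so the sketch compiles on its own.
    enum FailureReason {
        UNKNOWN_QUERY_TYPE, NOT_ACTIVE, NOT_UP_TO_BOUND, NOT_PRESENT, DOES_NOT_EXIST, STORE_EXCEPTION
    }

    // Hypothetical classification of what a caller could do after a failure.
    enum NextStep { RETRY_LATER_OR_OTHER_REPLICA, TRY_OTHER_REPLICA, INSPECT_MESSAGE, GIVE_UP }

    static NextStep nextStepFor(final FailureReason reason) {
        switch (reason) {
            case NOT_ACTIVE:
            case NOT_UP_TO_BOUND:
                // The task may become active, or catch up, if we wait.
                return NextStep.RETRY_LATER_OR_OTHER_REPLICA;
            case NOT_PRESENT:
                // The partition may have migrated during a rebalance.
                return NextStep.TRY_OTHER_REPLICA;
            case STORE_EXCEPTION:
                // Whether a retry helps depends on the exception in the message.
                return NextStep.INSPECT_MESSAGE;
            case UNKNOWN_QUERY_TYPE:
            case DOES_NOT_EXIST:
            default:
                // Assumption: repeating the same request would fail the same way.
                return NextStep.GIVE_UP;
        }
    }

    public static void main(final String[] args) {
        for (final FailureReason reason : FailureReason.values()) {
            System.out.println(reason + " -> " + nextStepFor(reason));
        }
    }
}
```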


A class representing a processing state position in terms of its inputs: a vector of (topic, partition, offset) components.

Code Block
public class Position {

  // Create a new Position from a map of topic -> partition -> offset
  static Position fromMap(Map<String, Map<Integer, Long>> map);

  // Create a new, empty Position
  static Position emptyPosition();

  // Return a new position based on the current one, with the given component added
  Position withComponent(String topic, int partition, long offset);

  // Merge all the components of this position with all the components of the other
  // position and return the result in a new Position
  Position merge(Position other);

  // Get the set of topics included in this Position
  Set<String> getTopics();

  // Given a topic, get the partition -> offset pairs included in this Position
  Map<Integer, Long> getBound(String topic);
}
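To make the intended semantics of these methods more concrete, here is a minimal, immutable sketch of such a position vector. `PositionSketch` is a hypothetical stand-in, not the proposed class. In particular, the rule that `merge` keeps the larger offset when both positions contain the same (topic, partition) is an assumption of this sketch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class PositionSketch {
    // topic -> partition -> offset
    private final Map<String, Map<Integer, Long>> components;

    private PositionSketch(final Map<String, Map<Integer, Long>> components) {
        this.components = components;
    }

    static PositionSketch emptyPosition() {
        return new PositionSketch(new HashMap<>());
    }

    static PositionSketch fromMap(final Map<String, Map<Integer, Long>> map) {
        return new PositionSketch(deepCopy(map));
    }

    // Returns a new position; this instance is left unchanged.
    PositionSketch withComponent(final String topic, final int partition, final long offset) {
        final Map<String, Map<Integer, Long>> copy = deepCopy(components);
        copy.computeIfAbsent(topic, t -> new HashMap<>()).put(partition, offset);
        return new PositionSketch(copy);
    }

    // Assumption: when both positions contain a component, keep the larger offset.
    PositionSketch merge(final PositionSketch other) {
        final Map<String, Map<Integer, Long>> copy = deepCopy(components);
        other.components.forEach((topic, partitions) ->
            partitions.forEach((partition, offset) ->
                copy.computeIfAbsent(topic, t -> new HashMap<>())
                    .merge(partition, offset, Long::max)));
        return new PositionSketch(copy);
    }

    Set<String> getTopics() {
        return Collections.unmodifiableSet(components.keySet());
    }

    Map<Integer, Long> getBound(final String topic) {
        final Map<Integer, Long> partitions =
            components.getOrDefault(topic, Collections.emptyMap());
        return Collections.unmodifiableMap(partitions);
    }

    private static Map<String, Map<Integer, Long>> deepCopy(
            final Map<String, Map<Integer, Long>> source) {
        final Map<String, Map<Integer, Long>> copy = new HashMap<>();
        source.forEach((topic, partitions) -> copy.put(topic, new HashMap<>(partitions)));
        return copy;
    }

    public static void main(final String[] args) {
        final PositionSketch a = emptyPosition()
            .withComponent("input", 0, 5L)
            .withComponent("input", 1, 7L);
        final PositionSketch b = emptyPosition().withComponent("input", 0, 9L);

        final PositionSketch merged = a.merge(b);
        System.out.println("partition 0: " + merged.getBound("input").get(0));
        System.out.println("partition 1: " + merged.getBound("input").get(1));
    }
}
```

Copying on every change keeps a position safe to hand to callers as a bound, at the cost of extra allocation; a real implementation may make a different trade-off.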


EDIT: The KIP initially proposed to have a "latest" bound, which would require the query to run on a RUNNING Active task, but that has been moved to StateQueryRequest#requireActive. This more cleanly separates responsibilities, since StateStores are responsible for enforcing the PositionBound and Kafka Streams is responsible for handling the other details of StateQueryRequest. The problem was that the store actually doesn't have visibility into the state or type of task that contains it, so it was unable to take responsibility for the "latest" bound. It's important in particular to keep the StateStore's responsibilities clear because that is an extension point for Kafka Streams users who implement their own stores.

Code Block
public class PositionBound {

  // Create a new Position bound representing "no bound"
  static PositionBound unbounded();

  // Create a new Position bound at the given Position
  static PositionBound at(Position position);

  // Check whether this is an "unbounded" Position bound
  boolean isUnbounded();

  // Get the Position (if it's not unbounded)
  Position position();
}
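Since enforcing the bound is the store's responsibility, a sketch of the check a store might run before executing a query may help. `BoundCheckSketch` and `isUpToBound` are hypothetical helpers. Positions are represented as plain topic -> partition -> offset maps rather than the proposed classes, and a null bound stands in for an unbounded PositionBound. The sketch assumes a store satisfies a bound when, for every component in the bound, the store has seen an offset at or past it.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class BoundCheckSketch {

    // Returns true if `current` is at or past `bound` for every (topic, partition) in `bound`.
    // A null bound plays the role of "unbounded" in this sketch.
    static boolean isUpToBound(final Map<String, Map<Integer, Long>> current,
                               final Map<String, Map<Integer, Long>> bound) {
        if (bound == null) {
            return true;
        }
        for (final Map.Entry<String, Map<Integer, Long>> topicEntry : bound.entrySet()) {
            final Map<Integer, Long> seen =
                current.getOrDefault(topicEntry.getKey(), Collections.emptyMap());
            for (final Map.Entry<Integer, Long> partitionEntry : topicEntry.getValue().entrySet()) {
                final Long seenOffset = seen.get(partitionEntry.getKey());
                // A partition the store has never seen cannot satisfy the bound.
                if (seenOffset == null || seenOffset < partitionEntry.getValue()) {
                    return false;
                }
            }
        }
        return true;
    }

    private static Map<String, Map<Integer, Long>> single(final String topic,
                                                          final int partition,
                                                          final long offset) {
        final Map<Integer, Long> partitions = new HashMap<>();
        partitions.put(partition, offset);
        final Map<String, Map<Integer, Long>> position = new HashMap<>();
        position.put(topic, partitions);
        return position;
    }

    public static void main(final String[] args) {
        final Map<String, Map<Integer, Long>> current = single("input", 0, 10L);

        System.out.println(isUpToBound(current, single("input", 0, 10L))); // bound already reached
        System.out.println(isUpToBound(current, single("input", 0, 11L))); // store is behind
        System.out.println(isUpToBound(current, null));                    // no bound at all
    }
}
```

A store that finds itself behind the bound would return the NOT_UP_TO_BOUND failure instead of running the query.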


To support persistence of position information across restarts (for persistent state stores, which won't read the changelog), we need to add a mechanism for the store to be notified when Streams is in a consistent state (after commit and before processing). This operation is purely a contract between the persistent, inner state store and the {Global,Processor}StateManager. No other component needs to invoke it, so we propose registering a callback as part of register:

Code Block
interface StateStoreContext {
  ...

    // existing method
    void register(final StateStore store,
                  final StateRestoreCallback stateRestoreCallback);

    /**
     * Registers and possibly restores the specified storage engine.
     * @param store the storage engine
     * @param stateRestoreCallback the restoration callback logic for log-backed state stores upon restart
     * @param commitCallback a callback to be invoked upon successful task commit, in case the store
     *                           needs to perform any state tracking when the task is known to be in
     *                           a consistent state. If the store has no such state to track, it may
     *                           use {@link StateStoreContext#register(StateStore, StateRestoreCallback)} instead.
     *                           Persistent stores provided by Kafka Streams use this method to save
     *                           their Position information to local disk, for example.
     * @throws IllegalStateException If the store gets registered after initialization has already finished
     * @throws StreamsException if the store's changelog does not contain the partition
     */
    void register(final StateStore store,
                  final StateRestoreCallback stateRestoreCallback,
                  final CommitCallback commitCallback);
}
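The contract can be sketched roughly as follows: the store hands a callback to the state manager at registration time, and the manager invokes it once the task is known to be consistent. Everything in this sketch is a stand-in. `CommitCallback` is assumed to be a single no-argument method, since its shape is not shown in this section. `ToyStateManager` and `ToyPersistentStore` are hypothetical and do not reflect Streams internals, and "saving" the position is reduced to copying a field.

```java
import java.util.ArrayList;
import java.util.List;

final class CommitCallbackSketch {

    // Assumed shape of the callback: one no-argument method invoked after a commit.
    interface CommitCallback {
        void onCommit();
    }

    // Hypothetical stand-in for the state manager: remembers callbacks and runs them on commit.
    static final class ToyStateManager {
        private final List<CommitCallback> callbacks = new ArrayList<>();

        void register(final CommitCallback commitCallback) {
            callbacks.add(commitCallback);
        }

        void commit() {
            callbacks.forEach(CommitCallback::onCommit);
        }
    }

    // Hypothetical persistent store: tracks the latest offset it has applied, but only
    // "saves" it (here: copies it to another field) when the manager reports a commit.
    static final class ToyPersistentStore {
        private long currentOffset = -1L;
        private long savedOffset = -1L;

        void init(final ToyStateManager manager) {
            manager.register(() -> savedOffset = currentOffset);
        }

        void put(final long recordOffset) {
            currentOffset = recordOffset;
        }

        long savedOffset() {
            return savedOffset;
        }
    }

    public static void main(final String[] args) {
        final ToyStateManager manager = new ToyStateManager();
        final ToyPersistentStore store = new ToyPersistentStore();
        store.init(manager);

        store.put(42L);
        System.out.println("saved before commit: " + store.savedOffset());

        manager.commit();
        System.out.println("saved after commit: " + store.savedOffset());
    }
}
```

Because only the manager holds the callback, no other component can trigger the save, which matches the narrow contract described above.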

Compatibility, Deprecation, and Migration Plan
