...

Vote thread: https://lists.apache.org/thread/jmfqp5rycbcztn1ksvn4b7c9p34xgftk

JIRA: KAFKA-13479

POC PR: https://github.com/apache/kafka/pull/11406

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

KIP Tracking

To control the scope of this long-term project, we chose to defer many of the details to later KIPs. Here's a roundup of the related work:

Motivation

Kafka Streams supports an interesting and innovative API for "peeking" into the internal state of running stateful stream processors from outside of the application, called Interactive Query (IQ). This functionality has proven invaluable to users over the years for everything from debugging running applications to serving low latency queries straight from the Streams runtime.

...

Code Block
/**
  * The request object for Interactive Queries.
  * This is an immutable builder class for passing all required and
  * optional arguments for querying a state store in Kafka Streams.
  * <p>
  * @param <R> The type of the query result.
  */
@Evolving
public class StateQueryRequest<R> {

  /**
    * First required argument to specify the name of the store to query
    */
  public static InStore inStore(final String name);

  public static class InStore {

    /**
      * Second required argument to provide the query to execute.
      */
    public <R> StateQueryRequest<R> withQuery(final Query<R> query);
  } 

  /**
    * Optionally bound the current position of the state store
    * with respect to the input topics that feed it. In conjunction
    * with {@link StateQueryResult#getPosition}, this can be
    * used to achieve a good balance between consistency and
    * availability in which repeated queries are guaranteed to
    * advance in time while allowing reads to be served from any
    * replica that is caught up to that caller's prior observations.
    * <p>
    * Note that the set of offsets provided in the bound does not determine
    * the partitions to query. For that, see {@link #withPartitions}.
    * Unrelated offsets will be ignored, and missing offsets will be treated
    * as indicating "no bound". 
    */
  public StateQueryRequest<R> withPositionBound(PositionBound positionBound);

  /**
    * Specifies that this query should only run on partitions for which this instance is the leader
    * (aka "active"). Partitions for which this instance is not the active replica will return
    * {@link FailureReason#NOT_ACTIVE}.
    */
  public StateQueryRequest<R> requireActive();

  /**
    * Optionally specify the partitions to include in the query.
    * If omitted, the default is to query all locally available partitions.
    */
  public StateQueryRequest<R> withPartitions(Set<Integer> partitions);

  /**
    * Query all locally available partitions.
    */
  public StateQueryRequest<R> withAllPartitions();

  /**
    * Instruct Streams to collect detailed information during query
    * execution, for example, which stores handled the query, how
    * long they took, etc.
    */
  public StateQueryRequest<R> enableExecutionInfo();

  // Getters are also proposed to retrieve the request parameters

  public String getStoreName();
  public Query<R> getQuery();
  public PositionBound getPositionBound();
  public boolean executionInfoEnabled();

  /**
    * An empty set indicates that no partitions will be fetched;
    * a non-empty set indicates the specific partitions that will be fetched (if locally available).
    * Throws UnsupportedOperationException if the request is for all partitions (isAllPartitions() == true).
    */
  public Set<Integer> getPartitions();

  /**
    * Indicates that all locally available partitions will be fetched.
    */
  public boolean isAllPartitions();

}
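
To show how these builder options compose, here is a minimal usage sketch (not part of the proposal) that re-issues a query bounded by the position observed in a previous response; the store name "mystore", the key, and the surrounding variables are assumed for illustration:

Code Block
// Hypothetical follow-up query: bound the read by the position observed earlier, so that
// repeated reads are guaranteed not to travel backwards in time, while still allowing any
// sufficiently caught-up replica to answer.
Position lastObserved = previousResult.getPosition();

StateQueryRequest<ValueAndTimestamp<Integer>> followUp =
  StateQueryRequest.inStore("mystore")                  // required: which store to query
    .withQuery(KeyQuery.withKey(1))                     // required: the query itself
    .withPositionBound(PositionBound.at(lastObserved))  // only answer if caught up to the prior read
    .withPartitions(Set.of(0, 1))                       // optional: restrict to specific partitions
    .enableExecutionInfo();                             // optional: collect execution details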

StateQueryResult

This is the main response object for IQv2. It wraps the individual results, as well as providing a vehicle to deliver metadata relating to the result as a whole.

Code Block
/**
  * The response object for interactive queries.
  * It wraps the individual results, as well as providing a
  * vehicle to deliver metadata relating to the result as a whole.
  * <p>
  * @param <R> The type of the query result.
  */
@Evolving
public class StateQueryResult<R> {

  /**
    * Set the result for a global store query. Used by Kafka Streams and available for tests.
    */
  public void setGlobalResult(final QueryResult<R> r);

  /**
    * The query's result for global store queries. Is {@code null} for non-global (partitioned)
    * store queries.
    */
  public QueryResult<R> getGlobalResult();

  /**
    * Set the result for a partitioned store query. Used by Kafka Streams and available for tests.
    */
  public void addResult(final int partition, final QueryResult<R> r);

  /**
    * The query's result for each partition that executed the query.
    */
  public Map<Integer /*partition*/, QueryResult<R>> getPartitionResults();

  /**
    * Asserts that only one partition returns a result and extracts the result.
    * Useful with queries that expect a single result.
    */
  public QueryResult<R> getOnlyPartitionResult();

  /**
    * The position of the state store at the moment it executed the
    * query. In conjunction with {@link StateQueryRequest#withPositionBound},
    * this can be used to achieve a good balance between consistency and
    * availability in which repeated queries are guaranteed to
    * advance in time while allowing reads to be served from any
    * replica that is caught up to that caller's prior observations.
    */
  public Position getPosition();
}

StateStore modifications

This is the essence of the proposal. Rather than modifying each store interface to allow new kinds of queries, we introduce a generic capability of stores to execute query objects. This allows stores to choose whether they accept or reject queries of a given type, the introduction of new queries, etc.

Note that we are proposing a method with multiple arguments instead of a "parameters" object because this is an interface that users implement to provide their own state stores. Following the strategy of adding and deprecating method overloads lets us evolve this API over time by adding new methods with default implementations and controlling what happens in the default case (falling back to another method or returning an error). If we had a "parameters" object instead, Streams would have no control or knowledge over whether stores use the new members, which makes API evolution more difficult to manage.

Code Block
public interface StateStore {
  ...

  /**
    * Execute a query. Returns a QueryResult containing either result data or
    * a failure.
    * <p>
    * If the store doesn't know how to handle the given query, the result
    * will be a {@link FailureReason#UNKNOWN_QUERY_TYPE}.
    * If the store couldn't satisfy the given position bound, the result
    * will be a {@link FailureReason#NOT_UP_TO_BOUND}.
    * @param query The query to execute
    * @param positionBound The position the store must be at or past
    * @param collectExecutionInfo Whether the store should collect detailed execution info for the query
    * @param <R> The result type
    */
  @Evolving
  default <R> QueryResult<R> query(Query<R> query,
                                   PositionBound positionBound,
                                   boolean collectExecutionInfo) {
    // If a store doesn't implement a query handler, then all queries are unknown.
    return QueryResult.forUnknownQueryType(query, this);
  }

    /**
     * Returns the position the state store is at with respect to the input topic/partitions
     */
    @Evolving
    default Position getPosition() {
        throw new UnsupportedOperationException(
            "getPosition is not implemented by this StateStore (" + getClass() + ")"
        );
    }

     ...
}
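
As a rough sketch (not part of the proposal), a custom store might implement the new method by dispatching on the query type and reporting anything it does not recognize; the class name and internal map below are illustrative only:

Code Block
// Illustrative only: a custom store handling KeyQuery at the byte level and reporting
// everything else as an unknown query type rather than throwing.
public class MyInMemoryStore implements StateStore {

  private final Map<Bytes, byte[]> data = new HashMap<>();

  @SuppressWarnings("unchecked")
  @Override
  public <R> QueryResult<R> query(final Query<R> query,
                                  final PositionBound positionBound,
                                  final boolean collectExecutionInfo) {
    // A complete implementation would first check positionBound against its current Position
    // and return QueryResult.notUpToBound(...) if it cannot satisfy the bound.
    if (query instanceof KeyQuery) {
      final KeyQuery<Bytes, byte[]> keyQuery = (KeyQuery<Bytes, byte[]>) query;
      return (QueryResult<R>) QueryResult.forResult(data.get(keyQuery.getKey()));
    }
    // Queries this store does not understand are reported as failures, not exceptions:
    return QueryResult.forUnknownQueryType(query, this);
  }

  // ... the rest of the StateStore contract (init, flush, close, etc.) is omitted
}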

Query

This is the interface that all queries must implement. Part of what I'm going for here is to place as few restrictions as possible on what a "query" or its "result" is, so the only thing in this interface is a generic type indicating the result type, which lets Streams return a fully typed response without predetermining the response type itself.

Code Block
@Evolving
public interface Query<R> { }

Using this interface, store implementers can create any query they can think of and return any type they like, from a single value to an iterator, even a future. While writing the POC, I implemented three queries for KeyValue stores, which I'll include here as examples:

Proposed Query: KeyQuery

This query implements the functionality of the current KeyValueStore#get(key)  method:

Code Block
@Evolving
public class KeyQuery<K, V> implements Query<V> {
  // static factory to create a new KeyQuery, given a key
  public static <K, V> KeyQuery<K, V> withKey(final K key);

  // getter for the key
  public K getKey();
}


// ======================================
// example usage in IQv2:

Integer key = 1;

// note that "mystore" is a KeyValueStore<Integer, ValueAndTimestamp<Integer>>,
// hence the result type        
StateQueryRequest<ValueAndTimestamp<Integer>> query =
  inStore("mystore")
    .withQuery(KeyQuery.withKey(key));
     
// run the query
StateQueryResult<ValueAndTimestamp<Integer>> result = kafkaStreams.query(query);

// In this example, it doesn't matter which partition ran the query, so we just
// grab the result from the partition that returned something
// (this is what IQ currently does under the covers).

Integer value = result.getOnlyPartitionResult().getResult().value();

// ======================================
// For comparison, here is how the query would look in current IQ:
// (note, this glosses over many of the problems described above.
//  the example is here to illustrate the different ergonomics
//  of the two APIs.)

// create the query parameters
StoreQueryParameters<ReadOnlyKeyValueStore<Integer, ValueAndTimestamp<Integer>>> storeQueryParameters =
  StoreQueryParameters.fromNameAndType(
    "mystore", 
    QueryableStoreTypes.timestampedKeyValueStore()
  );

// get a store handle to run the query
ReadOnlyKeyValueStore<Integer, ValueAndTimestamp<Integer>> store =
  kafkaStreams.store(storeQueryParameters);

// query the store
Integer value1 = store.get(key).value(); 


We propose to add KeyQuery as part of the implementation of this KIP, in order to have at least one query available to flesh out tests, etc., for the framework. Other queries are deferred to later KIPs.

Example Query: RawKeyQuery

We could later propose to add a "raw" version of the KeyQuery, which simply takes the key as a byte array and returns the value as a byte array. This could allow callers to bypass the serialization logic in the Metered stores entirely, potentially saving valuable CPU time and memory.

Code Block
@Evolving
public class RawKeyQuery implements Query<byte[]> {
  public static RawKeyQuery withKey(final Bytes key);
  public static RawKeyQuery withKey(final byte[] key);
  public Bytes getKey();
}
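
A hypothetical call site for such a raw query might look like the following (purely illustrative; serializedKey is assumed to already hold the serialized key bytes):

Code Block
// Illustrative only: querying with pre-serialized bytes bypasses the Metered layer's serdes.
StateQueryResult<byte[]> rawResult =
  kafkaStreams.query(inStore("mystore").withQuery(RawKeyQuery.withKey(serializedKey)));

byte[] rawValue = rawResult.getOnlyPartitionResult().getResult();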


Example query: RawScanQuery

This example demonstrates two variations on the first example:

  1. The ability to define queries handling "raw" binary data for keys and values
  2. The ability for a query to return multiple results (in this case, an iterator)

I'm only bundling those for brevity. We can also have typed, iterable queries and raw single-record queries.

Note this query is purely an example of what is possible. It will not be added as part of this KIP.

Code Block
public class RawScanQuery implements Query<KeyValueIterator<Bytes, byte[]>> {
    private RawScanQuery() { }

    public static RawScanQuery scan() {
        return new RawScanQuery();
    }
}

// example usage

// since we're handling raw data, we're going to need the serdes
// this is just an example, this method is also not proposed in this KIP.
StateQuerySerdes<Integer, ValueAndTimestamp<Integer>> serdes =
  kafkaStreams.serdesForStore("mystore");

// run the "scan" query
StateQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
  kafkaStreams.query(inStore("mystore").withQuery(RawScanQuery.scan()));

// This time, we'll get results from all locally available partitions.
Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults =
  scanResult.getPartitionResults();


// for this example, we'll just collate all the partitions' iterators
// together and print their data

List<KeyValueIterator<Bytes, byte[]>> iterators =
  partitionResults
    .values()
    .stream()
    .map(QueryResult::getResult)
    .collect(Collectors.toList());

// Using an example convenience method we could add to collate iterators' data
try (CloseableIterator<KeyValue<Bytes, byte[]>> collated = Iterators.collate(iterators)) {
  while (collated.hasNext()) {
    KeyValue<Bytes, byte[]> next = collated.next();
    System.out.println(
      "|||" +
        " " + serdes.keyFrom(next.key.get()) +
        " " + serdes.valueFrom(next.value)
    );
  }
}

QueryResult

This is a container for a single partition's query result.

Code Block
@Evolving
public class QueryResult<R> {
  // wraps a successful result
  public static <R> QueryResult<R> forResult(R result);

  // wraps a failure result
  public static <R> QueryResult<R> forFailure(FailureReason failureReason, String failureMessage); 

  // returns a failed query result because the store didn't know how to handle the query.
  public static <R> QueryResult<R> forUnknownQueryType(Query<R> query, StateStore store);

  // returns a failed query result because the partition wasn't caught up to the desired bound.
  public static <R> QueryResult<R> notUpToBound(Position currentPosition, Position bound);

  // Used by state stores that need to delegate to another store to run a query and then
  // translate the results. Does not change the execution info or any other metadata.
  public <NewR> QueryResult<NewR> swapResult(NewR newTypedResult);

  // If requested, stores should record
  // helpful information, such as their own class, how they executed the query,
  // and the time they took.
  public void addExecutionInfo(String executionInfo);

  // records the point in the store's history that executed the query.
  public void setPosition(Position position);

  public boolean isSuccess();
  public boolean isFailure();
  public List<String> getExecutionInfo();
  public Position getPosition();
  public FailureReason getFailureReason();
  public String getFailureMessage();
  public R getResult();
}
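
For illustration (not part of the proposal), a caller might consume per-partition results along these lines; the request object and result types are assumed from the earlier KeyQuery example:

Code Block
// Hypothetical handling of per-partition outcomes: successes carry a result and a Position,
// failures carry a FailureReason and message.
StateQueryResult<ValueAndTimestamp<Integer>> result = kafkaStreams.query(request);

result.getPartitionResults().forEach((partition, partitionResult) -> {
  if (partitionResult.isSuccess()) {
    System.out.println(partition + " -> " + partitionResult.getResult()
        + " at " + partitionResult.getPosition());
  } else if (partitionResult.getFailureReason() == FailureReason.NOT_UP_TO_BOUND) {
    System.err.println(partition + " is lagging: " + partitionResult.getFailureMessage());
    // retry later or try another replica
  } else {
    System.err.println(partition + " failed: " + partitionResult.getFailureReason());
  }
});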

FailureReason

An enum classifying the failures that can occur for an individual partition's result.

Code Block
public enum FailureReason {
    /**
     * Failure indicating that the store doesn't know how to handle the given query.
     */
    UNKNOWN_QUERY_TYPE,

    /**
     * The query was required to execute on an active task (via {@link StateQueryRequest#requireActive()}),
     * but while executing the query, the task was either a Standby task or an Active task
     * not in the RUNNING state. The failure message will contain the reason for the failure.
     * <p>
     * The caller should either try again later or try a different replica.
     */
    NOT_ACTIVE,

    /**
     * Failure indicating that the store partition is not (yet) up to the desired bound.
     * The caller should either try again later or try a different replica.
     */
    NOT_UP_TO_BOUND,

    /**
     * Failure indicating that the requested store partition is not present on the local
     * KafkaStreams instance. It may have been migrated to another instance during a rebalance.
     * The caller is recommended to try a different replica.
     */
    NOT_PRESENT,

    /**
     * The requested store partition does not exist at all. For example, partition 4 was requested,
     * but the store in question only has 4 partitions (0 through 3).
     */
    DOES_NOT_EXIST,

    /**
     * The store that handled the query got an exception during query execution. The message
     * will contain the exception details. Depending on the nature of the exception, the caller
     * may be able to retry this instance or may need to try a different instance.
     */
    STORE_EXCEPTION;
}


Position

A class representing a processing state position in terms of its inputs: a vector of (topic, partition, offset) components.

Code Block
@Evolving
public class Position {

  // Create a new Position from a map of topic -> partition -> offset
  static Position fromMap(Map<String, Map<Integer, Long>> map);

  // Create a new, empty Position
  static Position emptyPosition();

  // Return a new position based on the current one, with the given component added
  Position withComponent(String topic, int partition, long offset);

  // Merge all the components of this position with all the components of the other
  // position and return the result in a new Position
  Position merge(Position other);

  // Get the set of topics included in this Position
  Set<String> getTopics();

  // Given a topic, get the partition -> offset pairs included in this Position
  Map<Integer, Long> getBound(String topic);
}
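
As a small illustration (not part of the proposal), positions can be built up incrementally or from a map and then merged; the topic names and offsets below are made up:

Code Block
// Illustrative only: composing Position instances.
Position p1 = Position.emptyPosition()
    .withComponent("input-topic", 0, 42L)
    .withComponent("input-topic", 1, 17L);

Position p2 = Position.fromMap(Map.of("other-topic", Map.of(0, 5L)));

// merge combines the components of both positions into a new Position
Position merged = p1.merge(p2);

Set<String> topics = merged.getTopics();                      // {"input-topic", "other-topic"}
Map<Integer, Long> offsets = merged.getBound("input-topic");  // {0=42, 1=17}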

PositionBound

A class bounding the processing state Position during queries. This can be used to specify that a query should fail if the locally available partition isn't caught up to the specified bound. "Unbounded" places no restrictions on the current location of the partition.

EDIT: The KIP initially proposed to have a "latest" bound, which would require the query to run on a RUNNING Active task, but that has been moved to StateQueryRequest#requireActive. This more cleanly separates responsibilities, since StateStores are responsible for enforcing the PositionBound and Kafka Streams is responsible for handling the other details of StateQueryRequest. The problem was that the store actually doesn't have visibility into the state or type of task that contains it, so it was unable to take responsibility for the "latest" bound. It's important in particular to keep the StateStore's responsibilities clear because that is an extension point for Kafka Streams users who implement their own stores.

Code Block
@Evolving
public class PositionBound {

  // Create a new Position bound representing "no bound"
  static PositionBound unbounded();

  // Create a new Position bound at the given Position
  static PositionBound at(Position position);

  // Check whether this is an "unbounded" Position bound
  boolean isUnbounded();

  // Get the Position (if this bound is not unbounded)
  Position position();
}

StateStoreContext

To support persistence of position information across restarts (for persistent state stores, which won't read the changelog), we need to add a mechanism for the store to be notified when Streams is in a consistent state (after commit and before processing). This operation is purely a contract between the persistent, inner state store and the {Global,Processor}StateManager. No other components need to invoke that operation, so registering a callback as part of register is preferable to adding a new method to the StateStore interface.

Code Block
interface StateStoreContext {
...

// UNCHANGED EXISTING METHOD FOR REFERENCE:

     void register(final StateStore store,
                  final StateRestoreCallback stateRestoreCallback);

// NEW METHOD:


   /**
     * Registers and possibly restores the specified storage engine.
     *
     * @param store the storage engine
     * @param stateRestoreCallback the restoration callback logic for log-backed state stores upon restart
     * @param commitCallback a callback to be invoked upon successful task commit, in case the store
     *                           needs to perform any state tracking when the task is known to be in
     *                           a consistent state. If the store has no such state to track, it may
     *                           use {@link StateStoreContext#register(StateStore, StateRestoreCallback)} instead.
     *                           Persistent stores provided by Kafka Streams use this method to save
     *                           their Position information to local disk, for example.
     *
     * @throws IllegalStateException If the store gets registered after initialization has already finished
     * @throws StreamsException if the store's change log does not contain the partition
     */
    @Evolving
    void register(final StateStore store,
                  final StateRestoreCallback stateRestoreCallback,
                  final CommitCallback commitCallback);
...
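
As a hedged sketch (not part of the proposal), a persistent store's init() might use the new overload roughly as follows, assuming CommitCallback is a functional interface whose single method takes no arguments; restoreRecord, writePositionCheckpoint, checkpointFile, and position are hypothetical names:

Code Block
// Illustrative only: register a commit callback that persists the store's Position to disk,
// so it can be reloaded on restart without replaying the changelog.
@Override
public void init(final StateStoreContext context, final StateStore root) {
    context.register(
        root,
        (key, value) -> restoreRecord(key, value),                  // existing changelog restore path
        () -> writePositionCheckpoint(checkpointFile, position)     // new: runs after a successful commit
    );
}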


Compatibility, Deprecation, and Migration Plan

...

There have been some problems noted with the metadata APIs that support IQ, such as queryMetadataForKey, streamsMetadataForStore, and allLocalStorePartitionLags. Updating those APIs would be valuable, but to control the scope of this proposal, we choose to treat that as future work.

Add RawKeyQuery

I originally proposed to use KeyQuery for the typed query/result and have a separate class, RawKeyQuery, for the serialized query/result that's handled by the lower store layers. During the proposal, I thought this would be simpler, but during implementation and also while proposing other queries, it became apparent that it was actually more complicated. Therefore, RawKeyQuery is now only presented as an example and not actually proposed in this KIP.