Versions Compared

Comment: Fixed "require active" API

...

Code Block
/**
  * The request object for Interactive Queries.
  * This is an immutable builder class for passing all required and
  * optional arguments for querying a state store in Kafka Streams.
  * <p>
  * @param <R> The type of the query result.
  */
@Evolving
public class StateQueryRequest<R> {

  /**
    * First required argument to specify the name of the store to query
    */
  public static InStore inStore(final String name);

  public static class InStore {

    /**
      * Second required argument to provide the query to execute.
      */
    public <R> StateQueryRequest<R> withQuery(final Query<R> query);
  } 

  /**
    * Optionally bound the current position of the state store
    * with respect to the input topics that feed it. In conjunction
    * with {@link StateQueryResult#getPosition}, this can be
    * used to achieve a good balance between consistency and
    * availability in which repeated queries are guaranteed to
    * advance in time while allowing reads to be served from any
    * replica that is caught up to that caller's prior observations.
    * <p>
    * Note that the set of offsets provided in the bound does not determine
    * the partitions to query. For that, see {@link #withPartitions}.
    * Unrelated offsets will be ignored, and missing offsets will be treated
    * as indicating "no bound". 
    */
  public StateQueryRequest<R> withPositionBound(PositionBound positionBound);

  /**
    * Specifies that this query should only run on partitions for which this instance is the leader
    * (aka "active"). Partitions for which this instance is not the active replica will return
    * {@link FailureReason#NOT_ACTIVE}.
    */
  public StateQueryRequest<R> requireActive();

  /**
    * Optionally specify the partitions to include in the query.
    * If omitted, the default is to query all locally available partitions
    */
  public StateQueryRequest<R> withPartitions(Set<Integer> partitions);

  /**
    * Query all locally available partitions
    */
  public StateQueryRequest<R> withAllPartitions();

  /**
    * Instruct Streams to collect detailed information during query
    * execution, for example, which stores handled the query, how
    * long they took, etc.
    */
  public StateQueryRequest<R> enableExecutionInfo();

  // Getters are also proposed to retrieve the request parameters

  public String getStoreName();
  public Query<R> getQuery();
  public PositionBound getPositionBound();
  public boolean executionInfoEnabled();

  /**
    * empty set indicates that no partitions will be fetched
    * non-empty set indicates the specific partitions that will be fetched (if locally available)
    * throws UnsupportedOperationException if the request is to all partitions (isAllPartitions() == true)
    */
  public Set<Integer> getPartitions();

  /**
    * indicates that all locally available partitions will be fetched
    */
  public boolean isAllPartitions();

}

StateQueryResult

This is the main response object for IQv2. It wraps the individual results and provides a vehicle to deliver metadata relating to the result as a whole.

Code Block
/**
  * The response object for interactive queries.
  * It wraps the individual results, as well as providing a
  * vehicle to deliver metadata relating to the result as a whole.
  * <p>
  * @param <R> The type of the query result.
  */
@Evolving
public class StateQueryResult<R> {
  /**
    * Constructor. Used by Kafka Streams, and may be useful for
    * tests as well.
    */
  public StateQueryResult(Map<Integer /*partition*/, QueryResult<R>> partitionResults);

  /**
    * The query's result for each partition that executed the query.
    */
  public Map<Integer /*partition*/, QueryResult<R>> getPartitionResults();

  /**
    * Asserts that only one partition returns a result and extracts the result.
    * Useful with queries that expect a single result.
    */
  public QueryResult<R> getOnlyPartitionResult();

  /**
    * The position of the state store at the moment it executed the
    * query. In conjunction
    * with {@link StateQueryRequest#withPositionBound}, this can be
    * used to achieve a good balance between consistency and
    * availability in which repeated queries are guaranteed to
    * advance in time while allowing reads to be served from any
    * replica that is caught up to that caller's prior observations.
    */
  public Position getPosition();
}

StateStore modifications

This is the essence of the proposal. Rather than modifying each store interface to allow new kinds of queries, we introduce a generic capability of stores to execute query objects. This lets each store choose whether to accept or reject queries of a given type, and it lets new query types be introduced without changing the store interfaces.

Note that we are proposing a method with multiple arguments instead of a "parameters" object because this is an interface that users implement to provide their own state stores. Adding and deprecating method overloads lets us evolve this API over time: each new method ships with a default implementation, so Streams controls what happens in the default case (falling back to another method or returning an error). With a "parameters" object, Streams would have no control over, or knowledge of, whether stores use the new members, which makes API evolution more difficult to manage.
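
As a rough illustration of the overload strategy described above, here is a minimal, self-contained sketch. `SketchStore`, `LegacyStore`, and `UpgradedStore` are hypothetical stand-ins invented for this example and are not part of the proposal. The sketch assumes a later release adds a second overload whose default implementation falls back to the original method, so a store written before the overload existed keeps working.

```java
// Hypothetical stand-ins for illustration only; none of these names are proposed in this KIP.
interface SketchStore {
    // Original method: by default, every query is reported as unknown.
    default String query(String query) {
        return "UNKNOWN_QUERY_TYPE";
    }

    // Overload added in a later release. The framework decides the default case:
    // here it falls back to the original method, so existing stores keep working.
    default String query(String query, boolean collectExecutionInfo) {
        return query(query);
    }
}

// A store written against the first release: it only overrides the original method.
final class LegacyStore implements SketchStore {
    @Override
    public String query(String query) {
        return "legacy:" + query;
    }
}

// A store that opts in to the newer overload.
final class UpgradedStore implements SketchStore {
    @Override
    public String query(String query, boolean collectExecutionInfo) {
        return "upgraded:" + query + (collectExecutionInfo ? " [info collected]" : "");
    }
}

final class OverloadEvolutionDemo {
    public static void main(String[] args) {
        // The legacy store is reached through the default fallback.
        System.out.println(new LegacyStore().query("get", true));
        // The upgraded store handles the new overload itself.
        System.out.println(new UpgradedStore().query("get", true));
    }
}
```

With a single "parameters" object there would be only one method to override, so the framework could not tell the two kinds of store apart in this way.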

Code Block
public interface StateStore {
  ...

  /**
    * Execute a query. Returns a QueryResult containing either result data or
    * a failure.
    * <p>
    * If the store doesn't know how to handle the given query, the result
    * will be a {@link FailureReason#UNKNOWN_QUERY_TYPE}.
    * If the store couldn't satisfy the given position bound, the result
    * will be a {@link FailureReason#NOT_UP_TO_BOUND}.
    * @param query The query to execute
    * @param positionBound The position the store must be at or past
    * @param collectExecutionInfo Whether the store should collect detailed execution info for the query
    * @param <R> The result type
    */
  @Evolving
  default <R> QueryResult<R> query(Query<R> query,
                                   PositionBound positionBound,
                                   boolean collectExecutionInfo) {
    // If a store doesn't implement a query handler, then all queries are unknown.
    return QueryResult.forUnknownQueryType(query, this);
  }
 
  ...
}
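
To make the accept-or-reject behavior concrete, the following is a simplified sketch of how a store might dispatch on the query type. All types here (`SketchQuery`, `KeyLookup`, `CountAll`, `SketchResult`, `MapBackedStore`) are hypothetical stand-ins so that the example compiles on its own; they only mimic the shape of `Query`, `QueryResult`, and a state store, and they omit the position bound and execution info arguments.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins; not the classes proposed in this KIP.
interface SketchQuery<R> { }

// A query type that the store below understands.
final class KeyLookup implements SketchQuery<String> {
    final String key;
    KeyLookup(String key) { this.key = key; }
}

// A query type that the store below does not understand.
final class CountAll implements SketchQuery<Long> { }

final class SketchResult<R> {
    final R result;        // null when the query failed
    final String failure;  // null when the query succeeded

    private SketchResult(R result, String failure) {
        this.result = result;
        this.failure = failure;
    }

    static <R> SketchResult<R> forResult(R result) {
        return new SketchResult<>(result, null);
    }

    static <R> SketchResult<R> forUnknownQueryType(SketchQuery<R> query) {
        return new SketchResult<>(null, "UNKNOWN_QUERY_TYPE: " + query.getClass().getSimpleName());
    }

    boolean isSuccess() { return failure == null; }
}

final class MapBackedStore {
    private final Map<String, String> data = new HashMap<>();

    void put(String key, String value) { data.put(key, value); }

    // Handle the query types this store knows; reject everything else.
    @SuppressWarnings("unchecked")
    <R> SketchResult<R> query(SketchQuery<R> query) {
        if (query instanceof KeyLookup) {
            String value = data.get(((KeyLookup) query).key);
            // Safe in practice: a KeyLookup is a SketchQuery<String>, so R is String here.
            return (SketchResult<R>) SketchResult.forResult(value);
        }
        return SketchResult.forUnknownQueryType(query);
    }
}

final class StoreDispatchDemo {
    public static void main(String[] args) {
        MapBackedStore store = new MapBackedStore();
        store.put("a", "1");
        System.out.println(store.query(new KeyLookup("a")).result);
        System.out.println(store.query(new CountAll()).failure);
    }
}
```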

...

The same RawKeyQuery is well defined for IQv2 callers to execute, which allows them to bypass the serialization logic in the Metered stores entirely, potentially saving valuable CPU time and memory.

Code Block
@Evolving
public class RawKeyQuery implements Query<byte[]> {
  public static RawKeyQuery withKey(final Bytes key);
  public static RawKeyQuery withKey(final byte[] key);
  public Bytes getKey();
}

Example query: RawScanQuery

This example demonstrates two variations on the first example:

  1. The ability to define queries handling "raw" binary data for keys and values
  2. The ability for a query to return multiple results (in this case, an iterator)

I'm only bundling those for brevity. We can also have typed, iterable queries and raw single-record queries.

Note this query is purely an example of what is possible. It will not be added as part of this KIP.

Code Block
public class RawScanQuery implements Query<KeyValueIterator<Bytes, byte[]>> {
    private RawScanQuery() { }

    public static RawScanQuery scan() {
        return new RawScanQuery();
    }
}

// example usage

// since we're handling raw data, we're going to need the serdes
// this is just an example, this method is also not proposed in this KIP.
StateQuerySerdes<Integer, ValueAndTimestamp<Integer>> serdes =
  kafkaStreams.serdesForStore("mystore");

// run the "scan" query
StateQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
  kafkaStreams.query(inStore("mystore").withQuery(RawScanQuery.scan()));

// This time, we'll get results from all locally available partitions.
Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults =
  scanResult.getPartitionResults();


// for this example, we'll just collate all the partitions' iterators
// together and print their data

List<KeyValueIterator<Bytes, byte[]>> iterators =
  partitionResults
    .values()
    .stream()
    .map(QueryResult::getResult)
    .collect(Collectors.toList());

// Using an example convenience method we could add to collate iterators' data
try (CloseableIterator<KeyValue<Bytes, byte[]>> collated = Iterators.collate(iterators)) {
  while(collated.hasNext()) {
    KeyValue<Bytes, byte[]> next = collated.next();
    System.out.println(
      "|||" +
        " " + serdes.keyFrom(next.key.get()) +
        " " + serdes.valueFrom(next.value)
    );
  }
}
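
The usage above relies on a convenience method for collating iterators that is not proposed in this KIP. As a sketch of what such a helper could look like, the hypothetical `CollatedIterator` below simply chains the per-partition iterators one after another. Taking "collate" to mean plain concatenation is an assumption, and plain `java.util.Iterator` is used in place of `KeyValueIterator` to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical helper, not proposed in this KIP: chains several iterators into one.
final class CollatedIterator<T> implements Iterator<T>, AutoCloseable {
    private final List<Iterator<T>> sources;
    private final Iterator<Iterator<T>> remaining;
    private Iterator<T> current = Collections.emptyIterator();

    CollatedIterator(List<Iterator<T>> sources) {
        this.sources = new ArrayList<>(sources);
        this.remaining = this.sources.iterator();
    }

    @Override
    public boolean hasNext() {
        // Skip over exhausted (or empty) sources until one has data.
        while (!current.hasNext() && remaining.hasNext()) {
            current = remaining.next();
        }
        return current.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }

    // Closes every underlying iterator that holds resources, such as a store iterator.
    @Override
    public void close() throws Exception {
        for (Iterator<T> source : sources) {
            if (source instanceof AutoCloseable) {
                ((AutoCloseable) source).close();
            }
        }
    }
}

final class CollateDemo {
    public static void main(String[] args) throws Exception {
        // Three "partitions", one of them empty.
        List<Iterator<String>> perPartition = Arrays.asList(
            Arrays.asList("a", "b").iterator(),
            Collections.<String>emptyIterator(),
            Arrays.asList("c").iterator());
        try (CollatedIterator<String> collated = new CollatedIterator<>(perPartition)) {
            while (collated.hasNext()) {
                System.out.println(collated.next());
            }
        }
    }
}
```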

QueryResult

This is a container for a single partition's query result.

Code Block
@Evolving
public class QueryResult<R> {
  // wraps a successful result
  public static <R> QueryResult<R> forResult(R result);

  // wraps a failure result
  public static <R> QueryResult<R> forFailure(FailureReason failureReason, String failureMessage);

  // returns a failed query result because the store didn't know how to handle the query.
  public static <R> QueryResult<R> forUnknownQueryType(Query<R> query, StateStore store);

  // returns a failed query result because the partition wasn't caught up to the desired bound.
  public static <R> QueryResult<R> notUpToBound(Position currentPosition, Position bound);

  // Used by state stores that need to delegate to another store to run a query and then
  // translate the results. Does not change the execution info or any other metadata.
  public <NewR> QueryResult<NewR> swapResult(NewR newTypedResult);

  // If requested, stores should record
  // helpful information, such as their own class, how they executed the query,
  // and the time they took.
  public void addExecutionInfo(String executionInfo);

  // records the point in the store's history that executed the query.
  public void setPosition(Position position);

  public boolean isSuccess();
  public boolean isFailure();
  public List<String> getExecutionInfo();
  public Position getPosition();
  public FailureReason getFailureReason();
  public String getFailureMessage();
  public R getResult();
}

FailureReason

An enum classifying the failures of individual partitions' queries.

Code Block
public enum FailureReason {
    /**
     * Failure indicating that the store doesn't know how to handle the given query.
     */
    UNKNOWN_QUERY_TYPE,

    /**
     * The query was required to execute on an active task (via {@link StateQueryRequest#requireActive()}),
     * but while executing the query, the task was either a Standby task, or it was an Active task
     * not in the RUNNING state. The failure message will contain the reason for the failure.
     * <p>
     * The caller should either try again later or try a different replica.
     */
    NOT_ACTIVE,

    /**
     * Failure indicating that the store partition is not (yet) up to the desired bound.
     * The caller should either try again later or try a different replica.
     */
    NOT_UP_TO_BOUND,

    /**
     * Failure indicating that the requested store partition is not present on the local
     * KafkaStreams instance. It may have been migrated to another instance during a rebalance.
     * The caller is recommended to try a different replica.
     */
    NOT_PRESENT,

    /**
     * The requested store partition does not exist at all. For example, partition 4 was requested,
     * but the store in question only has 4 partitions (0 through 3).
     */
    DOES_NOT_EXIST;
}
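
As a sketch of how a caller might act on these reasons, the example below maps each one to a follow-up action. The `NextStep` enum and `FailureHandling` class are hypothetical names for this illustration. The enum is repeated locally only so that the example compiles on its own. Treating `UNKNOWN_QUERY_TYPE` and `DOES_NOT_EXIST` as not worth retrying is an assumption, since the descriptions above give no retry advice for them.

```java
// Local copy of the enum above so that this sketch is self-contained.
enum FailureReason { UNKNOWN_QUERY_TYPE, NOT_ACTIVE, NOT_UP_TO_BOUND, NOT_PRESENT, DOES_NOT_EXIST }

// Hypothetical caller-side actions; these names are not part of the KIP.
enum NextStep { RETRY_LATER_OR_OTHER_REPLICA, TRY_OTHER_REPLICA, GIVE_UP }

final class FailureHandling {
    static NextStep nextStep(FailureReason reason) {
        switch (reason) {
            case NOT_ACTIVE:
            case NOT_UP_TO_BOUND:
                // The task may become active, or the partition may catch up.
                return NextStep.RETRY_LATER_OR_OTHER_REPLICA;
            case NOT_PRESENT:
                // The partition may have migrated to another instance.
                return NextStep.TRY_OTHER_REPLICA;
            case UNKNOWN_QUERY_TYPE:
            case DOES_NOT_EXIST:
            default:
                // Assumption: retrying the same request cannot succeed.
                return NextStep.GIVE_UP;
        }
    }

    public static void main(String[] args) {
        for (FailureReason reason : FailureReason.values()) {
            System.out.println(reason + " -> " + nextStep(reason));
        }
    }
}
```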

...

Code Block
@Evolving
public interface PositionBound {

  // Create a new Position bound representing only the latest state
  static PositionBound latest();

  // Create a new Position bound representing "no bound"
  static PositionBound unbounded();

  // Create a new Position bound representing the given Position
  static PositionBound at(Position position);

  // Check whether this is a "latest" Position bound
  boolean isLatest();

  // Check whether this is an "unbounded" Position bound
  boolean isUnbounded();

  // Get the Position (if it's not unbounded or latest)
  Position position();
}
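
The following sketch shows one possible reading of how a store partition could check itself against a bound. It is not the proposed implementation. A position is modeled here as a plain map of topic to partition to offset, which is an assumption, and `BoundCheck` is a hypothetical name. An empty bound map stands in for "unbounded", and the "latest" bound is left out. Offsets for other partitions are ignored, and a topic with no offset for the queried partition imposes no bound.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch: positions are modeled as topic -> partition -> offset.
final class BoundCheck {
    /**
     * Returns true if the given store partition has seen at least the offsets
     * that the bound requires for that partition.
     */
    static boolean isUpToBound(int partition,
                               Map<String, Map<Integer, Long>> current,
                               Map<String, Map<Integer, Long>> bound) {
        for (Map.Entry<String, Map<Integer, Long>> topic : bound.entrySet()) {
            Long required = topic.getValue().get(partition);
            if (required == null) {
                // No offset for this partition: "no bound" for this topic.
                // Offsets for other partitions are unrelated and ignored.
                continue;
            }
            Long seen = current
                .getOrDefault(topic.getKey(), Collections.emptyMap())
                .get(partition);
            if (seen == null || seen < required) {
                return false; // not (yet) up to the desired bound
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Map<Integer, Long>> current =
            Collections.singletonMap("input", Collections.singletonMap(0, 10L));
        Map<String, Map<Integer, Long>> bound =
            Collections.singletonMap("input", Collections.singletonMap(0, 11L));
        System.out.println(isUpToBound(0, current, bound));
    }
}
```

A caller that feeds the position from a previous result back in as the next bound would, under this reading, only be served by replicas that have caught up to what it already observed.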

...