Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: clarify exceptions and failure modes

...

IQv2 will continue to present its primary API via the KafkaStreams interface. The query  method itself is this API.

As the JavaDoc indicates, we also provide a mechanism to get a handle on the serdes, in case some queries want to handle binary data directly.

Note that exceptional conditions that prevent the query from being executed at all will be thrown as exceptions. These include Streams not having been started, and invalid requests like querying a store that doesn't exist or specifying a partition to query that doesn't exist.

There is a second class of failure, which today also is thrown as an exception, that will instead be reported on a per-partition basis as a QueryResult for which isFailure  is true, and which contains the reason for the failure along with other information. This would include conditions for which only part of the query may fail, such as a store being offline, a partition not being locally assigned, a store not being up to the desired bound, etc.

Code Block
languagejava
titleKafkaStreams
public class KafkaStreams implements AutoCloseable {
  ...

  /**
    * Run an interactive query against a state store.
    * <p>
    * This method allows callers outside of the Streams runtime to
    * access the internal state of stateful processors. See
    * https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
    * for more information.
    * @throws StreamsNotStartedException If Streams has not yet been started. Just call {@link KafkaStreams#start()}
    *                                    and then retry this call.
    * @throws UnknownStateStoreException If the specified store name does not exist in the topology.
    * @throws InvalidStateStorePartitionException If the specified partition does not exist
Code Block
languagejava
titleKafkaStreams
public class KafkaStreams implements AutoCloseable {
  ...

  /**
    * Run an interactive query against a state store.
    * <p>
    * This method allows callers outside of the Streams runtime to
    * access the internal state of stateful processors. See
    * https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
    * for more information.
    * /
  @Evolving
  public <R> InteractiveQueryResult<R> query(InteractiveQueryRequest<R> request);

  ...
}

...

Code Block
@Evolving
public class QueryResult<R> {
  // wraps a successful result
  public static <R> QueryResult<R> forResult(R result);

  // returns a failed query result because the store didn't know how to handle the query.
  public static <R> QueryResult<R> forUnknownQueryType(Query<R> query, StateStore store);

  // returns a failed query result because the partition wasn't caught up to the desired bound.
  public static <R> QueryResult<R> notUpToBound(Position currentPosition, Position bound);

  // returns a failed query result because caller requested a "latest" bound, but the task was
  // not active and running.
  public static <R> QueryResult<R> notActive(String currentState);

  // Used by state stores that need to delegate to another store to run a query and then
  // translate the results. Does not change the execution info or any other metadata.
  public <NewR> QueryResult<NewR> swapResult(NewR newTypedResult);

  // If requested, stores should record
  // helpful information, such as their own class, how they executed the query,
  // and the time they took.
  public void addExecutionInfo(String executionInfo);

  public boolean isSuccess();
  public boolean isFailure();
  public List<String> getExecutionInfo();

...  public FailureReason getFailureReason();
  public String getFailure();
}

Position

A class representing a processing state position in terms of its inputs: a vector or (topic, partition, offset) components.

...