Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleKafkaStreams
public class KafkaStreams implements AutoCloseable {
  ...

  /**
    * Run an interactive query against a state store.
    * <p>
    * This method allows callers outside of the Streams runtime to
    * access the internal state of stateful processors. See
    * https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
    * for more information.
    * @throws StreamsNotStartedException If Streams has not yet been started. Just call {@link KafkaStreams#start()}
 and then retry *this call.
    * @throws StreamsStoppedException If Streams is in a terminal state like PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should discover a new instance         and then retry this callto query.
     * @throws UnknownStateStoreException If the specified store name does not exist in the topology.
    * @throws InvalidStateStorePartitionException If the specified partition does not exist.
    * /
  @Evolving
  public <R> StateQueryResult<R> query(StateQueryRequest<R> request);

  ...
}

...

Code Block
@Evolving
public class QueryResult<R> {
  // wraps a successful result
  public static <R> QueryResult<R> forResult(R result);

  // returns a failed query result because the store didn't know how to handle the query.
  public static <R> QueryResult<R> forUnknownQueryType(Query<R> query, StateStore store);

  // returns a failed query result because the partition wasn't caught up to the desired bound.
  public static <R> QueryResult<R> notUpToBound(Position currentPosition, Position bound);

  // Used by state stores that need to delegate to another store to run a query and then
  // translate the results. Does not change the execution info or any other metadata.
  public <NewR> QueryResult<NewR> swapResult(NewR newTypedResult);

  // If requested, stores should record
  // helpful information, such as their own class, how they executed the query,
  // and the time they took.
  public void addExecutionInfo(String executionInfo);

  public boolean isSuccess();
  public boolean isFailure();
  public List<String> getExecutionInfo();
  public FailureReason getFailureReason();
  public String getFailure();
}

FailureReason

An enum classifying failures for individual partitions' failures

Code Block
public enum FailureReason {
    /**
     * Failure indicating that the store doesn't know how to handle the given query.
     */
    UNKNOWN_QUERY_TYPE,

    /**
     * Failure indicating that the store partition is not (yet) up to the desired bound.
     * The caller should either try again later or try a different replica.
     */
    NOT_UP_TO_BOUND,

    /**
     * Failure indicating that the requested store partition is not present on the local
     * KafkaStreams instance. It may have been migrated to another instance during a rebalance.
     * The caller is recommended to try a different replica.
     */
    NOT_PRESENT,

    /**
     * The requested store partition does not exist at all. For example, partition 4 was requested,
     * but the store in question only has 4 partitions (0 through 3).
     */
    DOES_NOT_EXIST;
}


Position

A class representing a processing state position in terms of its inputs: a vector or (topic, partition, offset) components.

...