...

  • This addresses Problem 2 because responses aren't required to be composable
  • It also creates room for partitions to report success or failure responses independently, which addresses Unintended Consequence 1b
  • It also addresses Unintended Consequence 1c because we don't do extra work to combine partitions' results unless we need to.

Public Interfaces

This KIP will add ...:

...


Special interface stability protocol

All of the new methods and classes will be marked as @Evolving ("Compatibility may be broken at minor release") to allow for more rapid iteration as we flesh out the first generation of queries. For such changes, we will simply notify the KIP discussion thread and update this document. Our goal is to stabilize the interface as quickly as possible, ideally in the next major release.

Public Interfaces

This KIP will add several new methods, classes, and interfaces.

KafkaStreams modifications

IQv2 will continue to present its primary API via the KafkaStreams interface. The new query method is the entry point to this API.

As the JavaDoc indicates, we also provide a mechanism to get a handle on the serdes, in case some queries want to handle binary data directly.

Code Block (Java): KafkaStreams
public class KafkaStreams implements AutoCloseable {
  ...

  /**
    * Run an interactive query against a state store.
    * <p>
    * This method allows callers outside of the Streams runtime to
    * access the internal state of stateful processors. See
    * https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
    * for more information.
    */
  @Evolving
  public <R> InteractiveQueryResult<R> query(InteractiveQueryRequest<R> request);

  /**
    * Get a reference to the serdes used internally in a state store
    * for use with interactive queries. While many queries already
    * accept Java-typed keys and also return typed results, some
    * queries may need to handle the raw binary data stored in StateStores,
    * in which case, this method can provide the serdes needed to interpret
    * that data.
    */
  @Evolving
  public <K, V> InteractiveQuerySerdes<K, V> serdesForStore(String storeName);

  ...
}

InteractiveQueryRequest

This is the main request object for IQv2. It contains all of the information required to execute the query. Note that, although this class is similar to the IQv1 request object StoreQueryParameters, we are proposing a new class to avoid unnecessary coupling between the old and new APIs' design goals.

This class implements a progressive builder pattern in an attempt to avoid the pitfalls we identified in Kafka Streams DSL Grammar. The progressive builder pattern allows us to supply arguments via semantically meaningful static and instance methods AND check required arguments at compile time.

The way it works is that the constructor is private and the first required argument is supplied via a static method. That method returns an intermediate type that exposes only the next builder method, which in turn returns either another intermediate type or the final class, depending on whether further required arguments remain. All of the optional arguments can be instance methods on the final class.
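The progressive builder described above can be sketched in isolation as follows. This is a minimal illustration, not the KIP's actual class: SketchRequest, its String-typed query, and withPartitions are hypothetical stand-ins chosen to keep the example self-contained.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a request type; not the class proposed in this KIP.
final class SketchRequest {
    private final String storeName;
    private final String query;             // a real query would be a typed object
    private final Set<Integer> partitions;  // empty means "no explicit selection"

    // Private constructor: callers must start from the static entry point.
    private SketchRequest(String storeName, String query, Set<Integer> partitions) {
        this.storeName = storeName;
        this.query = query;
        this.partitions = partitions;
    }

    // First required argument: the only way to begin building a request.
    static InStore inStore(String storeName) {
        return new InStore(storeName);
    }

    // Intermediate type: exposes only the next required step.
    static final class InStore {
        private final String storeName;

        private InStore(String storeName) {
            this.storeName = storeName;
        }

        // Second (and last) required argument: returns the final type.
        SketchRequest withQuery(String query) {
            return new SketchRequest(storeName, query, Collections.<Integer>emptySet());
        }
    }

    // Optional argument: an instance method on the final type that returns a new copy.
    SketchRequest withPartitions(Set<Integer> partitions) {
        return new SketchRequest(storeName, query,
            Collections.unmodifiableSet(new HashSet<>(partitions)));
    }

    String storeName() { return storeName; }
    String query() { return query; }
    Set<Integer> partitions() { return partitions; }

    public static void main(String[] args) {
        SketchRequest request = SketchRequest.inStore("counts")
            .withQuery("get:some-key")
            .withPartitions(Collections.singleton(0));
        System.out.println(request.storeName() + " " + request.query() + " " + request.partitions());
    }
}
```

In this sketch, a call such as SketchRequest.inStore("counts").withPartitions(...) does not compile, because InStore exposes only withQuery. That is the compile-time check on required arguments that the pattern is meant to provide.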

Note: the "position bound" part of the proposal is an evolution of IQv1 and potentially controversial. The idea is to remove the need for the semantically loose StoreQueryParameters#enableStaleStores method. Instead of choosing between querying only the active store and querying unboundedly stale restoring and standby stores, callers can choose to query only the active store (latest), query any store (no bound), or query a store that has caught up to at least the position observed by their previous query or queries. If this idea is controversial, we can separate it from this KIP and propose it separately.
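To make the "at least past the point of our last query" option concrete, here is a rough sketch of how a store's current position might be checked against a caller-supplied bound. It is an assumption-laden illustration rather than the proposed implementation: positions are modeled as a plain Map from partition number to offset, and PositionBoundSketch and satisfies are hypothetical names. It follows one reading of the JavaDoc for withPositionBound, in which bound entries that do not apply to the store are ignored and partitions missing from the bound are unconstrained.

```java
import java.util.Map;

// Hypothetical helper; the KIP does not define this class.
final class PositionBoundSketch {

    /**
     * Returns true if a store at storePosition is caught up to bound.
     * Both maps go from input partition number to offset (an assumed representation).
     */
    static boolean satisfies(Map<Integer, Long> storePosition, Map<Integer, Long> bound) {
        for (Map.Entry<Integer, Long> entry : storePosition.entrySet()) {
            Long required = bound.get(entry.getKey());
            // A partition missing from the bound is treated as "no bound".
            if (required != null && entry.getValue() < required) {
                return false; // the store has not yet reached the caller's prior observation
            }
        }
        // Bound entries for partitions the store does not track are never consulted,
        // so unrelated offsets are ignored.
        return true;
    }
}
```

Under this model, a caller would pass the position returned by an earlier result as the bound of the next request, so that a standby or restoring store may serve the query only once it has reached that position. The "latest" and "no bound" options are not expressible as offsets in this sketch and would need separate handling.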

Code Block (Java): InteractiveQueryRequest
/**
  * The request object for Interactive Queries.
  * This is an immutable builder class for passing all required and
  * optional arguments for querying a state store in Kafka Streams.
  * <p>
  * @param <R> The type of the query result.
  */
public class InteractiveQueryRequest<R> {

  /**
    * First required argument to specify the name of the store to query
    */
  public static InStore inStore(final String name);

  public static class InStore {

    /**
      * Second required argument to provide the query to execute.
      */
    public <R> InteractiveQueryRequest<R> withQuery(final Query<R> query);
  } 

  /**
    * Optionally bound the current position of the state store
    * with respect to the input topics that feed it. In conjunction
    * with {@link InteractiveQueryResult#getPosition}, this can be
    * used to achieve a good balance between consistency and
    * availability in which repeated queries are guaranteed to
    * advance in time while allowing reads to be served from any
    * replica that is caught up to that caller's prior observations.
    * <p>
    * Note that the set of offsets provided in the bound does not determine
    * the partitions to query. For that, see {@link #withPartitionsToQuery}.
    * Unrelated offsets will be ignored, and missing offsets will be treated
    * as indicating "no bound". 
    */
  public InteractiveQueryRequest<R> withPositionBound(Offsets positionBound);

  /**
    * Optionally bound the query to only execute on active running tasks.
    * This is the freshest and most consistent possible result, but does
    * not provide high availability. This is the default if no bound is
    * specified.
    */
  public InteractiveQueryRequest<R> withLatestPositionBound(); 

  /**
    * Optionally specify that the query will run on any store,
    * whether active, standby, or restoring. This data may be
    * arbitrarily stale, but displays the best possible availability.
    */
  public InteractiveQueryRequest<R> withNoPositionBound();

  /**
    * Optionally specify the partitions to include in the query.
    * If omitted, the default is to query all locally available partitions
    */
  public InteractiveQueryRequest<R> withPartitionsToQuery(Set<Integer> partitions);
}

InteractiveQueryResult

This is the main response object for IQv2, ...

Compatibility, Deprecation, and Migration Plan

...