...

Code Block (language: java, title: KafkaStreams)
public class KafkaStreams implements AutoCloseable {
  ...

  /**
    * Run an interactive query against a state store.
    * <p>
    * This method allows callers outside of the Streams runtime to
    * access the internal state of stateful processors. See
    * https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html
    * for more information.
    */
  @Evolving
  public <R> InteractiveQueryResult<R> query(InteractiveQueryRequest<R> request);

  /**
    * Get a reference to the serdes used internally in a state store
    * for use with interactive queries. While many queries already
    * accept Java-typed keys and also return typed results, some
    * queries may need to handle the raw binary data stored in StateStores,
    * in which case, this method can provide the serdes needed to interpret
    * that data.
    */
  @Evolving
  public <K, V> InteractiveQuerySerdes<K, V> serdesForStore(String storeName);

  ...
}

InteractiveQueryRequest

This is the main request object for IQv2. It contains all of the information required to execute the query. Note that, although this class is similar to the IQv1 request object StoreQueryParameters, we are proposing a new class to avoid unnecessary coupling between the old and new APIs' design goals.

This class implements a progressive builder pattern in an attempt to avoid the pitfalls we identified in Kafka Streams DSL Grammar. The progressive builder pattern allows us to supply arguments via semantically meaningful static and instance methods AND check required arguments at compile time.

The way it works is that the constructor is private and the first required argument is a static method. That method returns an intermediate interface that contains the next builder method, which either returns a new intermediate interface or the final class, depending on whether the following arguments are required or not. All of the optional arguments can be instance methods in the final interface.
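
For illustration, here is a minimal sketch of the pattern with generic, made-up names (this is not part of the proposed API; it only demonstrates the mechanics described above):

Code Block
// Minimal sketch of the progressive builder pattern with made-up names.
// The constructor is private, the first required argument is a static
// factory, and the intermediate type exposes only the next required
// argument, so omitting a required argument fails to compile.
public final class Request {
  private final String store; // first required argument
  private final String query; // second required argument
  private final String hint;  // optional argument

  private Request(final String store, final String query, final String hint) {
    this.store = store;
    this.query = query;
    this.hint = hint;
  }

  // First required argument: a static entry point returning an intermediate type.
  public static WithStore inStore(final String store) {
    return query -> new Request(store, query, null);
  }

  // Intermediate interface: the only way forward is to supply the next required argument.
  public interface WithStore {
    Request withQuery(String query);
  }

  // Optional arguments are ordinary instance methods on the final class.
  public Request withHint(final String hint) {
    return new Request(store, query, hint);
  }
}

// Usage: Request.inStore("store") alone cannot yield a Request;
// Request request = Request.inStore("store").withQuery("query").withHint("hint");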

Note: the "position bound" part of the proposal is an evolution on IQv1 and potentially controversial. The idea is to remove the need for the semantically loose StoreQueryParameters#enableStaleStores method. Instead of choosing between querying only the active store and querying unboundedly stale restoring and standby stores, callers can choose to query only the active store (latest), query any  store (no bound), or to query a store that is at least past the point of our last query/queries. If this idea is controversial, we can separate it from this KIP and propose it separately.

Code Block
/**
  * The request object for Interactive Queries.
  * This is an immutable builder class for passing all required and
  * optional arguments for querying a state store in Kafka Streams.
  * <p>
  * @param <R> The type of the query result.
  */
@Evolving
public class InteractiveQueryRequest<R> {

  /**
    * First required argument to specify the name of the store to query
    */
  public static InStore inStore(final String name);

  public static class InStore {

    /**
      * Second required argument to provide the query to execute.
      */
    public <R> InteractiveQueryRequest<R> withQuery(final Query<R> query);
  } 

  /**
    * Optionally bound the current position of the state store
    * with respect to the input topics that feed it. In conjunction
    * with {@link InteractiveQueryResult#getPosition}, this can be
    * used to achieve a good balance between consistency and
    * availability in which repeated queries are guaranteed to
    * advance in time while allowing reads to be served from any
    * replica that is caught up to that caller's prior observations.
    * <p>
    * Note that the set of offsets provided in the bound does not determine
    * the partitions to query. For that, see {@link #withPartitions}.
    * Unrelated offsets will be ignored, and missing offsets will be treated
    * as indicating "no bound". 
    */
  public InteractiveQueryRequest<R> withPositionBound(PositionBound positionBound);

  /**
    * Optionally specify the partitions to include in the query.
    * If omitted, the default is to query all locally available partitions
    */
  public InteractiveQueryRequest<R> withPartitions(Set<Integer> partitions);

  /**
    * Query all locally available partitions
    */
  public InteractiveQueryRequest<R> withAllPartitions(); 


  /**
    * Instruct Streams to collect detailed information during query
    * execution, for example, which stores handled the query, how
    * long they took, etc.
    */
  public InteractiveQueryRequest<R> enableExecutionInfo();

  // Getters are also proposed to retrieve the request parameters

  public String getStoreName();
  public Query<R> getQuery();
  public PositionBound getPositionBound();
  public boolean executionInfoEnabled();

  /**
    * An empty set indicates that no partitions will be fetched;
    * a non-empty set indicates the specific partitions that will be fetched (if locally available).
    * Throws UnsupportedOperationException if the request is for all partitions (isAllPartitions() == true).
    */
  public Set<Integer> getPartitions();

  /**
    * indicates that all locally available partitions will be fetched
    */
  public boolean isAllPartitions();

}
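
For illustration, here is a sketch of how a caller might assemble and execute a request. The KeyQuery type and the PositionBound.unbounded() factory are illustrative placeholders; only the builder methods and KafkaStreams#query come from this proposal:

Code Block
// Usage sketch. KeyQuery and PositionBound.unbounded() are illustrative
// stand-ins; the builder methods and KafkaStreams#query are from the proposal.
InteractiveQueryRequest<String> request =
    InteractiveQueryRequest
        .inStore("my-store")                          // required: store name
        .withQuery(new KeyQuery<String>("my-key"))    // required: the query to execute
        .withPositionBound(PositionBound.unbounded()) // optional: no consistency bound
        .withPartitions(Set.of(0, 1))                 // optional: restrict to specific partitions
        .enableExecutionInfo();                       // optional: collect diagnostics

InteractiveQueryResult<String> result = kafkaStreams.query(request);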

...

A related problem (and the reason we really need to change "a large number" of store implementations in the prior paragraph) is the "wrapped" state store design. The visitor pattern only works when you're invoking a Visitor interface that has a defined method for your concrete type. But in Streams, when you start to run a query, you're not querying (e.g.) a RocksDBStore. What you are really querying is probably a MeteredKeyValueStore wrapping a CachingKeyValueStore wrapping a ChangeLoggingKeyValueStore wrapping a RocksDBStore. So we'd wind up either making the KeyValueStore interface our Visitor, in which case every KeyValueStore implementation would have to define handlers for all queries, OR we'd automatically bypass all intervening layers and only run queries directly against the bottom-layer store. The first puts us in exactly the position we're in today: user-defined queries can't take advantage of special properties of user-defined stores, and the second means that we can't ever serve queries out of the cache.
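
To make the dilemma concrete, the following simplified sketch (using made-up stand-in classes, not the actual Streams store interfaces) shows the wrapping structure. A query dispatched directly against the innermost store skips every wrapping layer, so entries held only by the caching layer could never be served; the alternative is for every layer (and every custom store) to implement a handler for every query type:

Code Block
// Simplified stand-ins for the real store layering (not actual Streams classes).
interface SketchKeyValueStore {
  String get(String key);
}

// Stands in for RocksDBStore: the bottom-layer store.
class SketchRocksDBStore implements SketchKeyValueStore {
  @Override
  public String get(final String key) {
    return "value-from-rocksdb:" + key;
  }
}

// Stands in for CachingKeyValueStore: holds cached entries the bottom layer
// does not know about. A query dispatched directly against the inner store
// (to avoid making every layer a handler for every query) can never see
// these entries, which is exactly the trade-off described above.
class SketchCachingStore implements SketchKeyValueStore {
  private final SketchKeyValueStore inner;
  private final java.util.Map<String, String> cache = new java.util.HashMap<>();

  SketchCachingStore(final SketchKeyValueStore inner) {
    this.inner = inner;
  }

  @Override
  public String get(final String key) {
    return cache.computeIfAbsent(key, inner::get);
  }
}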

It is possible that some kind of hybrid model here might make dispatch more efficient, but it seems like it comes at the cost of substantial complexity. As with the prior idea, we will consider this further if the performance of the current proposal seems to be too low.

Provide method to get store serdes

We previously proposed to add the following method to the KafkaStreams interface:

Code Block
 
  /**
    * Get a reference to the serdes used internally in a state store
    * for use with interactive queries. While many queries already
    * accept Java-typed keys and also return typed results, some
    * queries may need to handle the raw binary data stored in StateStores,
    * in which case, this method can provide the serdes needed to interpret
    * that data.
    */
  @Evolving
  public <K, V> InteractiveQuerySerdes<K, V> serdesForStore(String storeName); 

This has been removed from the proposal because it is not necessary to implement the "parity" queries that we would implement today (to achieve parity with existing Interactive Query capabilities).

It may be necessary in the future to support high-performance "raw" queries that don't de/serialize query inputs or responses, but once we have a specific proposal for those queries, we may find that another solution is more suitable (such as returning types that wrap their serdes and can be used either way). Regardless, we can defer this decision to a later KIP and simplify this proposal.