
Status

Current state: Accepted

Discussion thread: here 

JIRA: KAFKA-3909

Released: 0.10.1.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

.countByKey("StoreName")

wordCounts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");

...


Step 1 in proposal: expose state store names to DSL and local queries

The stream store namespace is local to a KStreams instance, i.e., it is part of the same process that the KStreams instance is in. Conceptually the code to access such a store would look like this:

KafkaStreams streams = new KafkaStreams(..);

ReadOnlyKeyValueStore store = streams.store("storeName", QueryableStoreTypes.keyValueStore());


The state store is discovered by querying the KafkaStreams instance. The query operations will be read-only, i.e., no updates. The query methods simply call methods of the StateStore object.


...

Hence, the discovery API is part of the KafkaStreams instance. The API will provide four methods:

  • Collection<StreamsMetadata> KafkaStreams.allMetadata(), where StreamsMetadata has fields such as the list of assigned partitions, the list of state store names, and a HostInfo that includes hostname / port, etc. The port is the listening port of a user-defined listener that users provide to listen for queries (e.g., using REST APIs). More on the user-defined agent below.
  • Collection<StreamsMetadata> KafkaStreams.allMetadataForStore(String storeName) would return only the StreamsMetadata that include the given store name.
  • StreamsMetadata KafkaStreams.metadataForKey(String storeName, K key, Serializer<K> serializer) would return the StreamsMetadata that has the store that might have the key (the key might not exist in any store). The default StreamPartitioner will be used for key partitioning.
  • StreamsMetadata KafkaStreams.metadataForKey(String storeName, K key, StreamPartitioner<K, ?> partitioner) same as above, but will use the provided StreamPartitioner.

    An additional configuration parameter, StreamsConfig.APPLICATION_SERVER_CONFIG, will be added. This is a host:port pair supplied by the streams developer and should map to a server running in the same instance of the KafkaStreams application. The supplied host:port pair will form part of the StreamsMetadata returned from the above-mentioned API calls.

The individual KafkaStreams instances will know about all mappings between state store names, keys and KafkaStream instances. We propose that they keep track of this information by piggybacking on the consumer group membership protocol. In particular, we propose to add the above mapping Map<HostInfo, Set<TopicPartition>> to StreamPartitionAssignor so that each consumer knows about all the other tasks metadata and host states in the system.
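To picture how an instance could turn that propagated assignment into store-level metadata, here is a stand-alone sketch. Everything in it (the names MetadataDerivation and hostsForStore, and the string encoding of hosts and partitions) is an illustrative assumption, not the actual StreamPartitionAssignor code:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch: derive "which hosts have store X" from the
// host-to-partition assignment that the assignor propagates to every member.
public class MetadataDerivation {
    /**
     * @param hostToPartitions assignment map; a partition is encoded as
     *                         "topic-partitionNumber", e.g. "word-counts-0"
     * @param topicToStore     static mapping from changelog/source topic to store name
     * @param storeName        the store to locate
     * @return the host:port strings of every instance holding a partition of the store
     */
    static Set<String> hostsForStore(Map<String, Set<String>> hostToPartitions,
                                     Map<String, String> topicToStore,
                                     String storeName) {
        Set<String> hosts = new HashSet<>();
        for (Map.Entry<String, Set<String>> entry : hostToPartitions.entrySet()) {
            for (String topicPartition : entry.getValue()) {
                // strip the trailing "-<partition>" to recover the topic name
                String topic = topicPartition.substring(0, topicPartition.lastIndexOf('-'));
                if (storeName.equals(topicToStore.get(topic))) {
                    hosts.add(entry.getKey());
                }
            }
        }
        return hosts;
    }
}
```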


Bootstrapping. To bootstrap the discovery, a user can simply query the discovery instance of any one of the KafkaStream instances she operates, i.e., bootstrap happens within a single KafkaStream instance.

...

We propose adding an additional method to the  KafkaStreams public API:

Code Block
/**
 * Find the store with the provided storeName and of the type as
 * specified by QueryableStoreType. Will throw an UnknownStoreException
 * if a store with the given name and type is not found
 */
public <T> T store(final String storeName, 
                   final QueryableStoreType<T> queryableStoreType)

The QueryableStoreType interface, below, can be used to ‘plug-in’ different StateStore implementations to Queryable State. Developers using the Processor API and supplying custom StateStores can get access to their StateStores with the same API method as above. Implementations of this interface for the StateStores that are part of the KafkaStreams library will be provided by this KIP.

Code Block
public interface QueryableStoreType<T> {
    /**
     * Called when searching for {@StateStore}s to see if they
     * match the type expected by implementors of this interface
     * @param stateStore    The stateStore
     * @return true if it is a match
     */
    boolean accepts(final StateStore stateStore);
    /**
     * Create an instance of T (usually a facade) that developers can use
     * to query the Underlying {@StateStore}s
     * @param storeProvider     provides access to all the underlying StateStore instances of type T
     * @param storeName         The name of the Store
     * @return  T usually a read-only interface over a StateStore @see {@link   QueryableStoreTypes.KeyValueStoreType}
     */
    T create(final StateStoreProvider storeProvider, final String storeName);
}

Two new interfaces to restrict StateStore access to Read Only (note this only applies to implementations that are part of Kafka Streams)

Code Block
/**
 * A window store that only supports read operations
 *
 * @param <K> Type of keys
 * @param <V> Type of values
 */
public interface ReadOnlyWindowStore<K, V> {
    /**
     * Get all the key-value pairs with the given key and the time range from all
     * the existing windows.
     *
     * @return an iterator over key-value pairs {@code <timestamp, value>}
     */
    WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
}
/**
* A key value store that only supports read operations
* @param <K> the key type
* @param <V> the value type
*/
public interface ReadOnlyKeyValueStore<K, V> {
   /**
    * Get the value corresponding to this key
    *
    * @param key The key to fetch
    * @return The value or null if no value is found.
    * @throws NullPointerException If null is used for key.
    */
   V get(K key);
  /**
    * Get an iterator over a given range of keys. 
    * This iterator MUST be closed after use.
    *
    * @param from The first key that could be in the range
    * @param to The last key that could be in the range
    * @return The iterator for this range.
    * @throws NullPointerException If null is used for from or to.
    */
   KeyValueIterator<K, V> range(K from, K to);
   /**
    * Return an iterator over all keys in the database. 
    * This iterator MUST be closed after use.
    *
    * @return An iterator of all key/value pairs in the store.
    */
   KeyValueIterator<K, V> all();
}
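To make the read-only contract concrete, here is a hypothetical in-memory implementation of the same operations. It is purely illustrative: KeyValueIterator is simplified to a plain java.util.Iterator over Map.Entry, and the real stores in Kafka Streams are backed by RocksDB plus a changelog topic:

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory illustration of the ReadOnlyKeyValueStore contract.
public class InMemoryReadOnlyStore<K extends Comparable<K>, V> {
    private final TreeMap<K, V> map = new TreeMap<>();

    // For setup only: in the real store, writes come from the stream, not the querier.
    public void put(K key, V value) {
        map.put(key, value);
    }

    /** Get the value corresponding to this key, or null if no value is found. */
    public V get(K key) {
        if (key == null) throw new NullPointerException("key cannot be null");
        return map.get(key);
    }

    /** Iterate over the given (inclusive) range of keys. */
    public Iterator<Map.Entry<K, V>> range(K from, K to) {
        if (from == null || to == null) throw new NullPointerException("range bounds cannot be null");
        return map.subMap(from, true, to, true).entrySet().iterator();
    }

    /** Iterate over all key-value pairs in the store. */
    public Iterator<Map.Entry<K, V>> all() {
        return map.entrySet().iterator();
    }
}
```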

 

We can then use the above API to get access to the stores like so:

Code Block
final ReadOnlyKeyValueStore<String, Long>
   myCount = kafkaStreams.store("my-count", QueryableStoreTypes.<String, Long>keyValueStore());
final ReadOnlyWindowStore<String, String>
   joinOther =
   kafkaStreams.store("join-other", QueryableStoreTypes.<String, String>windowStore());

Discovery API

 Exposed APIs from Kafka Streams:

Code Block
/**
 * A new config will be added to StreamsConfig
 * A user defined endpoint that can be used to connect to remote KafkaStreams instances. 
 * Should be in the format host:port
 */
public static final String APPLICATION_SERVER_CONFIG = "application.server";


 
public class HostInfo {
   String hostname;    /* hostname for instance that contains state store */
   int port;           /* listening port for instance that contains state store */
}
public class StreamsMetadata {
    private final HostInfo hostInfo; /* hostInfo from above */
    private final Set<String> stateStores; /* state stores on this instance */
    private final Set<TopicPartition> topicPartitions; /* TopicPartitions on this instance */
}

/**
 * Find all of the instances of {@link StreamsMetadata} in a {@link KafkaStreams application}
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @return collection containing all instances of {@link StreamsMetadata} in this application
 */
public Collection<StreamsMetadata> allMetadata();

/**
 * Find the instances {@link StreamsMetadata} for a given storeName
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @param storeName the storeName to find metadata for
 * @return  A collection containing instances of {@link StreamsMetadata} that have the provided storeName
 */
public Collection<StreamsMetadata> allMetadataForStore(final String storeName);

/**
 * Find the {@link StreamsMetadata} for a given storeName and key.
 * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore},
 * this method provides a way of finding which host it would exist on.
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @param storeName         Name of the store
 * @param key               Key to use for partitioning
 * @param keySerializer     Serializer for the key
 * @param <K>               key type
 * @return  The {@link StreamsMetadata} for the storeName and key
 */
public <K> StreamsMetadata metadataForKey(final String storeName, 
                                          final K key, 
                                          final Serializer<K> keySerializer);

/**
 * Find the {@link StreamsMetadata} for a given storeName and key.
 * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore},
 * this method provides a way of finding which host it would exist on.
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @param storeName         Name of the store
 * @param key               Key to use for partitioning
 * @param partitioner       Partitioner for the store
 * @param <K>               key type
 * @return  The {@link StreamsMetadata} for the storeName and key
 */
public <K> StreamsMetadata metadataForKey(final String storeName,
                                          final K key,
                                          final StreamPartitioner<K, ?> partitioner);
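The mechanics behind metadataForKey can be pictured with a small stand-alone sketch: serialize the key, map it to a partition, then look up the host that owns that partition. Everything below (KeyToHostResolver, partitionFor, and Arrays.hashCode standing in for Kafka's murmur2 hash) is an illustrative assumption, not a KIP API:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of key-to-instance resolution.
public class KeyToHostResolver {
    private final Map<Integer, String> partitionToHost = new HashMap<>();
    private final int numPartitions;

    public KeyToHostResolver(final int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Record which host:port owns a partition (in Kafka Streams this comes
    // from the assignment propagated by the group membership protocol).
    public void assign(final int partition, final String hostPort) {
        partitionToHost.put(partition, hostPort);
    }

    // Default-partitioner-style mapping: non-negative hash modulo partition count.
    static int partitionFor(final byte[] keyBytes, final int numPartitions) {
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    /** Which host:port would hold the store partition for this key? */
    public String hostFor(final String key) {
        final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); // the "serializer"
        return partitionToHost.get(partitionFor(keyBytes, numPartitions));
    }
}
```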
Below is an example of how a developer might use the Discovery API
Code Block
public static void main(String[] args) throws Exception {
  Properties streamsConfiguration = new Properties();
  ...
  /**
   * To use the Discovery API the developer must provide an host:port pair that
   * maps to an embedded service listening on this address. i.e.,
   * it is up to the developer to define the protocol and create the service
   */
  streamsConfiguration.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:7070");
  ...
  KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
  streams.start();

  /** 
   * Start your embedded service listening on the endpoint
   * provided above
   */
  new QueryableStateProxy(streams).start(7070);
}
 
/**
 * Example Rest Proxy that uses the Discovery API to locate the
 * instances that StateStores are running on. A developer would first connect 
 * to a well-known instance to find where a particular store, or store with key,
 * is located. They'd then use the returned StreamsMetadata
 * to connect to the appropriate instances and perform queries, i.e, :
 * StreamsMetadata metadata = http.get("http://well-known-host:8080/state/instance/my-store/my-key");
 * Long result = http.get("http://" + metadata.host() + ":" + metadata.port() + "/state/stores/my-store/my-key");
 */
@Path("state")
public class QueryableStateProxy {
   /**
    * The KafkaStreams instance knows about all of the other instances 
    * in the application. This is maintained by the StreamPartitionAssignor
    * on partition assignments (rebalances) 
 */ 
   private final KafkaStreams streams;
   public QueryableStateProxy(final KafkaStreams streams) {
      this.streams = streams;
   }
   
   @GET()
   @Path("/instances")
   public Response streamsMetadata() {
	 // get the current collection of StreamsMetadata
     final Collection<StreamsMetadata> metadata = streams.allMetadata();
     return respondWithMetadata(metadata);
   }
 
   @GET()
   @Path("/instances/{storeName}")
   public Response streamsMetadataForStore(@PathParam("storeName") String store) {
      // find all the metadata that have the provided store
      final Collection<StreamsMetadata> metadata = streams.allMetadataForStore(store);
      return respondWithMetadata(metadata);
   }
 
   @GET()
   @Path("/instance/{storeName}/{key}")
   public Response streamsMetadataForStoreAndKey(@PathParam("storeName") String store,
                                                  @PathParam("key") String key) {
 
	  // locate the instance that would have the store with the provided key (if the key exists)
      final StreamsMetadata metadata = streams.metadataForKey(store, key, new StringSerializer());
      if (metadata == null) {
        return Response.noContent().build();
      }
      return Response.ok(metadata.toString()).build();
   }
 
   @GET()
   @Path("/stores/{storeName}/{key}")
   public Response byKey(@PathParam("storeName") String storeName, @PathParam("key") String key) {
	  // Get a handle on the Store for the provided storeName
      final ReadOnlyKeyValueStore<String, Long> store = streams.store(storeName,
                                                                         QueryableStoreTypes.keyValueStore());
      // store may not exist, or may not be queryable yet, i.e., if partitions haven't been assigned or 
      // a rebalance is in process
      if (store == null) {
        return Response.noContent().build();
      }

	  // we have a store so we can get the result
      final Long result = store.get(key);
      if (result == null) {
        return Response.noContent().build();
      }
      return Response.ok(result).build();
    }
 
	public void start(int port) {
     // start your favourite http server
     ...
    }
}
 

 

Developer Guide for Custom Stores

If you want to make a custom store queryable you'll need to implement the QueryableStoreType interface. Below are example implementations for KeyValueStore and WindowStore. You may also want to provide an interface to restrict operations to read-only, and a composite type providing a façade over the potentially many instances of the underlying store (see the example CompositeReadOnlyKeyValueStore below).

 

Code Block
public class QueryableStoreTypes {
    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, V>> keyValueStore() {
        return new KeyValueStoreType<>();
    }

    public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, V>> windowStore() {
        return new WindowStoreType<>();
    }

    public static abstract class QueryableStoreTypeMatcher<T> implements QueryableStoreType<T> {
        private final Class matchTo;

        public QueryableStoreTypeMatcher(Class matchTo) {
            this.matchTo = matchTo;
        }

        @SuppressWarnings("unchecked")
        @Override
        public boolean accepts(final StateStore stateStore) {
            return matchTo.isAssignableFrom(stateStore.getClass());
        }
    }

    private static class KeyValueStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, V>> {
        KeyValueStoreType() {
            super(ReadOnlyKeyValueStore.class);
        }

        @Override
        public ReadOnlyKeyValueStore<K, V> create(final StateStoreProvider storeProvider,
                                                  final String storeName) {
            return new CompositeReadOnlyKeyValueStore<>(storeProvider, this, storeName);
        }
    }

    private static class WindowStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, V>> {
        WindowStoreType() {
            super(ReadOnlyWindowStore.class);
        }

        @Override
        public ReadOnlyWindowStore<K, V> create(final StateStoreProvider storeProvider,
                                                final String storeName) {
            return new CompositeReadOnlyWindowStore<>(storeProvider, storeName);
        }
    }
}
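A custom store can plug into the same matcher pattern. The sketch below is self-contained, using simplified stand-in interfaces (StateStore, QueryableStoreType) and a hypothetical MyCustomStore; in a real application you would implement the actual Kafka Streams interfaces instead:

```java
// Simplified stand-ins for the Kafka Streams interfaces, so this sketch compiles alone.
interface StateStore {
    String name();
}

interface QueryableStoreType<T> {
    boolean accepts(StateStore stateStore);
}

// A hypothetical custom store a developer might build with the Processor API.
interface MyCustomStore extends StateStore {
    long approximateCount();
}

public class CustomStoreTypeExample {
    // Matcher that accepts only stores implementing MyCustomStore,
    // mirroring the accepts() check of QueryableStoreTypeMatcher above.
    public static class MyCustomStoreType implements QueryableStoreType<MyCustomStore> {
        @Override
        public boolean accepts(final StateStore stateStore) {
            return stateStore instanceof MyCustomStore;
        }
    }
}
```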

Code Block
public interface StateStoreProvider {
    /**
     * Find instances of StateStore that are accepted by {@link QueryableStoreType#accepts} and
     * have the provided storeName.
     *
     * @param storeName          name of the store
     * @param queryableStoreType filter stores based on this queryableStoreType
     * @param <T>                The type of the Store
     * @return List of the instances of the store in this topology. Empty List if not found
     */
    <T> List<T> stores(String storeName, QueryableStoreType<T> queryableStoreType);
}
 
/**
 * A wrapper over the underlying {@link ReadOnlyKeyValueStore}s found in a {@link
 * org.apache.kafka.streams.processor.internals.ProcessorTopology}
 *
 * @param <K> key type
 * @param <V> value type
 */
public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueStore<K, V> {

    private final StateStoreProvider storeProvider;
    private final QueryableStoreType<ReadOnlyKeyValueStore<K, V>> storeType;
    private final String storeName;

    public CompositeReadOnlyKeyValueStore(final StateStoreProvider storeProvider,
                                          final QueryableStoreType<ReadOnlyKeyValueStore<K, V>> storeType,
                                          final String storeName) {
        this.storeProvider = storeProvider;
        this.storeType = storeType;
        this.storeName = storeName;
    }

    @Override
    public V get(final K key) {
        final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
        for (ReadOnlyKeyValueStore<K, V> store : stores) {
            V result = store.get(key);
            if (result != null) {
                return result;
            }
        }
        return null;
    }
 
   //...
}
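Because each key is partitioned to exactly one task, at most one underlying store can contain it, so returning the first non-null result is correct. The lookup loop can be illustrated stand-alone with plain maps standing in for the underlying stores (CompositeGetExample and compositeGet are illustrative names, not KIP APIs):

```java
import java.util.List;
import java.util.Map;

// Simplified illustration of CompositeReadOnlyKeyValueStore#get:
// each underlying "store" holds a disjoint subset of the keys,
// so the first non-null hit is the answer.
public class CompositeGetExample {
    static <K, V> V compositeGet(final List<Map<K, V>> stores, final K key) {
        for (Map<K, V> store : stores) {
            V result = store.get(key);
            if (result != null) {
                return result;
            }
        }
        return null; // key not present in any partition
    }
}
```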

 

 

 

Proposed implementation outline

...

  • The query and discovery APIs will not affect existing users.
  • However, exposing state store names to the API will affect existing users, since the current interfaces will change. The new interfaces will not be backwards compatible.
  • As we now need to handle concurrent access to the State Stores, this may incur some overhead on streams applications while querying is on-going. We will measure the overhead in benchmarks.

Rejected Alternatives

Querying directly from a Kafka topic. We considered allowing users to query directly from a Kafka topic, instead of a state store. The namespace would be global in this case, but instead of worrying about the StateStore namespace, we would be interested in the topic names instead (each state store is typically backed by a Kafka topic). A user would create a RocksDB StateStore and instruct it to cache data from the desired topic. From then on, queries are performed on that local state store using direct method calls, just like in the above KIP proposal. Discovery with this option is implicitly done by the consumer that reads from the topic.

...