...


Step 1 in proposal: expose state store names to DSL and local queries

The stream store namespace is local to a KafkaStreams instance, i.e., it is part of the same process as the KafkaStreams instance. Conceptually, the code to access such a store would look like this:

KafkaStreams streams = new KafkaStreams(..);

ReadOnlyKeyValueStore store = streams.store("storeName", QueryableStoreTypes.keyValueStore());


The state store is discovered by querying the KafkaStreams instance. Query operations are read-only, i.e., they perform no updates. The query methods call methods of the underlying StateStore object.
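The read-only contract described above can be sketched as follows. This is a minimal illustration, not the proposed implementation: every type here (`ReadOnlyKeyValueStore`, `InMemoryKeyValueStore`, `LocalStores`) is a hypothetical stand-in defined inside the example, not the Kafka Streams class of the same name. It shows a registry that hands callers only an interface without mutators.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration only; these are NOT the Kafka Streams types.
final class ReadOnlyQuerySketch {

    /** Read-only view: exposes lookups but no mutators. */
    interface ReadOnlyKeyValueStore<K, V> {
        V get(K key);
    }

    /** Full store used by the processing code; it is also writable. */
    static final class InMemoryKeyValueStore<K, V> implements ReadOnlyKeyValueStore<K, V> {
        private final Map<K, V> data = new HashMap<>();

        void put(K key, V value) {
            data.put(key, value);
        }

        @Override
        public V get(K key) {
            return data.get(key);
        }
    }

    /** Registry of local stores, looked up by name (stand-in for the streams instance). */
    static final class LocalStores {
        private final Map<String, InMemoryKeyValueStore<String, Long>> stores = new HashMap<>();

        void register(String name, InMemoryKeyValueStore<String, Long> store) {
            stores.put(name, store);
        }

        /** Returns only the read-only interface, or null if no such store is local. */
        ReadOnlyKeyValueStore<String, Long> store(String name) {
            return stores.get(name);
        }
    }

    public static void main(String[] args) {
        InMemoryKeyValueStore<String, Long> counts = new InMemoryKeyValueStore<>();
        counts.put("alice", 3L);

        LocalStores local = new LocalStores();
        local.register("storeName", counts);

        // The caller sees get() only; the static type offers no put().
        ReadOnlyKeyValueStore<String, Long> view = local.store("storeName");
        System.out.println(view.get("alice"));
    }
}
```

In this sketch the view is the store itself behind a narrower interface; a real implementation would more likely return a separate facade so that callers cannot cast back to the writable type.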


...

  • Collection<KafkaStreamsInstance> KafkaStreams.allInstances(), where KafkaStreamsInstance has fields such as the list of assigned partitions, the list of state store names, and a HostInfo that includes hostname / port, etc. The port is the listening port of a user-defined listener that users provide to listen for queries (e.g., using REST APIs). More on the user-defined agent below.
  • Collection<KafkaStreamsInstance> KafkaStreams.allInstancesWithStore(String /* storeName */) would return only the KafkaStreamsInstances that include the given store name.

  • KafkaStreamsInstance KafkaStreams.instanceWithKey(String storeName, K key, Serializer<K> serializer) would return the KafkaStreamsInstance that has the store that might have the key (the key might not exist in any store). The default StreamPartitioner will be used for key partitioning.

  • KafkaStreamsInstance KafkaStreams.instanceWithKey(String storeName, K key, StreamPartitioner<K, ?> partitioner) is the same as above but will use the provided StreamPartitioner.
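The key-based lookup in the last two bullets can be sketched roughly as below. This is an illustration under stated assumptions, not the proposal's code: `Instance` is a hypothetical stand-in for KafkaStreamsInstance, keys are plain strings serialized as UTF-8, and `partitionFor` uses a simple array hash that is assumed here only for demonstration (the real default partitioner may hash differently). The point is the shape of the lookup: serialize the key, map it to a partition, then find the instance that owns that partition.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; names and hashing scheme are assumptions for illustration.
final class InstanceLookupSketch {

    /** Stand-in for KafkaStreamsInstance: an endpoint plus the partitions it hosts. */
    static final class Instance {
        final String host;
        final int port;
        final Set<Integer> partitions;

        Instance(String host, int port, Integer... partitions) {
            this.host = host;
            this.port = port;
            this.partitions = new HashSet<>(Arrays.asList(partitions));
        }
    }

    /** Simplified partitioner: non-negative hash of the serialized key modulo partition count. */
    static int partitionFor(byte[] serializedKey, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(serializedKey), numPartitions);
    }

    /** Returns the instance that would host the key, or null if no instance owns its partition. */
    static Instance instanceWithKey(List<Instance> instances, String key, int numPartitions) {
        int partition = partitionFor(key.getBytes(StandardCharsets.UTF_8), numPartitions);
        for (Instance instance : instances) {
            if (instance.partitions.contains(partition)) {
                return instance;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Instance> instances = Arrays.asList(
            new Instance("host-a", 7070, 0, 1),
            new Instance("host-b", 7070, 2, 3));

        Instance owner = instanceWithKey(instances, "alice", 4);
        System.out.println("alice -> " + owner.host + ":" + owner.port);
    }
}
```

Note that the lookup only says where the key would live; as the bullet states, the key itself may not exist in any store.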

...

Code Block
public interface QueryableStoreType<T> {
    /**
     * Called when searching for {@link StateStore}s to see if they
     * match the type expected by implementors of this interface
     * @param stateStore    The stateStore
     * @return true if it is a match
     */
    boolean accepts(final StateStore stateStore);
    /**
     * Create an instance of T (usually a facade) that developers can use
     * to query the underlying {@link StateStore}s
     * @param storeProvider     provides access to all the underlying StateStore instances of type T
     * @param storeName         The name of the Store
     * @return  T usually a read-only interface over a StateStore @see {@link QueryableStoreTypes.KeyValueStoreType}
     */
    T create(final StateStoreProvider storeProvider, final String storeName);
}

public interface StateStoreProvider {
    /**
     * Find instances of StateStore that are accepted by {@link QueryableStoreType#accepts} and
     * have the provided storeName.
     *
     * @param storeName             name of the store
     * @param queryableStoreType    filter stores based on this queryableStoreType
     * @param <T>                   The type of the Store
     * @return  List of the instances of the store in this topology. Empty List if not found
     */
    <T> List<T> getStores(final String storeName, QueryableStoreType<T> queryableStoreType);
}
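To make the interplay between the two interfaces concrete, here is a hedged sketch of one possible key-value store type. All types are redeclared locally as simplified stand-ins (for example, `StateStore` here has only a `name()` method), and `MapStore`, `ListProvider`, and this `KeyValueStoreType` are hypothetical names invented for the example. The facade returned by `create` asks the provider for every matching store on each lookup and returns the first non-null value, which is one plausible way to query a store that is split across several local tasks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; all nested types are simplified stand-ins, not the real classes.
final class QueryableStoreTypeSketch {

    interface StateStore { String name(); }

    interface ReadOnlyKeyValueStore<K, V> { V get(K key); }

    interface QueryableStoreType<T> {
        boolean accepts(StateStore stateStore);
        T create(StateStoreProvider storeProvider, String storeName);
    }

    interface StateStoreProvider {
        <T> List<T> getStores(String storeName, QueryableStoreType<T> queryableStoreType);
    }

    /** Simple map-backed store used as test data. */
    static final class MapStore<K, V> implements StateStore, ReadOnlyKeyValueStore<K, V> {
        private final String name;
        private final Map<K, V> data = new HashMap<>();

        MapStore(String name) { this.name = name; }

        MapStore<K, V> with(K key, V value) { data.put(key, value); return this; }

        @Override public String name() { return name; }
        @Override public V get(K key) { return data.get(key); }
    }

    /** Accepts key-value stores and wraps all matching stores in one read-only facade. */
    static final class KeyValueStoreType<K, V>
            implements QueryableStoreType<ReadOnlyKeyValueStore<K, V>> {

        @Override
        public boolean accepts(StateStore stateStore) {
            return stateStore instanceof ReadOnlyKeyValueStore;
        }

        @Override
        public ReadOnlyKeyValueStore<K, V> create(StateStoreProvider provider, String storeName) {
            // Stores are resolved on every lookup, so the facade follows the provider's current view.
            return key -> {
                for (ReadOnlyKeyValueStore<K, V> store : provider.getStores(storeName, this)) {
                    V value = store.get(key);
                    if (value != null) {
                        return value;
                    }
                }
                return null;
            };
        }
    }

    /** Provider over a fixed list of stores; filters by name and by the type's accepts(). */
    static final class ListProvider implements StateStoreProvider {
        private final List<StateStore> stores;

        ListProvider(List<StateStore> stores) { this.stores = stores; }

        @Override
        @SuppressWarnings("unchecked")
        public <T> List<T> getStores(String storeName, QueryableStoreType<T> type) {
            List<T> matches = new ArrayList<>();
            for (StateStore store : stores) {
                if (store.name().equals(storeName) && type.accepts(store)) {
                    matches.add((T) store); // unchecked: accepts() is trusted to guard the cast
                }
            }
            return matches;
        }
    }

    public static void main(String[] args) {
        StateStoreProvider provider = new ListProvider(Arrays.<StateStore>asList(
            new MapStore<String, Long>("my-count").with("a", 1L),
            new MapStore<String, Long>("my-count").with("b", 2L)));

        ReadOnlyKeyValueStore<String, Long> myCount =
            new KeyValueStoreType<String, Long>().create(provider, "my-count");
        System.out.println(myCount.get("b"));
    }
}
```

The unchecked cast in `ListProvider` relies on `accepts` being a faithful type check; that is a design assumption of this sketch, and generic key and value types are not verified at runtime.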

A class that provides implementations of the QueryableStoreTypes that are part of KafkaStreams, i.e.,

...

Code Block
final ReadOnlyKeyValueStore<String, Long>
   myCount = kafkaStreams.store("my-count", QueryableStoreTypes.<String, Long>keyValueStore());
final ReadOnlyWindowStore<String, String>
   joinOther =
   kafkaStreams.store("join-other", QueryableStoreTypes.<String, String>windowStore());

...

Code Block
public class HostInfo {
   String hostname;    /* hostname for instance that contains state store */
   int port;           /* listening port for instance that contains state store */
}
public class KafkaStreamsInstance {
    private final HostInfo hostInfo; /* hostInfo from above */
    private final Set<String> stateStores; /* state stores on this instance */
    private final Set<TopicPartition> topicPartitions; /* TopicPartitions on this instance */
}

/**
 * Find all of the instances of {@link KafkaStreamsInstance} in a {@link KafkaStreams application}
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @return collection containing all instances of {@link KafkaStreamsInstance} in this application
 */
public Collection<KafkaStreamsInstance> allInstances();

/**
 * Find the instances of {@link KafkaStreamsInstance} for a given storeName
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @param storeName the storeName to find metadata for
 * @return  A collection containing instances of {@link KafkaStreamsInstance} that have the provided storeName
 */
public Collection<KafkaStreamsInstance> allInstancesWithStore(final String storeName);

/**
 * Find the {@link KafkaStreamsInstance} for a given storeName and key.
 * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore},
 * this method provides a way of finding which host it would exist on.
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @param storeName         Name of the store
 * @param key               Key to use for partitioning
 * @param keySerializer     Serializer for the key
 * @param <K>               key type
 * @return  The {@link KafkaStreamsInstance} for the storeName and key
 */
public <K> KafkaStreamsInstance instanceWithKey(final String storeName,
                                                final K key,
                                                final Serializer<K> keySerializer);

/**
 * Find the {@link KafkaStreamsInstance} for a given storeName and key.
 * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore},
 * this method provides a way of finding which host it would exist on.
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @param storeName         Name of the store
 * @param key               Key to use for partitioning
 * @param partitioner       Partitioner for the store
 * @param <K>               key type
 * @return  The {@link KafkaStreamsInstance} for the storeName and key
 */
public <K> KafkaStreamsInstance instanceWithKey(final String storeName,
                                                final K key,
                                                final StreamPartitioner<K, ?> partitioner);
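Once the owning instance for a key is known, the caller still has to decide whether to query the local store or forward the request. The sketch below shows one way that decision might look. `HostInfo` is a simplified stand-in, and the `route` helper, the `local:` marker, and the `/state/{store}/{key}` URL layout are all hypothetical, since the proposal leaves the remote query layer (e.g., REST) to the user.

```java
// Hypothetical sketch; the URL layout and helper names are assumptions for illustration.
final class QueryRoutingSketch {

    /** Stand-in for HostInfo: hostname and listening port of an instance. */
    static final class HostInfo {
        final String hostname;
        final int port;

        HostInfo(String hostname, int port) {
            this.hostname = hostname;
            this.port = port;
        }

        boolean sameEndpoint(HostInfo other) {
            return hostname.equals(other.hostname) && port == other.port;
        }
    }

    /**
     * Decides where a query for (storeName, key) should go.
     * Returns a "local:" marker when this instance owns the key,
     * otherwise a URL on the owner's user-defined listener.
     */
    static String route(HostInfo self, HostInfo owner, String storeName, String key) {
        if (self.sameEndpoint(owner)) {
            return "local:" + storeName + "/" + key;
        }
        return "http://" + owner.hostname + ":" + owner.port + "/state/" + storeName + "/" + key;
    }

    public static void main(String[] args) {
        HostInfo self = new HostInfo("host-a", 7070);
        HostInfo other = new HostInfo("host-b", 7070);

        System.out.println(route(self, self, "my-count", "alice"));
        System.out.println(route(self, other, "my-count", "alice"));
    }
}
```

Because the metadata is a point-in-time view, a caller built along these lines should be prepared to repeat the lookup if the owner has changed after a partition reassignment.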

...