...
Step 1 in proposal: expose state store names to DSL and local queries
The stream store namespace is local to a KafkaStreams instance, i.e., it is part of the same process as the KafkaStreams instance. Conceptually, the code to access such a store would look like this:
KafkaStreams streams = new KafkaStreams(..);
ReadOnlyKeyValueStore store = streams.store("storeName", QueryableStoreTypes.keyValueStore());
The state store is discovered by querying the KafkaStreams instance. The query operations are read-only, i.e., they perform no updates. The query methods call methods of the underlying StateStore object.
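The read-only contract described above can be illustrated with a minimal sketch. It does not use the Kafka Streams classes: `ReadOnlyKeyValueView`, `InMemoryStore`, and `LocalQuerySketch` are hypothetical stand-ins, introduced only to show how a query facade can expose the read path of a store while keeping updates out of reach.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a read-only query interface; not the Kafka Streams type.
interface ReadOnlyKeyValueView<K, V> {
    V get(K key); // returns null when the key is absent
}

// Hypothetical stand-in for a writable state store owned by the processing side.
final class InMemoryStore<K, V> {
    private final Map<K, V> data = new ConcurrentHashMap<>();

    void put(K key, V value) {
        data.put(key, value);
    }

    V get(K key) {
        return data.get(key);
    }

    // Expose only the read path; holders of the view cannot reach put().
    ReadOnlyKeyValueView<K, V> readOnlyView() {
        return this::get;
    }
}

final class LocalQuerySketch {
    // Writes one entry through the store, then reads through the view.
    static Long lookup(String key) {
        InMemoryStore<String, Long> store = new InMemoryStore<>();
        store.put("alice", 3L);
        ReadOnlyKeyValueView<String, Long> view = store.readOnlyView();
        return view.get(key);
    }
}
```

Because the view delegates to the same underlying map, a query sees whatever the store currently holds, but the view type offers no method that could modify it.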
...
- Collection<KafkaStreamsInstance> KafkaStreams.allInstances(), where KafkaStreamsInstance has fields such as the list of assigned partitions, the list of state store names, and a HostInfo that includes the hostname and port. The port is the listening port of a user-defined listener that users provide to listen for queries (e.g., using REST APIs). More on the user-defined agent below.
- Collection<KafkaStreamsInstance> KafkaStreams.allInstancesWithStore(String /* storeName */) would return only the KafkaStreamsInstances that include the given store name.
- KafkaStreamsInstance KafkaStreams.instanceWithKey(String storeName, K key, Serializer<K> serializer) would return the KafkaStreamsInstance that has the store that might have the key (the key might not exist in any store). The default StreamPartitioner will be used for key partitioning.
- KafkaStreamsInstance KafkaStreams.instanceWithKey(String storeName, K key, StreamPartitioner<K, ?> partitioner) is the same as above but will use the provided StreamPartitioner.
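The key-based lookup in the list above can be sketched as a two-step process: map the key to a partition, then find the instance that hosts both the store and that partition. The sketch below is an illustration under stated assumptions, not the proposed implementation: `InstanceInfo` and `KeyRoutingSketch` are hypothetical names, partitions are reduced to plain integers, and the hash is a simple stand-in rather than the default StreamPartitioner.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified stand-in for the proposed KafkaStreamsInstance metadata.
final class InstanceInfo {
    final String host;
    final int port;
    final Set<String> stateStores;
    final Set<Integer> partitions; // partition numbers only, for brevity

    InstanceInfo(String host, int port, Set<String> stateStores, Set<Integer> partitions) {
        this.host = host;
        this.port = port;
        this.stateStores = stateStores;
        this.partitions = partitions;
    }
}

final class KeyRoutingSketch {
    // Assumption: a stand-in hash over the serialized key, NOT Kafka's default partitioner.
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return Math.floorMod(Arrays.hashCode(bytes), numPartitions);
    }

    // Returns the instance that would hold the key, if any instance hosts the store
    // and the partition. The key itself may still be absent from the store.
    static Optional<InstanceInfo> instanceWithKey(List<InstanceInfo> all,
                                                  String storeName,
                                                  String key,
                                                  int numPartitions) {
        int partition = partitionFor(key, numPartitions);
        return all.stream()
                .filter(i -> i.stateStores.contains(storeName))
                .filter(i -> i.partitions.contains(partition))
                .findFirst();
    }
}
```

A caller would compare the returned host and port with its own: if they match, the key can be looked up in the local store; otherwise the query would be forwarded to the user-defined listener on the other instance.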
...
public interface QueryableStoreType<T> {
    /**
     * Called when searching for {@link StateStore}s to see if they
     * match the type expected by implementors of this interface
     * @param stateStore    The stateStore
     * @return true if it is a match
     */
    boolean accepts(final StateStore stateStore);

    /**
     * Create an instance of T (usually a facade) that developers can use
     * to query the underlying {@link StateStore}s
     * @param storeProvider     provides access to all the underlying StateStore instances of type T
     * @param storeName         The name of the Store
     * @return  T usually a read-only interface over a StateStore @see {@link QueryableStoreTypes.KeyValueStoreType}
     */
    T create(final StateStoreProvider storeProvider, final String storeName);
}

public interface StateStoreProvider {
    /**
     * Find instances of StateStore that are accepted by {@link QueryableStoreType#accepts} and
     * have the provided storeName.
     *
     * @param storeName             name of the store
     * @param queryableStoreType    filter stores based on this queryableStoreType
     * @param <T>                   The type of the Store
     * @return  List of the instances of the store in this topology. Empty List if not found
     */
    <T> List<T> getStores(final String storeName, QueryableStoreType<T> queryableStoreType);
}
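To make the division of labour between the two interfaces concrete, here is a minimal, self-contained sketch of one possible key-value store type. It redeclares simplified local copies of `StateStore`, `QueryableStoreType`, `StateStoreProvider`, and `ReadOnlyKeyValueStore` so that it compiles without Kafka on the classpath; `MapStore`, `KeyValueStoreType`, and `ListStoreProvider` are hypothetical names, and the composite lookup strategy (try each matching store in turn) is an assumption rather than the proposal's stated behaviour.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified local copies of the interfaces discussed above (assumptions, not the Kafka types).
interface StateStore { String name(); }

interface QueryableStoreType<T> {
    boolean accepts(StateStore stateStore);
    T create(StateStoreProvider storeProvider, String storeName);
}

interface StateStoreProvider {
    <T> List<T> getStores(String storeName, QueryableStoreType<T> queryableStoreType);
}

interface ReadOnlyKeyValueStore<K, V> { V get(K key); }

// Hypothetical in-memory store, e.g. one instance per task or partition.
final class MapStore<K, V> implements StateStore, ReadOnlyKeyValueStore<K, V> {
    private final String name;
    private final Map<K, V> data = new HashMap<>();
    MapStore(String name) { this.name = name; }
    public String name() { return name; }
    public V get(K key) { return data.get(key); }
    void put(K key, V value) { data.put(key, value); }
}

// Store type that accepts key-value stores and builds a composite read-only facade.
final class KeyValueStoreType<K, V> implements QueryableStoreType<ReadOnlyKeyValueStore<K, V>> {
    public boolean accepts(StateStore stateStore) {
        return stateStore instanceof ReadOnlyKeyValueStore;
    }

    public ReadOnlyKeyValueStore<K, V> create(StateStoreProvider storeProvider, String storeName) {
        List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.getStores(storeName, this);
        // Facade: return the first non-null value found in any matching store.
        return key -> {
            for (ReadOnlyKeyValueStore<K, V> s : stores) {
                V value = s.get(key);
                if (value != null) return value;
            }
            return null;
        };
    }
}

// Provider that filters a fixed list of stores by name and by accepts().
final class ListStoreProvider implements StateStoreProvider {
    private final List<StateStore> stores;
    ListStoreProvider(List<StateStore> stores) { this.stores = stores; }

    @SuppressWarnings("unchecked")
    public <T> List<T> getStores(String storeName, QueryableStoreType<T> type) {
        List<T> result = new ArrayList<>();
        for (StateStore s : stores) {
            if (s.name().equals(storeName) && type.accepts(s)) {
                result.add((T) s); // safe only because accepts() vouched for the type
            }
        }
        return result;
    }
}
```

The design point this illustrates is that the provider knows where stores live, while the store type knows which stores it can wrap and how to present them; new store types can then be added without changing the provider.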
A class that provides implementations of the QueryableStoreTypes that are part of KafkaStreams, i.e.,
...
final ReadOnlyKeyValueStore<String, Long> myCount =
    kafkaStreams.store("my-count", QueryableStoreTypes.<String, Long>keyValueStore());
final ReadOnlyWindowStore<String, String> joinOther =
    kafkaStreams.store("join-other", QueryableStoreTypes.<String, String>windowStore());
...
public class HostInfo {
    String hostname;    /* hostname for instance that contains state store */
    int port;           /* listening port for instance that contains state store */
}

public class KafkaStreamsInstance {
    private final HostInfo hostInfo;                    /* hostInfo from above */
    private final Set<String> stateStores;              /* state stores on this instance */
    private final Set<TopicPartition> topicPartitions;  /* TopicPartitions on this instance */
}

/**
 * Find all of the instances of {@link KafkaStreamsInstance} in a {@link KafkaStreams application}
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @return collection containing all instances of {@link KafkaStreamsInstance} in this application
 */
public Collection<KafkaStreamsInstance> allInstances();

/**
 * Find the instances {@link KafkaStreamsInstance} for a given storeName
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @param storeName the storeName to find metadata for
 * @return A collection containing instances of {@link KafkaStreamsInstance} that have the provided storeName
 */
public Collection<KafkaStreamsInstance> allInstancesWithStore(final String storeName);

/**
 * Find the {@link KafkaStreamsInstance} for a given storeName and key.
 * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore},
 * this method provides a way of finding which host it would exist on.
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @param storeName         Name of the store
 * @param key               Key to use for partitioning
 * @param keySerializer     Serializer for the key
 * @param <K>               key type
 * @return The {@link KafkaStreamsInstance} for the storeName and key
 */
public <K> KafkaStreamsInstance instanceWithKey(final String storeName, final K key, final Serializer<K> keySerializer);

/**
 * Find the {@link KafkaStreamsInstance} for a given storeName and key.
 * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore},
 * this method provides a way of finding which host it would exist on.
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @param storeName         Name of the store
 * @param key               Key to use for partitioning
 * @param partitioner       Partitioner for the store
 * @param <K>               key type
 * @return The {@link KafkaStreamsInstance} for the storeName and key
 */
public <K> KafkaStreamsInstance instanceWithKey(final String storeName, final K key, final StreamPartitioner<K, ?> partitioner);
...