Status
Current state: "Accepted"
Discussion thread:
1. [DISCUSS] KIP-591: Add Kafka Streams config to set default store type
2. discuss: https://lists.apache.org/thread/238rstw9zj49vyhzzoh67d8b74vz4nbm
3. vote: https://lists.apache.org/thread/qz8s06t2brjcv928bo26qqtz06ysg3ln
JIRA: KAFKA-9847
...
Kafka Streams supports RocksDB stores as well as in-memory stores out of the box. By default, all DSL operators use RocksDB stores. Currently, RocksDB stores can only be replaced with in-memory stores on a per-operator basis, which is tedious if all stores should be switched (for example, in a Kubernetes deployment without local disks).
Furthermore, the current store interface via Materialized is very generic: it allows passing in any custom state store implementation, with the side effect that the user must name the store, which also makes it queryable via "Interactive Queries".
...
Code Block |
---|
public class Materialized<K, V, S extends StateStore> {

    public enum StoreImplType {
        ROCKS_DB,
        IN_MEMORY
    }

    /**
     * Materialize a {@link StateStore} with the given {@link StoreImplType}.
     *
     * @param storeType the type of the state store
     * @param <K> key type of the store
     * @param <V> value type of the store
     * @param <S> type of the {@link StateStore}
     * @return a new {@link Materialized} instance with the given storeType
     */
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(StoreImplType storeType);

    /**
     * Set the type of the materialized {@link StateStore}.
     *
     * @param storeType the store type {@link StoreImplType} to use.
     * @return itself
     */
    public Materialized<K, V, S> withStoreType(StoreImplType storeType);
} |
...
Code Block |
---|
KStream stream = ...
// current code to change from RocksDB to in-memory
stream.groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("some-name")));
// new code, without the need to provide a name
stream.groupByKey().count(Materialized.as(StoreImplType.IN_MEMORY)); |
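To make the difference between the static `as(StoreImplType)` factory and the instance method `withStoreType(StoreImplType)` concrete, here is a minimal, self-contained sketch. `MaterializedSketch` is a hypothetical stand-in written for illustration only; it is not Kafka's `Materialized` class and models nothing beyond the store-type selection. It assumes that RocksDB remains the default and that the store name stays unset unless the user supplies one.

```java
import java.util.Objects;

// Hypothetical stand-in for Materialized; illustrates store-type selection only.
final class MaterializedSketch<K, V> {

    enum StoreImplType { ROCKS_DB, IN_MEMORY }

    // Assumption: RocksDB stays the default when no type is given.
    private StoreImplType storeType = StoreImplType.ROCKS_DB;
    // Stays null unless the user names the store explicitly.
    private String storeName;

    private MaterializedSketch() { }

    // Static factory: start from a store type, no store name required.
    static <K, V> MaterializedSketch<K, V> as(StoreImplType storeType) {
        MaterializedSketch<K, V> m = new MaterializedSketch<>();
        m.storeType = Objects.requireNonNull(storeType, "storeType");
        return m;
    }

    // Static factory mirroring the name-based entry point.
    static <K, V> MaterializedSketch<K, V> as(String storeName) {
        MaterializedSketch<K, V> m = new MaterializedSketch<>();
        m.storeName = Objects.requireNonNull(storeName, "storeName");
        return m;
    }

    // Instance method: change the type on an existing object and return it for chaining.
    MaterializedSketch<K, V> withStoreType(StoreImplType storeType) {
        this.storeType = Objects.requireNonNull(storeType, "storeType");
        return this;
    }

    StoreImplType storeType() { return storeType; }

    String storeName() { return storeName; }

    public static void main(String[] args) {
        // Unnamed in-memory store: only the type is chosen.
        MaterializedSketch<String, Long> unnamed =
            MaterializedSketch.as(StoreImplType.IN_MEMORY);
        // Named store whose type is switched afterwards.
        MaterializedSketch<String, Long> named =
            MaterializedSketch.<String, Long>as("counts")
                .withStoreType(StoreImplType.IN_MEMORY);

        System.out.println(unnamed.storeType() + " " + unnamed.storeName()); // IN_MEMORY null
        System.out.println(named.storeType() + " " + named.storeName());     // IN_MEMORY counts
    }
}
```

In this sketch, the static factory is the entry point when only the store type matters, while `withStoreType` is meant for chaining onto an instance that was created some other way, for example from a name.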
...