Status
Current state: "Accepted"
Discussion thread:
1. [DISCUSS] KIP-591: Add Kafka Streams config to set default store type
2. discuss: https://lists.apache.org/thread/238rstw9zj49vyhzzoh67d8b74vz4nbm
3. vote: https://lists.apache.org/thread/qz8s06t2brjcv928bo26qqtz06ysg3ln
JIRA: KAFKA-9847
...
Code Block |
---|
public class Materialized<K, V, S extends StateStore> {
    public enum StoreType {
        ROCKS_DB,
        IN_MEMORY
    }
    /**
     * Materialize a {@link StateStore} with the given {@link StoreType}.
     *
     * @param storeType the type of the state store
     * @param <K>       key type of the store
     * @param <V>       value type of the store
     * @param <S>       type of the {@link StateStore}
     * @return a new {@link Materialized} instance with the given storeType
     */
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(StoreType storeType);
    /**
     * Set the type of the materialized {@link StateStore}.
     *
     * @param storeType the {@link StoreType} to use
     * @return itself
     */
    public Materialized<K, V, S> withStoreType(StoreType storeType);
} |
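To make the intended call pattern concrete, here is a minimal, self-contained sketch of the fluent shape described above. It is not the real Materialized class: MaterializedSketch, its fields, and the accessors storeName() and storeType() are hypothetical stand-ins. Rejecting null arguments is an assumption, not something this KIP specifies.

```java
import java.util.Objects;

// Stand-in for the proposed API shape. This is NOT org.apache.kafka.streams.kstream.Materialized;
// the class name, fields, and accessors are hypothetical and exist only for illustration.
final class MaterializedSketch<K, V> {

    /** Mirrors the enum proposed above. */
    enum StoreType { ROCKS_DB, IN_MEMORY }

    private final String storeName; // may be null when only a store type was given
    private StoreType storeType;    // may be null when only a store name was given

    private MaterializedSketch(final String storeName, final StoreType storeType) {
        this.storeName = storeName;
        this.storeType = storeType;
    }

    /** Static factory keyed on the store type, like the proposed as(StoreType). */
    static <K, V> MaterializedSketch<K, V> as(final StoreType storeType) {
        // Assumption: null is rejected eagerly.
        Objects.requireNonNull(storeType, "storeType cannot be null");
        return new MaterializedSketch<>(null, storeType);
    }

    /** Static factory keyed on the store name, so the chained form can be shown too. */
    static <K, V> MaterializedSketch<K, V> as(final String storeName) {
        Objects.requireNonNull(storeName, "storeName cannot be null");
        return new MaterializedSketch<>(storeName, null);
    }

    /** Sets the store type and returns this same instance, matching "@return itself". */
    MaterializedSketch<K, V> withStoreType(final StoreType storeType) {
        this.storeType = Objects.requireNonNull(storeType, "storeType cannot be null");
        return this;
    }

    String storeName() { return storeName; }

    StoreType storeType() { return storeType; }

    public static void main(final String[] args) {
        // Choose the store type up front.
        final MaterializedSketch<String, Long> byType = MaterializedSketch.as(StoreType.IN_MEMORY);
        // Or name the store first and set the type through the fluent setter.
        final MaterializedSketch<String, Long> named =
            MaterializedSketch.<String, Long>as("counts").withStoreType(StoreType.ROCKS_DB);
        System.out.println(byType.storeType());
        System.out.println(named.storeName() + " -> " + named.storeType());
    }
}
```

Because withStoreType returns the same instance, it can be chained with other fluent setters.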
We'll also add three static helper APIs to Stores, so that the implementation can get the store supplier by store type.
Code Block |
---|
/**
 * Create a {@link SessionBytesStoreSupplier} by the provided {@code storeType}.
 *
 * @param storeType       {@link org.apache.kafka.streams.kstream.Materialized.StoreType} of the store
 * @param storeName       name of the store (cannot be {@code null})
 * @param retentionPeriod length of time to retain data in the store (cannot be negative)
 *                        (note that the retention period must be at least long enough to
 *                        contain the inactivity gap of the session and the entire grace period)
 * @return an instance of a {@link SessionBytesStoreSupplier}
 */
public static SessionBytesStoreSupplier sessionStoreSupplierByStoreType(final Materialized.StoreType storeType,
                                                                        final String storeName,
                                                                        final long retentionPeriod) {}
/**
 * Create a {@link WindowBytesStoreSupplier} by the provided {@code storeType}.
 *
 * @param storeType       {@link org.apache.kafka.streams.kstream.Materialized.StoreType} of the store
 * @param storeName       name of the store (cannot be {@code null})
 * @param retentionPeriod length of time to retain data in the store (cannot be negative)
 *                        (note that the retention period must be at least long enough to contain the
 *                        windowed data's entire life cycle, from window-start through window-end,
 *                        and for the entire grace period)
 * @param windowSize      size of the windows (cannot be negative)
 * @return an instance of {@link WindowBytesStoreSupplier}
 * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
 * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
 */
public static WindowBytesStoreSupplier windowStoreSupplierByStoreType(final Materialized.StoreType storeType,
                                                                      final String storeName,
                                                                      final long retentionPeriod,
                                                                      final long windowSize) {}
/**
 * Create a {@link KeyValueBytesStoreSupplier} by the provided {@code storeType}.
 *
 * @param storeType {@link org.apache.kafka.streams.kstream.Materialized.StoreType} of the store
 * @param storeName name of the store (cannot be {@code null})
 * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
 *         to build a key-(timestamp/value) store
 */
public static KeyValueBytesStoreSupplier keyValueStoreSupplierByStoreType(final Materialized.StoreType storeType,
                                                                          final String storeName) {} |
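The helper bodies are left empty above. As a rough illustration of the dispatch they imply, here is a self-contained sketch that switches on the store type and applies the argument checks named in the Javadoc. StoreSupplierSketch and SupplierSketch are hypothetical stand-ins, not Kafka classes. The mapping of ROCKS_DB to a persistent supplier and IN_MEMORY to a non-persistent one is our reading of the enum names, not a statement about the actual implementation.

```java
import java.util.Objects;

// Simplified stand-in for the proposed helpers; none of these types are real Kafka classes.
final class StoreSupplierSketch {

    /** Mirrors the proposed Materialized.StoreType enum. */
    enum StoreType { ROCKS_DB, IN_MEMORY }

    /** Hypothetical stand-in for a *BytesStoreSupplier: only a name and a persistence flag. */
    static final class SupplierSketch {
        private final String name;
        private final boolean persistent;

        SupplierSketch(final String name, final boolean persistent) {
            this.name = Objects.requireNonNull(name, "storeName cannot be null");
            this.persistent = persistent;
        }

        String name() { return name; }

        boolean persistent() { return persistent; }
    }

    /** Shared dispatch: picks the supplier flavour from the store type. */
    private static SupplierSketch supplierFor(final StoreType storeType, final String storeName) {
        Objects.requireNonNull(storeType, "storeType cannot be null");
        switch (storeType) {
            case IN_MEMORY:
                return new SupplierSketch(storeName, false);
            case ROCKS_DB:
                return new SupplierSketch(storeName, true);
            default:
                throw new IllegalStateException("Unknown store type: " + storeType);
        }
    }

    static SupplierSketch keyValueStoreSupplierByStoreType(final StoreType storeType,
                                                           final String storeName) {
        return supplierFor(storeType, storeName);
    }

    static SupplierSketch windowStoreSupplierByStoreType(final StoreType storeType,
                                                         final String storeName,
                                                         final long retentionPeriod,
                                                         final long windowSize) {
        // Checks taken from the Javadoc above: no negative values, retention >= window size.
        if (retentionPeriod < 0L) {
            throw new IllegalArgumentException("retentionPeriod cannot be negative");
        }
        if (windowSize < 0L) {
            throw new IllegalArgumentException("windowSize cannot be negative");
        }
        if (retentionPeriod < windowSize) {
            throw new IllegalArgumentException("retentionPeriod cannot be smaller than windowSize");
        }
        return supplierFor(storeType, storeName);
    }

    public static void main(final String[] args) {
        final SupplierSketch kv = keyValueStoreSupplierByStoreType(StoreType.IN_MEMORY, "kv-store");
        final SupplierSketch win =
            windowStoreSupplierByStoreType(StoreType.ROCKS_DB, "window-store", 60_000L, 10_000L);
        System.out.println(kv.name() + " persistent=" + kv.persistent());
        System.out.println(win.name() + " persistent=" + win.persistent());
    }
}
```

Keeping the switch in one private method means a new store type would only need one extra case, which is one reasonable way to structure the three helpers.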
To leverage the new configuration, users will need to call the API newly proposed in KAFKA-13281 to provide the topology-config when constructing the StreamsBuilder. The internal implementation for topology-config is already in place. In this KIP, we'll publish the API that allows passing the topology-config into StreamsBuilder.
...