Status
Current state: "Accepted"
Discussion thread:
1. [DISCUSS] KIP-591: Add Kafka Streams config to set default store type
2. discuss: https://lists.apache.org/thread/238rstw9zj49vyhzzoh67d8b74vz4nbm
3. vote: https://lists.apache.org/thread/qz8s06t2brjcv928bo26qqtz06ysg3ln
JIRA: KAFKA-9847
...
Code Block |
---|
public class Materialized<K, V, S extends StateStore> {
    public enum StoreType {
        ROCKS_DB,
        IN_MEMORY
    }
    /**
     * Materialize a {@link StateStore} with the given {@link StoreType}.
     *
     * @param storeType the type of the state store
     * @param <K> key type of the store
     * @param <V> value type of the store
     * @param <S> type of the {@link StateStore}
     * @return a new {@link Materialized} instance with the given storeType
     */
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(StoreType storeType);
    /**
     * Set the type of the materialized {@link StateStore}.
     *
     * @param storeType the store type {@link StoreType} to use.
     * @return itself
     */
    public Materialized<K, V, S> withStoreType(StoreType storeType);
} |
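The two methods follow the usual fluent pattern: `as(...)` is a static factory that returns a new instance, while `withStoreType(...)` updates the receiver and returns it. The sketch below only illustrates that contract. `MaterializedSketch` is a hypothetical stand-in written for this example (it is not the real Kafka class and drops the `S extends StateStore` parameter), and rejecting `null` is an assumption, not something the KIP specifies.

```java
import java.util.Objects;

// Simplified stand-in for the proposed API shape; NOT the real Kafka Materialized class.
final class MaterializedSketch<K, V> {

    enum StoreType { ROCKS_DB, IN_MEMORY }

    private StoreType storeType;

    private MaterializedSketch(StoreType storeType) {
        this.storeType = storeType;
    }

    // Mirrors as(StoreType): a static factory that returns a new instance.
    // Rejecting null is an assumption of this sketch.
    static <K, V> MaterializedSketch<K, V> as(StoreType storeType) {
        return new MaterializedSketch<>(Objects.requireNonNull(storeType, "storeType"));
    }

    // Mirrors withStoreType(StoreType): updates this instance and returns itself.
    MaterializedSketch<K, V> withStoreType(StoreType storeType) {
        this.storeType = Objects.requireNonNull(storeType, "storeType");
        return this;
    }

    StoreType storeType() {
        return storeType;
    }
}
```

With this shape, a caller can start from `MaterializedSketch.as(StoreType.IN_MEMORY)` and later switch the same instance with `withStoreType(StoreType.ROCKS_DB)`.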
We'll also add three static factory methods to Stores and deprecate its eight existing store supplier creators, to simplify the API.
Code Block | ||
---|---|---|
| ||
/**
 * @deprecated Since 3.2.0 Use {@link #sessionStoreSupplier(StoreType, String, long)} instead.
 */
public static SessionBytesStoreSupplier inMemorySessionStore(final String name, final Duration retentionPeriod) {}
/**
 * @deprecated Since 3.2.0 Use {@link #sessionStoreSupplier(StoreType, String, long)} instead.
 */
public static SessionBytesStoreSupplier persistentSessionStore(final String name, final Duration retentionPeriod) {}
/**
 * Create a {@link SessionBytesStoreSupplier} of the provided {@code storeType}.
 *
 * @param storeType       {@link org.apache.kafka.streams.kstream.Materialized.StoreType} of the store
 * @param storeName       name of the store (cannot be {@code null})
 * @param retentionPeriod length of time to retain data in the store (cannot be negative)
 *                        (note that the retention period must be at least long enough to
 *                        contain the inactivity gap of the session and the entire grace period.)
 * @return an instance of a {@link SessionBytesStoreSupplier}
 */
public static SessionBytesStoreSupplier sessionStoreSupplier(final StoreType storeType, final String storeName, final long retentionPeriod) {}
/**
 * @deprecated Since 3.2.0 Use {@link #windowStoreSupplier(StoreType, String, long, long, boolean, boolean)} instead.
 */
public static WindowBytesStoreSupplier persistentTimestampedWindowStore(final String name, final Duration retentionPeriod, final Duration windowSize, final boolean retainDuplicates) {}
/**
 * @deprecated Since 3.2.0 Use {@link #windowStoreSupplier(StoreType, String, long, long, boolean, boolean)} instead.
 */
public static WindowBytesStoreSupplier persistentWindowStore(final String name, final Duration retentionPeriod, final Duration windowSize, final boolean retainDuplicates) {}
/**
 * @deprecated Since 3.2.0 Use {@link #windowStoreSupplier(StoreType, String, long, long, boolean, boolean)} instead.
 */
public static WindowBytesStoreSupplier inMemoryWindowStore(final String name, final Duration retentionPeriod, final Duration windowSize, final boolean retainDuplicates) {}
/**
 * Create a {@link WindowBytesStoreSupplier} of the provided {@code storeType}.
 *
 * @param storeType        {@link org.apache.kafka.streams.kstream.Materialized.StoreType} of the store
 * @param storeName        name of the store (cannot be {@code null})
 * @param retentionPeriod  length of time to retain data in the store (cannot be negative)
 *                         (note that the retention period must be at least long enough to contain the
 *                         windowed data's entire life cycle, from window-start through window-end,
 *                         and for the entire grace period)
 * @param windowSize       size of the windows (cannot be negative)
 * @param retainDuplicates whether or not to retain duplicates. Turning this on will automatically disable
 *                         caching and means that null values will be ignored.
 * @param timestampedStore whether or not to create a timestamped store. Only applies to the RocksDB store type
 * @return an instance of {@link WindowBytesStoreSupplier}
 * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
 * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
 */
public static WindowBytesStoreSupplier windowStoreSupplier(final Materialized.StoreType storeType, final String storeName, final long retentionPeriod, final long windowSize, final boolean retainDuplicates, final boolean timestampedStore) {}
/**
 * @deprecated Since 3.2.0 Use {@link #keyValueStoreSupplier(StoreType, String, boolean)} instead.
 */
public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String name) {}
/**
 * @deprecated Since 3.2.0 Use {@link #keyValueStoreSupplier(StoreType, String, boolean)} instead.
 */
public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(final String name) {}
/**
 * @deprecated Since 3.2.0 Use {@link #keyValueStoreSupplier(StoreType, String, boolean)} instead.
 */
public static KeyValueBytesStoreSupplier inMemoryKeyValueStore(final String name) {}
/**
 * Create a {@link KeyValueBytesStoreSupplier} of the provided {@code storeType}.
 *
 * @param storeType        {@link org.apache.kafka.streams.kstream.Materialized.StoreType} of the store
 * @param storeName        name of the store (cannot be {@code null})
 * @param timestampedStore whether or not to create a timestamped store. Only applies to the RocksDB store type
 * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
 *         to build a key-(timestamp/value) store
 */
public static KeyValueBytesStoreSupplier keyValueStoreSupplier(final Materialized.StoreType storeType, final String storeName, final boolean timestampedStore) {}
 |
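To make the consolidation concrete, the sketch below shows how the `(storeType, timestampedStore)` pair of the proposed `keyValueStoreSupplier` could line up with the three deprecated key-value creators. The mapping is inferred from the Javadoc above (in particular the note that `timestampedStore` only applies to the RocksDB store type) and is not spelled out by the KIP. `StoresSketch` and `legacyKeyValueCreator` are hypothetical names used only for this illustration.

```java
// Hypothetical illustration; NOT part of the Kafka Stores class.
final class StoresSketch {

    enum StoreType { ROCKS_DB, IN_MEMORY }

    // Returns the name of the deprecated creator that the combined call would
    // presumably stand in for (an inferred mapping, not taken from the KIP).
    static String legacyKeyValueCreator(StoreType storeType, boolean timestampedStore) {
        switch (storeType) {
            case IN_MEMORY:
                // The timestampedStore flag is ignored for in-memory stores.
                return "inMemoryKeyValueStore";
            case ROCKS_DB:
                return timestampedStore
                        ? "persistentTimestampedKeyValueStore"
                        : "persistentKeyValueStore";
            default:
                throw new IllegalArgumentException("Unknown store type: " + storeType);
        }
    }
}
```

Under this reading, one method with two parameters covers what previously took three separately named factories.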
To use the new configuration, users must call the API proposed in KAFKA-13281 and provide the topology-config when constructing the StreamsBuilder. The internal implementation for topology-config is already in place; this KIP publishes the API that allows the topology-config to be passed into the StreamsBuilder.
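The sketch below models, in plain Java, how a default store type carried by the topology-level configuration might interact with an explicit per-store choice. Several things here are assumptions rather than statements from this section: the config key `default.dsl.store`, its values `rocksDB` and `in_memory`, the fallback to RocksDB, and the rule that an explicit store type wins over the configured default. `DefaultStoreResolution` is a hypothetical helper, not a Kafka class.

```java
import java.util.Optional;
import java.util.Properties;

// Hypothetical model of default-store resolution; NOT Kafka code.
final class DefaultStoreResolution {

    enum StoreType { ROCKS_DB, IN_MEMORY }

    // Assumed config key and values; check the released StreamsConfig before relying on them.
    static final String DEFAULT_STORE_KEY = "default.dsl.store";

    // Reads the default from the (stand-in) topology config, falling back to RocksDB.
    static StoreType defaultFrom(Properties topologyProps) {
        String raw = topologyProps.getProperty(DEFAULT_STORE_KEY, "rocksDB");
        return "in_memory".equalsIgnoreCase(raw) ? StoreType.IN_MEMORY : StoreType.ROCKS_DB;
    }

    // Assumed precedence: an explicit per-store choice overrides the configured default.
    static StoreType effective(Optional<StoreType> explicit, Properties topologyProps) {
        return explicit.orElseGet(() -> defaultFrom(topologyProps));
    }
}
```

In this model, the properties object plays the role of the topology-config handed to the builder, so every store that does not name a type picks up the same default.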
...