THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
public class Materialized<K, V, S extends StateStore> { public enum StoreImplType { ROCKS_DB, IN_MEMORY } /** * Materialize a {@link StateStore} with the given {@link StoreType}. * * @param storeType the type of the state store * @param <K> key type of the store * @param <V> value type of the store * @param <S> type of the {@link StateStore} * @return a new {@link Materialized} instance with the given storeName */ public static <K, V, S extends StateStore> Materialized<K, V, S> as(StoreImplType storeType); /** * Set the type of the materialized {@link StateStore}. * * @param storeType the store type {@link StoreType} to use. * @return itself */ public Materialized<K, V, S> withStoreType(StoreImplType storeType); } |
We'll also add 3 static APIs and deprecate existing 8 store supplier creator in Stores
to simplify the API.
Code Block | ||
---|---|---|
| ||
/**
* @deprecated Since 3.2.0 Use {@link #sessionStoreSupplier(StoreType, String, long)} instead.
*/
public static SessionBytesStoreSupplier inMemorySessionStore(final String name, final Duration retentionPeriod) {}
/**
* @deprecated Since 3.2.0 Use {@link #sessionStoreSupplier(StoreType, String, long)} instead.
*/
public static SessionBytesStoreSupplier persistentSessionStore(final String name,
final Duration retentionPeriod) {}
/**
* Create a {@link SessionBytesStoreSupplier} by the provided {@code storeType}.
*
* @param storeType {@link org.apache.kafka.streams.kstream.Materialized.StoreType} of the store
* @param storeName name of the store (cannot be {@code null})
* @param retentionPeriod length ot time to retain data in the store (cannot be negative)
* (note that the retention period must be at least as long enough to
* contain the inactivity gap of the session and the entire grace period.)
* @return an instance of a {@link SessionBytesStoreSupplier}
*/
public static SessionBytesStoreSupplier sessionStoreSupplier(final StoreType storeType,
final String storeName,
final long retentionPeriod) {}
/**
* @deprecated Since 3.2.0 Use {@link #windowStoreSupplier(StoreType, String, Duration, Duration, boolean, boolean)} instead.
*/
public static WindowBytesStoreSupplier persistentTimestampedWindowStore(final String name,
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates) {}
/**
* @deprecated Since 3.2.0 Use {@link #windowStoreSupplier(StoreType, String, Duration, Duration, boolean, boolean)} instead.
*/
public static WindowBytesStoreSupplier persistentWindowStore(final String name,
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates) {}
/**
* @deprecated Since 3.2.0 Use {@link #windowStoreSupplier(StoreType, String, Duration, Duration, boolean, boolean)} instead.
*/
public static WindowBytesStoreSupplier inMemoryWindowStore(final String name,
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates)
/**
* Create a persistent {@link WindowBytesStoreSupplier} by the provided {@code storeType}.
*
* @param storeType {@link org.apache.kafka.streams.kstream.Materialized.StoreType} of the store
* @param storeName name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
* (note that the retention period must be at least long enough to contain the
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period)
* @param windowSize size of the windows (cannot be negative)
* @param retainDuplicates whether or not to retain duplicates. Turning this on will automatically disable
* caching and means that null values will be ignored.
* @param timestampedStore whether or not to create a timestamped store. Only applied to rocksDB store type
* @return an instance of {@link WindowBytesStoreSupplier}
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
* @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
*/
public static WindowBytesStoreSupplier windowStoreSupplier(final Materialized.StoreType storeType,
final String storeName,
final long retentionPeriod,
final long windowSize,
final boolean retainDuplicates,
final boolean timestampedStore) {}
/**
* @deprecated Since 3.2.0 Use {@link #keyValueStoreSupplier(StoreType, String, boolean)} instead.
*/
public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String name) {}
/**
* @deprecated Since 3.2.0 Use {@link #keyValueStoreSupplier(StoreType, String, boolean)} instead.
*/
public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(final String name) {}
/**
* @deprecated Since 3.2.0 Use {@link #keyValueStoreSupplier(StoreType, String, boolean)} instead.
*/
public static KeyValueBytesStoreSupplier inMemoryKeyValueStore(final String name) {}
/**
* Create a persistent {@link KeyValueBytesStoreSupplier} by the provided {@code storeType}.
*
* @param storeType {@link org.apache.kafka.streams.kstream.Materialized.StoreType} of the store
* @param storeName name of the store (cannot be {@code null})
* @param timestampedStore whether or not to create a timestamped store. Only applied to rocksDB store type
* @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
* to build a key-(timestamp/value) store
*/
public static KeyValueBytesStoreSupplier keyValueStoreSupplier(final Materialized.StoreType storeType,
final String storeName,
final boolean timestampedStore) {} |
In order to leverage the new configuration, users will need to call the new proposed API in KAFKA-13281, to provide topology-config
while construct the StreamBuilder
. Currently, the internal implementation for topology-config
is all set. In this KIP, we'll publish the API to allow pass the topology-config
into StreamBuilder
.
...