...
JIRA: KAFKA-9847
Released: 2.6 (target)
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
We propose to simplify switching the store type via a new Kafka Streams config that sets the default store implementation, built-in or custom, for the whole program, as well as an improved API for Materialized.
Public Interfaces
We propose to add a new configuration parameter default.store.impl.class that defines the default store implementation class used by the DSL. It defaults to RocksDBPersistentStoreImplementation.class.
Code Block |
---|
public class StreamsConfig {
    public static final String DEFAULT_STORE_IMPLEMENTATION_CLASS_CONFIG = "default.store.impl.class";
    private static final String DEFAULT_STORE_IMPLEMENTATION_CLASS_DOC = "Store supplier implementation class to use. Default is set as RocksDBPersistentStoreImplementation. It can be overwritten dynamically during streaming operation.";

    // Added to the existing config definition chain.
    .define(DEFAULT_STORE_IMPLEMENTATION_CLASS_CONFIG,
            Type.CLASS,
            RocksDBPersistentStoreImplementation.class.getName(),
            Importance.MEDIUM,
            DEFAULT_STORE_IMPLEMENTATION_CLASS_DOC)
} |
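To illustrate how a class-valued setting such as default.store.impl.class could be resolved into an instance, here is a minimal sketch in plain Java. It does not use Kafka's ConfigDef machinery. Only the config key comes from the proposal; StoreImplementationSketch, RocksDbSketch, InMemorySketch, and resolve are hypothetical stand-ins introduced for this example.

```java
import java.util.Properties;

public class DefaultStoreConfigSketch {
    // Config key taken from the proposal; everything else is a hypothetical stand-in.
    public static final String DEFAULT_STORE_IMPLEMENTATION_CLASS_CONFIG = "default.store.impl.class";

    /** Stand-in for the proposed StoreImplementation interface. */
    public interface StoreImplementationSketch {
        String describe();
    }

    /** Stand-in for the default (RocksDB-backed) implementation. */
    public static class RocksDbSketch implements StoreImplementationSketch {
        @Override
        public String describe() {
            return "rocksdb";
        }
    }

    /** Stand-in for an in-memory implementation. */
    public static class InMemorySketch implements StoreImplementationSketch {
        @Override
        public String describe() {
            return "in-memory";
        }
    }

    /** Reads the class name from the properties, falling back to the default, and instantiates it. */
    public static StoreImplementationSketch resolve(Properties props) {
        String className = props.getProperty(
            DEFAULT_STORE_IMPLEMENTATION_CLASS_CONFIG, RocksDbSketch.class.getName());
        try {
            Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
            return (StoreImplementationSketch) instance;
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Invalid store implementation class: " + className, e);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // No value set: the default implementation is used.
        System.out.println(resolve(props).describe());
        // Explicit value: the configured implementation is used.
        props.setProperty(DEFAULT_STORE_IMPLEMENTATION_CLASS_CONFIG, InMemorySketch.class.getName());
        System.out.println(resolve(props).describe());
    }
}
```

The sketch requires a public no-argument constructor on the configured class, which is an assumption of this example rather than something the proposal states.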
The StoreImplementation interface:
Code Block | ||
---|---|---|
| ||
/**
 * A state store supplier Implementation interface for all types of {@link StateStore}.
 *
 */
public interface StoreImplementation {
    /**
     * Create a {@link KeyValueBytesStoreSupplier}.
     *
     * @param name  name of the store (cannot be {@code null})
     * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
     * to build a persistent key-value store
     */
    KeyValueBytesStoreSupplier keyValueSupplier(String name);

    /**
     * Create a {@link WindowBytesStoreSupplier}.
     *
     * @param name                  name of the store (cannot be {@code null})
     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
     *                              (note that the retention period must be at least long enough to contain the
     *                              windowed data's entire life cycle, from window-start through window-end,
     *                              and for the entire grace period)
     * @param windowSize            size of the windows (cannot be negative)
     * @param retainDuplicates      whether or not to retain duplicates. Turning this on will automatically disable
     *                              caching and means that null values will be ignored.
     * @return an instance of {@link WindowBytesStoreSupplier}
     * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
     * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
     */
    WindowBytesStoreSupplier windowBytesStoreSupplier(String name,
                                                      Duration retentionPeriod,
                                                      Duration windowSize,
                                                      boolean retainDuplicates);

    /**
     * Create a {@link SessionBytesStoreSupplier}.
     *
     * @param name              name of the store (cannot be {@code null})
     * @param retentionPeriod   length of time to retain data in the store (cannot be negative)
     *                          (note that the retention period must be at least long enough to
     *                          contain the inactivity gap of the session and the entire grace period.)
     * @return an instance of a {@link SessionBytesStoreSupplier}
     */
    SessionBytesStoreSupplier sessionBytesStoreSupplier(String name,
                                                        Duration retentionPeriod);
} |
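The argument constraints documented for windowBytesStoreSupplier can be sketched as a standalone check. This is only an illustration in plain Java: WindowStoreArgsSketch and validate are hypothetical names, and the sketch makes no claim about how Kafka performs these checks internally.

```java
import java.time.Duration;
import java.util.Objects;

public class WindowStoreArgsSketch {

    /**
     * Checks the constraints stated in the Javadoc: non-null name, non-negative
     * durations representable as long milliseconds, and retentionPeriod >= windowSize.
     * Hypothetical helper, not part of Kafka.
     */
    public static void validate(String name, Duration retentionPeriod, Duration windowSize) {
        Objects.requireNonNull(name, "name cannot be null");
        long retentionMs = toMillis(retentionPeriod, "retentionPeriod");
        long windowMs = toMillis(windowSize, "windowSize");
        if (retentionMs < 0) {
            throw new IllegalArgumentException("retentionPeriod cannot be negative");
        }
        if (windowMs < 0) {
            throw new IllegalArgumentException("windowSize cannot be negative");
        }
        if (retentionMs < windowMs) {
            throw new IllegalArgumentException("retentionPeriod cannot be smaller than windowSize");
        }
    }

    // Duration#toMillis throws ArithmeticException on overflow; map it to the documented exception type.
    private static long toMillis(Duration duration, String label) {
        Objects.requireNonNull(duration, label + " cannot be null");
        try {
            return duration.toMillis();
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException(label + " can't be represented as long milliseconds", e);
        }
    }

    public static void main(String[] args) {
        // A one-hour retention comfortably covers five-minute windows.
        validate("counts", Duration.ofHours(1), Duration.ofMinutes(5));
        try {
            // Retention shorter than the window size is rejected.
            validate("counts", Duration.ofMinutes(1), Duration.ofMinutes(5));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```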
For the StoreImplementation interface, Kafka provides three built-in implementations:
RocksDBPersistentStoreImplementation.java (default)
Code Block | ||
---|---|---|
| ||
/**
 * A Rocks DB persistent state store supplier Implementation.
 *
 */
public class RocksDBPersistentStoreImplementation implements StoreImplementation {
    @Override
    public KeyValueBytesStoreSupplier keyValueSupplier(final String name) {
        return Stores.persistentTimestampedKeyValueStore(name);
    }
    @Override
    public WindowBytesStoreSupplier windowBytesStoreSupplier(final String name,
                                                             final Duration retentionPeriod,
                                                             final Duration windowSize,
                                                             final boolean retainDuplicates) {
        return Stores.persistentTimestampedWindowStore(name, retentionPeriod, windowSize, retainDuplicates);
    }
    @Override
    public SessionBytesStoreSupplier sessionBytesStoreSupplier(final String name, final Duration retentionPeriod) {
        return Stores.persistentSessionStore(name, retentionPeriod);
    }
} |
RocksDBStoreImplementation.java
Code Block | ||
---|---|---|
| ||
/**
* A Rocks DB state store supplier Implementation.
*
*/
public class RocksDBStoreImplementation implements StoreImplementation {
@Override
public KeyValueBytesStoreSupplier keyValueSupplier(final String name) {
return Stores.persistentKeyValueStore(name);
}
@Override
public WindowBytesStoreSupplier windowBytesStoreSupplier(final String name,
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates) {
return Stores.persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates);
}
@Override
public SessionBytesStoreSupplier sessionBytesStoreSupplier(final String name, final Duration retentionPeriod) {
return Stores.persistentSessionStore(name, retentionPeriod);
}
} |
InMemoryStoreImplementation.java
Code Block | ||
---|---|---|
| ||
/**
* An in-memory state store supplier Implementation.
*
*/
public class InMemoryStoreImplementation implements StoreImplementation {
@Override
public KeyValueBytesStoreSupplier keyValueSupplier(final String name) {
return Stores.inMemoryKeyValueStore(name);
}
@Override
public WindowBytesStoreSupplier windowBytesStoreSupplier(final String name,
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates) {
return Stores.inMemoryWindowStore(name, retentionPeriod, windowSize, retainDuplicates);
}
@Override
public SessionBytesStoreSupplier sessionBytesStoreSupplier(final String name, final Duration retentionPeriod) {
return Stores.inMemorySessionStore(name, retentionPeriod);
}
} |
In the Materialized class, we provide one more API that lets users supply a StoreImplementation dynamically.
Code Block | ||
---|---|---|
| ||
/**
* Materialize a {@link StateStore} with the store implementation.
*
* @param storeImplementation store implementation used to materialize the store
* @param <K> key type of the store
* @param <V> value type of the store
* @param <S> type of the {@link StateStore}
* @return a new {@link Materialized} instance with the given storeImplementation
*/
public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StoreImplementation storeImplementation) |
Proposed Changes
In order to leverage the new configuration, users will need to call the newly proposed API from KAFKA-13281 to provide the topology config while constructing the StreamsBuilder.
Code Block | ||
---|---|---|
| ||
Properties props = new Properties();
...
props.put(StreamsConfig.DEFAULT_STORE_IMPLEMENTATION_CLASS_CONFIG, InMemoryStoreImplementation.class.getName());
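StreamsConfig streamsConfig = new StreamsConfig(props);
// Pass the config to the builder so the DSL can read the default store implementation.
StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(streamsConfig));
Topology topology = builder.build(); |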
If not set, it defaults to RocksDBPersistentStoreImplementation, which is the same as the current behavior, so this is a backward-compatible design.
In order to leverage the new configuration, users will need to call StreamsBuilder#build(Properties) instead of StreamsBuilder#build(). This is an already established pattern that users need to follow to enable topology optimization. As we predict that we might need to access the config in the DSL more often in the future, we also propose to deprecate the StreamsBuilder#build() method to lift the mental burden for users of knowing which of the two methods they should use.
All stateful operators (like aggregate(), reduce(), count(), or join()) will use either the RocksDB or the in-memory store, depending on the store implementation set in the "default.store.impl.class" configuration. Of course, if the store is overwritten for an individual operator via Materialized, the operator overwrite will be used. The improved Materialized interface makes it easier to switch the store type, without the need to specify a store name.
Code Block |
---|
KStream stream = ...
// current code to change from RocksDB to in-memory
stream.groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("some-name")));
// new code
stream.groupByKey().count(Materialized.as(new InMemoryStoreImplementation())); |
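The precedence rule described above, where a per-operator overwrite wins over the configured default, can be sketched in plain Java as follows. StoreSelectionSketch and resolve are hypothetical names, and the strings merely stand in for store implementation classes. This is not how Kafka Streams wires stores internally.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class StoreSelectionSketch {

    /**
     * Returns operator -> effective store implementation name, preferring a
     * per-operator overwrite and falling back to the configured default.
     */
    public static Map<String, String> resolve(String defaultImpl,
                                              Map<String, String> overrides,
                                              String... operators) {
        Map<String, String> effective = new LinkedHashMap<>();
        for (String operator : operators) {
            effective.put(operator, overrides.getOrDefault(operator, defaultImpl));
        }
        return effective;
    }

    public static void main(String[] args) {
        // Only count() is overwritten; the other operators pick up the default.
        Map<String, String> overrides =
            Collections.singletonMap("count", "RocksDBStoreImplementation");
        Map<String, String> effective =
            resolve("InMemoryStoreImplementation", overrides, "aggregate", "count", "join");
        effective.forEach((operator, impl) -> System.out.println(operator + " -> " + impl));
    }
}
```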
Compatibility, Deprecation, and Migration Plan
Because the default value of the config is "RocksDBPersistentStoreImplementation", there is no behavior change and thus there are no backward compatibility concerns. The deprecated method StreamsBuilder#build() can be removed in the next major release, 3.0.0.
Test Plan
Regular unit and integration testing is sufficient.
...
In addition, we propose to extend the Materialized config object with a corresponding option to specify the store type:
Code Block |
---|
public class Materialized<K, V, S extends StateStore> {
    public enum StoreType {
        ROCKS_DB,
        IN_MEMORY
    }
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(StoreType storeType);
    public Materialized<K, V, S> withStoreType(StoreType storeType);
} |
Finally, we propose to deprecate the StreamsBuilder#build() method.
Code Block |
---|
public class StreamsBuilder {
    @Deprecated
    public synchronized Topology build();
} |
...