Status
Current state: "Accepted"
Discussion thread:
1. [DISCUSS] KIP-591: Add Kafka Streams config to set default store type
2. discuss: https://lists.apache.org/thread/238rstw9zj49vyhzzoh67d8b74vz4nbm
3. vote: https://lists.apache.org/thread/qz8s06t2brjcv928bo26qqtz06ysg3ln
JIRA: KAFKA-9847
...
Kafka Streams supports RocksDB stores as well as in-memory stores out of the box. By default, all DSL operators use RocksDB stores. Currently, it is only possible to switch RocksDB stores out for in-memory stores on a per-operator basis, which is tedious if all stores should be switched (for example, in a Kubernetes deployment without local disks).
Furthermore, the current store interface via Materialized is very generic: it allows passing in any custom state store implementation, with the side effect that the user needs to name the store, which in turn also makes it queryable via "Interactive Queries".
...
We propose to add a new configuration parameter default.dsl.store.impl.type that defines the default store type used by the DSL, accepting the two values "rocksDB" (default) and "in_memory".
```java
public class StreamsConfig {

    public static final String DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG = "default.dsl.store.impl.type";
    private static final String DEFAULT_DSL_STORE_IMPL_TYPE_DOC = "The default store implementation type used by DSL operators.";

    public static final String ROCKS_DB = "rocksDB";
    public static final String IN_MEMORY = "in_memory";

    // added to the static ConfigDef initializer:
    .define(DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG,
            Type.STRING,
            ROCKS_DB,
            in(ROCKS_DB, IN_MEMORY),
            Importance.LOW,
            DEFAULT_DSL_STORE_IMPL_TYPE_DOC)
}
```
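As an illustration of the `in(ROCKS_DB, IN_MEMORY)` validator above, the following standalone sketch (a hypothetical helper class with no Kafka dependency, not the actual ConfigDef code) shows the kind of check performed against the two allowed values:

```java
import java.util.List;

// Hypothetical standalone sketch of the value check that ConfigDef's
// in(ROCKS_DB, IN_MEMORY) validator performs for this config.
public class StoreTypeValidation {
    // mirrors the proposed StreamsConfig constants
    static final String ROCKS_DB = "rocksDB";
    static final String IN_MEMORY = "in_memory";
    static final List<String> ALLOWED = List.of(ROCKS_DB, IN_MEMORY);

    static boolean isValid(final String value) {
        return ALLOWED.contains(value);
    }

    // rejects anything other than the two allowed strings
    static String validate(final String value) {
        if (!isValid(value)) {
            throw new IllegalArgumentException("Invalid value " + value
                + " for configuration default.dsl.store.impl.type: must be one of " + ALLOWED);
        }
        return value;
    }
}
```

Note that the check is case-sensitive, matching how ConfigDef string validators treat their allowed values.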
In addition, we propose to extend the Materialized config object with a corresponding option to specify the store type:
```java
public class Materialized<K, V, S extends StateStore> {

    public enum StoreImplType {
        ROCKS_DB,
        IN_MEMORY
    }

    public static <K, V, S extends StateStore> Materialized<K, V, S> as(StoreImplType storeType);

    public Materialized<K, V, S> withStoreType(StoreImplType storeType);
}
```
The two new methods with their Javadoc:

```java
    /**
     * Materialize a {@link StateStore} with the given {@link StoreImplType}.
     *
     * @param storeType the type of the state store
     * @param <K>       key type of the store
     * @param <V>       value type of the store
     * @param <S>       type of the {@link StateStore}
     * @return a new {@link Materialized} instance with the given store type
     */
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(StoreImplType storeType);

    /**
     * Set the type of the materialized {@link StateStore}.
     *
     * @param storeType the store type {@link StoreImplType} to use.
     * @return itself
     */
    public Materialized<K, V, S> withStoreType(StoreImplType storeType);
}
```
In order to leverage the new configuration, users will need to call the new API proposed in KAFKA-13281 to provide a topology-config when constructing the StreamsBuilder. The internal implementation for topology-config is already in place; in this KIP, we will make the API public to allow passing the topology-config into StreamsBuilder.
```java
/**
 * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
 * {@link org.apache.kafka.streams.KafkaStreams} or {@link KafkaStreamsNamedTopologyWrapper} constructors will
 * determine the defaults, which can then be overridden for specific topologies by passing them in when creating the
 * topology builders via the {@link org.apache.kafka.streams.StreamsBuilder()} method.
 */
public class TopologyConfig extends AbstractConfig {
    private static final ConfigDef CONFIG;
    static {
        CONFIG = new ConfigDef()
            .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
                    Type.INT,
                    null,
                    Importance.LOW,
                    BUFFERED_RECORDS_PER_PARTITION_DOC)
            .define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
                    Type.LONG,
                    null,
                    Importance.MEDIUM,
                    CACHE_MAX_BYTES_BUFFERING_DOC)
            .define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                    Type.CLASS,
                    null,
                    Importance.MEDIUM,
                    DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
            .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                    Type.CLASS,
                    null,
                    Importance.MEDIUM,
                    DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC)
            .define(MAX_TASK_IDLE_MS_CONFIG,
                    Type.LONG,
                    null,
                    Importance.MEDIUM,
                    MAX_TASK_IDLE_MS_DOC)
            .define(TASK_TIMEOUT_MS_CONFIG,
                    Type.LONG,
                    null,
                    Importance.MEDIUM,
                    TASK_TIMEOUT_MS_DOC)
            .define(DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG,
                    Type.STRING,
                    ROCKS_DB,
                    in(ROCKS_DB, IN_MEMORY),
                    Importance.LOW,
                    DEFAULT_DSL_STORE_IMPL_TYPE_DOC);
    }

    public TopologyConfig(final StreamsConfig globalAppConfigs) {
        this(null, globalAppConfigs, new Properties());
    }

    public TopologyConfig(final String topologyName, final StreamsConfig globalAppConfigs, final Properties topologyOverrides) {}

    // check whether this is a named topology
    public boolean isNamedTopology() {}

    // get the task config with the topology config
    public TaskConfig getTaskConfig() {}
}
```
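The Javadoc above states that the values from the application-wide StreamsConfig act as defaults which per-topology overrides can replace. As a rough sketch of that precedence (a hypothetical helper using plain Properties instead of the real config classes):

```java
import java.util.Properties;

// Hypothetical sketch of the topology-level override semantics described in the
// TopologyConfig Javadoc: a per-topology value wins, otherwise the global
// application config applies, otherwise the documented default ("rocksDB").
public class StoreTypeResolution {
    static final String DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG = "default.dsl.store.impl.type";
    static final String ROCKS_DB = "rocksDB";

    static String effectiveStoreType(final Properties globalAppConfigs, final Properties topologyOverrides) {
        final String override = topologyOverrides.getProperty(DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG);
        if (override != null) {
            return override;
        }
        return globalAppConfigs.getProperty(DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG, ROCKS_DB);
    }
}
```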
```java
public class StreamsBuilder {

    public StreamsBuilder() {}

    /**
     * Create a {@code StreamsBuilder} instance.
     *
     * @param topologyConfigs the streams configs that apply at the topology level. Please refer to {@link TopologyConfig} for more detail
     */
    public StreamsBuilder(final TopologyConfig topologyConfigs) {} // newly added
}
```
...
```java
Properties props = new Properties();
...
// set the "default.dsl.store.impl.type" config to use in-memory stores
props.put(StreamsConfig.DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG, StreamsConfig.IN_MEMORY);

StreamsConfig streamsConfig = new StreamsConfig(props);
builder = new StreamsBuilder(new TopologyConfig(streamsConfig));
topology = builder.build();
```
...
All stateful operators (like aggregate(), reduce(), count(), or join()) will use the store type set in the `default.dsl.store.impl.type` configuration. Of course, if the store is overwritten for an individual operator via Materialized, the operator-level override will be used. The improved Materialized interface makes it possible to switch the store type without the need to specify a store name.
```java
KStream stream = ...

// current code to change from RocksDB to in-memory
stream.groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("some-name")));

// new code, without the need to provide a name
stream.groupByKey().count(Materialized.withStoreType(StoreImplType.IN_MEMORY));
```
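To make the override order explicit: an operator-level Materialized store type, if set, takes precedence over the configured default. A minimal sketch of that resolution (hypothetical names, not the actual Kafka Streams internals):

```java
// Hypothetical sketch of the precedence described above: a store type set via
// Materialized on an individual operator wins over the configured default.
public class EffectiveStoreType {
    enum StoreImplType { ROCKS_DB, IN_MEMORY }

    static StoreImplType resolve(final StoreImplType materializedType, final StoreImplType configDefault) {
        return materializedType != null ? materializedType : configDefault;
    }
}
```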
...