...
```java
public class StreamsConfig {
    public static final String DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG = "default.dsl.store.impl.type";
    private static final String DEFAULT_DSL_STORE_IMPL_TYPE_DOC = "The default store implementation type used by DSL operators.";

    public static final String ROCKS_DB = "rocksDB";
    public static final String IN_MEMORY = "in_memory";

    // added to the existing ConfigDef:
    .define(DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG,
            Type.STRING,
            ROCKS_DB,
            in(ROCKS_DB, IN_MEMORY),
            Importance.LOW,
            DEFAULT_DSL_STORE_IMPL_TYPE_DOC)
}
```
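The `in(ROCKS_DB, IN_MEMORY)` validator restricts the config to exactly those two string values; any other value is rejected at startup. As a minimal illustrative sketch of that validation semantics (a hypothetical standalone class, not Kafka's actual `ConfigDef` implementation):

```java
import java.util.List;

public class StoreTypeValidator {
    // The two values accepted by the proposed config, per the ConfigDef above.
    private static final List<String> ALLOWED = List.of("rocksDB", "in_memory");

    // Mirrors the behavior of ConfigDef's in(...) validator: return the value
    // if allowed, otherwise fail fast with a descriptive error.
    public static String validate(final String value) {
        if (!ALLOWED.contains(value)) {
            throw new IllegalArgumentException(
                "Invalid value " + value + " for default.dsl.store.impl.type, must be one of " + ALLOWED);
        }
        return value;
    }

    public static void main(final String[] args) {
        System.out.println(validate("in_memory"));
        try {
            validate("lmdb");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: lmdb");
        }
    }
}
```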
In addition, we propose to extend the Materialized config object with a corresponding option to specify the store implementation type:
```java
public class Materialized<K, V, S extends StateStore> {
    public enum StoreImplType {
        ROCKS_DB,
        IN_MEMORY
    }

    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StoreImplType storeType);

    public Materialized<K, V, S> withStoreType(final StoreImplType storeType);
}
```
In order to leverage the new configuration, users will need to call the new API proposed in KAFKA-13281 to provide a topology-config while constructing the StreamsBuilder. The internal implementation for topology-config is already in place; in this KIP, we publish the API that allows passing the topology-config into StreamsBuilder.
...
```java
/**
 * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
 * {@link org.apache.kafka.streams.KafkaStreams} or {@link KafkaStreamsNamedTopologyWrapper} constructors will
 * determine the defaults, which can then be overridden for specific topologies by passing them in when creating the
 * topology builders via the {@link org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig)} constructor.
 */
public class TopologyConfig extends AbstractConfig {
    private static final ConfigDef CONFIG;

    static {
        CONFIG = new ConfigDef()
            .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
                    Type.INT,
                    null,
                    Importance.LOW,
                    BUFFERED_RECORDS_PER_PARTITION_DOC)
            .define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
                    Type.LONG,
                    null,
                    Importance.MEDIUM,
                    CACHE_MAX_BYTES_BUFFERING_DOC)
            .define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                    Type.CLASS,
                    null,
                    Importance.MEDIUM,
                    DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
            .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                    Type.CLASS,
                    null,
                    Importance.MEDIUM,
                    DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC)
            .define(MAX_TASK_IDLE_MS_CONFIG,
                    Type.LONG,
                    null,
                    Importance.MEDIUM,
                    MAX_TASK_IDLE_MS_DOC)
            .define(TASK_TIMEOUT_MS_CONFIG,
                    Type.LONG,
                    null,
                    Importance.MEDIUM,
                    TASK_TIMEOUT_MS_DOC)
            .define(DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG,   // newly added
                    Type.STRING,
                    ROCKS_DB,
                    in(ROCKS_DB, IN_MEMORY),
                    Importance.LOW,
                    DEFAULT_DSL_STORE_IMPL_TYPE_DOC);
    }
}
```
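As the javadoc above describes, a value set in TopologyConfig takes precedence for that topology, while unset keys fall back to the application-wide StreamsConfig defaults. A minimal self-contained model of that fallback semantics (hypothetical class name, not Kafka code):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model only: a per-topology map consulted first,
// falling back to the application-wide defaults when a key is unset.
public class LayeredConfig {
    private final Map<String, String> appDefaults;
    private final Map<String, String> topologyOverrides;

    public LayeredConfig(final Map<String, String> appDefaults,
                         final Map<String, String> topologyOverrides) {
        this.appDefaults = appDefaults;
        this.topologyOverrides = topologyOverrides;
    }

    public String get(final String key) {
        // topology-level value wins if present, otherwise use the app-wide default
        return topologyOverrides.getOrDefault(key, appDefaults.get(key));
    }

    public static void main(final String[] args) {
        Map<String, String> app = new HashMap<>();
        app.put("default.dsl.store.impl.type", "rocksDB");    // application-wide default

        Map<String, String> topo = new HashMap<>();
        topo.put("default.dsl.store.impl.type", "in_memory"); // per-topology override

        System.out.println(new LayeredConfig(app, topo).get("default.dsl.store.impl.type"));
        System.out.println(new LayeredConfig(app, new HashMap<>()).get("default.dsl.store.impl.type"));
    }
}
```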
```java
public class StreamsBuilder {

    public StreamsBuilder() {}

    /**
     * Create a {@code StreamsBuilder} instance.
     *
     * @param topologyConfigs the streams configs that apply at the topology level. Please refer to {@link TopologyConfig} for more detail
     */
    public StreamsBuilder(final TopologyConfig topologyConfigs) {} // newly added
}
```
...
```java
Properties props = new Properties();
...
// set the "default.dsl.store.impl.type" config to the in-memory store
props.put(StreamsConfig.DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG, StreamsConfig.IN_MEMORY);

StreamsConfig streamsConfig = new StreamsConfig(props);
builder = new StreamsBuilder(new TopologyConfig(streamsConfig));
topology = builder.build();
```
...
```java
KStream stream = ...

// current code to change from RocksDB to in-memory
stream.groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("some-name")));

// new code
stream.groupByKey().count(Materialized.as(StoreImplType.IN_MEMORY));
```
Compatibility, Deprecation, and Migration Plan
Because the default value of the config is "rocksDB", there is no behavior change and thus there are no backward compatibility concerns.
...