...
Code Block |
---|
public class StreamsConfig {
    /** Name of the config that selects the default store implementation type for DSL operators. */
    public static final String DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG = "default.dsl.store.impl.type";
    private static final String DEFAULT_DSL_STORE_IMPL_TYPE_DOC = "The default store implementation type used by DSL operators.";

    /** Accepted values for {@link #DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG}. */
    public static final String ROCKS_DB = "rocksDB";
    public static final String IN_MEMORY = "in_memory";

    // Excerpt: the call below continues a config-definition chain whose receiver is elided here.
    // The default value is ROCKS_DB and the value is validated by in(ROCKS_DB, IN_MEMORY).
            .define(DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG,
                    Type.STRING,
                    ROCKS_DB,
                    in(ROCKS_DB, IN_MEMORY),
                    Importance.LOW,
                    DEFAULT_DSL_STORE_IMPL_TYPE_DOC)
} |
In addition, we propose to extend the Materialized
config object with a corresponding option to specify the store type:
...
Code Block |
---|
|
/**
 * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
 * {@link org.apache.kafka.streams.KafkaStreams} or {@link KafkaStreamsNamedTopologyWrapper} constructors will
 * determine the defaults, which can then be overridden for specific topologies by passing them in when creating the
 * topology builders via the {@link org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig)} constructor.
 */
public class TopologyConfig extends AbstractConfig {
    private static final ConfigDef CONFIG;

    static {
        // All definitions form one fluent chain; only the last call is terminated with ';'.
        CONFIG = new ConfigDef()
            .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
                    Type.INT,
                    null,
                    Importance.LOW,
                    BUFFERED_RECORDS_PER_PARTITION_DOC)
            .define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
                    Type.LONG,
                    null,
                    Importance.MEDIUM,
                    CACHE_MAX_BYTES_BUFFERING_DOC)
            .define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                    Type.CLASS,
                    null,
                    Importance.MEDIUM,
                    DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
            .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                    Type.CLASS,
                    null,
                    Importance.MEDIUM,
                    DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC)
            .define(MAX_TASK_IDLE_MS_CONFIG,
                    Type.LONG,
                    null,
                    Importance.MEDIUM,
                    MAX_TASK_IDLE_MS_DOC)
            .define(TASK_TIMEOUT_MS_CONFIG,
                    Type.LONG,
                    null,
                    Importance.MEDIUM,
                    TASK_TIMEOUT_MS_DOC)
            // Store-type entry: defaults to ROCKS_DB and is validated by in(ROCKS_DB, IN_MEMORY).
            .define(DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG,
                    Type.STRING,
                    ROCKS_DB,
                    in(ROCKS_DB, IN_MEMORY),
                    Importance.LOW,
                    DEFAULT_DSL_STORE_IMPL_TYPE_DOC);
    } |
Code Block |
---|
|
public class StreamsBuilder {
public TopologyConfig(final StreamsConfig public StreamsBuilder(globalAppConfigs) {}
/**
* Create this(null, globalAppConfigs, new Properties());
}
public TopologyConfig(final String topologyName, final StreamsConfig globalAppConfigs, final Properties topologyOverrides) {}
// check whether this is a named topology
public boolean isNamedTopology(){}
// get this task config with the topology config
public TaskConfig getTaskConfig() {}
|
Code Block |
---|
|
public class StreamsBuilder {
public StreamsBuilder() {}
/**
* Create a {@code StreamsBuilder} instance.
*
* @param topologyConfigs the streams configs that apply at the topology level. Please refer to {@link TopologyConfig} for more detail.
*/
public StreamsBuilder(final TopologyConfig topologyConfigs) {} // newly added
|
...
Code Block |
---|
|
Properties props = new Properties();
...
// set the "default.dsl.store.impl.type" config to the in-memory store type ("in_memory")
props.put(StreamsConfig.DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG, StreamsConfig.IN_MEMORY);
StreamsConfig streamsConfig = new StreamsConfig(props);
// hand the configs to the builder wrapped in a TopologyConfig
builder = new StreamsBuilder(new TopologyConfig(streamsConfig));
topology = builder.build(); |
...
All stateful operators (like aggregate(), reduce(), count(), or join()) will use the state store type specified by the `default.dsl.store.impl.type`
configuration. Of course, if the store is overridden for an individual operator via Materialized,
the operator-level override will be used. The improved Materialized
interface makes it easier to switch the store type without the need to specify a store name.
...