...
Code Block |
---|
|
/**
* Materialize a {@link StateStore} with the store implementation.
*
* @param storeImplementation store implementation used to materialize the store
* @param <K> key type of the store
* @param <V> value type of the store
* @param <S> type of the {@link StateStore}
* @return a new {@link Materialized} instance with the given storeImplementation
*/
public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StoreImplementation storeImplementation) |
Proposed Changes
In order to leverage the new configuration, users will need to call the new proposed API in KAFKA-13281, to provide topology-config
while construct the StreamBuilder
. Currently, the internal implementation for topology-config
is all set. In this KIP, we'll publish the API to allow pass the topology-config
into StreamBuilder
.
Code Block |
---|
|
/**
* Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
* {@link org.apache.kafka.streams.KafkaStreams} or {@link KafkaStreamsNamedTopologyWrapper} constructors will
* determine the defaults, which can then be overridden for specific topologies by passing them in when creating the
* topology builders via the {@link org.apache.kafka.streams.StreamsBuilder()} method.
*/
public class TopologyConfig extends AbstractConfig {
private static final ConfigDef CONFIG;
static {
CONFIG = new ConfigDef()
.define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
Type.INT,
null,
Importance.LOW,
BUFFERED_RECORDS_PER_PARTITION_DOC)
.define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
Type.LONG,
null,
Importance.MEDIUM,
CACHE_MAX_BYTES_BUFFERING_DOC)
.define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
Type.CLASS,
null,
Importance.MEDIUM,
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
.define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
Type.CLASS,
null,
Importance.MEDIUM,
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC)
.define(MAX_TASK_IDLE_MS_CONFIG,
Type.LONG,
null,
Importance.MEDIUM,
MAX_TASK_IDLE_MS_DOC)
.define(TASK_TIMEOUT_MS_CONFIG,
Type.LONG,
null,
Importance.MEDIUM,
TASK_TIMEOUT_MS_DOC);
.define(DEFAULT_STORE_IMPLEMENTATION_CLASS_CONFIG,
Type.CLASS,
RocksDBPersistentStoreImplementation.class.getName(),
Importance.MEDIUM,
DEFAULT_STORE_IMPLEMENTATION_CLASS_DOC)
} |
Code Block |
---|
|
public class StreamsBuilder {
public StreamsBuilder() {}
/**
* Create a {@code StreamsBuilder} instance.
*
* @param topologyConfigs the streams configs that apply at the topology level. Please refer to {@link TopologyConfig} for more detail
*/
public StreamsBuilder(final TopologyConfig topologyConfigs) {} // new added
|
Proposed Changes
To create a topology with a non-default store implementation, user can declare it via new
StreamBuilder(topologyConfig)
like this:
Code Block |
---|
|
Properties props = new Properties();
...
// set the "default.store.impl.class" config as "InMemoryStoreImpelementation"
props.Properties props = new Properties();
...
// set the "default.store.impl.class" config as "InMemoryStoreImpelementation"
props.put(StreamsConfig.DEFAULT_STORE_IMPLEMENTATION_CLASS_CONFIG, InMemoryStoreImplementation.class.getName());
StreamsConfig streamsConfig = new StreamsConfig(props);
builder = new StreamsBuilder(new TopologyConfig(streamsConfig));
topology = builder.build(); |
If not set, itor using the old new StreamBuidler()
it'll default to use RocksDBPersistentStoreImplementation
, which is the same as current behavior. So this is a backward-compatible design.
...