...
To leverage the new configuration, users will need to call the new API proposed in KAFKA-13281 to provide a topology-config when constructing the StreamsBuilder. That API is not published yet, but the internal implementation for topology-config is all set. In this KIP, we'll publish that API.
```java
/**
 * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
 * {@link org.apache.kafka.streams.KafkaStreams} or {@link KafkaStreamsNamedTopologyWrapper} constructors will
 * determine the defaults, which can then be overridden for specific topologies by passing them in when creating the
 * topology builders via the {@link org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig)} constructor.
 */
public class TopologyConfig extends AbstractConfig {
    private static final ConfigDef CONFIG;
    static {
        CONFIG = new ConfigDef()
            .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
                    Type.INT,
                    null,
                    Importance.LOW,
                    BUFFERED_RECORDS_PER_PARTITION_DOC)
            .define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
                    Type.LONG,
                    null,
                    Importance.MEDIUM,
                    CACHE_MAX_BYTES_BUFFERING_DOC)
            .define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                    Type.CLASS,
                    null,
                    Importance.MEDIUM,
                    DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
            .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                    Type.CLASS,
                    null,
                    Importance.MEDIUM,
                    DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC)
            .define(MAX_TASK_IDLE_MS_CONFIG,
                    Type.LONG,
                    null,
                    Importance.MEDIUM,
                    MAX_TASK_IDLE_MS_DOC)
            .define(TASK_TIMEOUT_MS_CONFIG,
                    Type.LONG,
                    null,
                    Importance.MEDIUM,
                    TASK_TIMEOUT_MS_DOC)
            // New in this KIP: the default store implementation, RocksDB unless overridden
            .define(DEFAULT_STORE_IMPLEMENTATION_CLASS_CONFIG,
                    Type.CLASS,
                    RocksDBPersistentStoreImplementation.class.getName(),
                    Importance.MEDIUM,
                    DEFAULT_STORE_IMPLEMENTATION_CLASS_DOC);
    }
    // ... remainder of the class omitted
}
```
```java
Properties props = new Properties();
...
// set the "default.store.impl.class" config to "InMemoryStoreImplementation"
props.put(StreamsConfig.DEFAULT_STORE_IMPLEMENTATION_CLASS_CONFIG, InMemoryStoreImplementation.class.getName());
StreamsConfig streamsConfig = new StreamsConfig(props);

// pass the topology-level config to the builder, then build the topology
StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(streamsConfig));
Topology topology = builder.build();
```
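The TopologyConfig Javadoc says that the StreamsConfig values determine the defaults, which can then be overridden for specific topologies. Most topology-level entries are defined with a null default, which presumably signals "not set for this topology, fall back to the application-wide value". A minimal plain-Java sketch of that fallback follows. The class name, the resolve helper, and the example values are hypothetical and only illustrate the idea; this is not the Kafka implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the override/fallback behavior; not part of Kafka.
class TopologyConfigFallbackSketch {

    // Returns the topology-level value if one was set, otherwise the application-wide value.
    static Object resolve(Map<String, Object> globalConfigs,
                          Map<String, Object> topologyOverrides,
                          String key) {
        Object override = topologyOverrides.get(key);
        // A null override models a null default: nothing was set for this topology.
        return override != null ? override : globalConfigs.get(key);
    }

    public static void main(String[] args) {
        // Application-wide values (example values chosen for illustration)
        Map<String, Object> globalConfigs = new HashMap<>();
        globalConfigs.put("max.task.idle.ms", 0L);
        globalConfigs.put("task.timeout.ms", 300000L);

        // One topology overrides only max.task.idle.ms
        Map<String, Object> topologyOverrides = new HashMap<>();
        topologyOverrides.put("max.task.idle.ms", 500L);

        // Overridden key: the topology-level value wins
        System.out.println(resolve(globalConfigs, topologyOverrides, "max.task.idle.ms"));
        // Key without an override: falls back to the application-wide value
        System.out.println(resolve(globalConfigs, topologyOverrides, "task.timeout.ms"));
    }
}
```

Under this reading, a topology only needs to specify the settings it wants to change, and every other setting follows the application-wide configuration.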
...