Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

In order to leverage the new configuration, users will need to call the new proposed API in KAFKA-13281, to provide topology-config while construct the StreamBuilder . (currently not published, yet, but Currently, the internal implementation for topology-config is all set. In this KIP, we'll publish the


Code Block
languagejava
/**
 * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
 * {@link org.apache.kafka.streams.KafkaStreams} or {@link KafkaStreamsNamedTopologyWrapper} constructors will
 * determine the defaults, which can then be overridden for specific topologies by passing them in when creating the
 * topology builders via the {@link org.apache.kafka.streams.StreamsBuilder()} method.
 */
public class TopologyConfig extends AbstractConfig {
    private static final ConfigDef CONFIG;
    static {
        CONFIG = new ConfigDef()
             .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
                     Type.INT,
                     null,
                     Importance.LOW,
                     BUFFERED_RECORDS_PER_PARTITION_DOC)
             .define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
                    Type.LONG,
                    null,
                    Importance.MEDIUM,
                    CACHE_MAX_BYTES_BUFFERING_DOC)
             .define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                    Type.CLASS,
                    null,
                    Importance.MEDIUM,
                    DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
             .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                     Type.CLASS,
                     null,
                     Importance.MEDIUM,
                     DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC)
             .define(MAX_TASK_IDLE_MS_CONFIG,
                     Type.LONG,
                     null,
                     Importance.MEDIUM,
                     MAX_TASK_IDLE_MS_DOC)
             .define(TASK_TIMEOUT_MS_CONFIG,
                     Type.LONG,
                     null,
                     Importance.MEDIUM,
                     TASK_TIMEOUT_MS_DOC);
             .define(DEFAULT_STORE_IMPLEMENTATION_CLASS_CONFIG,
            		 Type.CLASS,
           			 RocksDBPersistentStoreImplementation.class.getName(),
            		 Importance.MEDIUM,
            		 DEFAULT_STORE_IMPLEMENTATION_CLASS_DOC)  
    }





Code Block
languagejava
Properties props = new Properties();
...
// set the "default.store.impl.class" config as "InMemoryStoreImpelementation"
props.put(StreamsConfig.DEFAULT_STORE_IMPLEMENTATION_CLASS_CONFIG, InMemoryStoreImplementation.class.getName());
StreamsConfig streamsConfig = new StreamsConfig(props);

builder = new StreamsBuilder(new TopologyConfig(streamsConfig));
topology = builder.build(); 

...