Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
public class StreamsConfig {
    /** Name of the config that selects the default store implementation used by DSL operators. */
    public static final String DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG = "default.dsl.store.impl.type";
    private static final String DEFAULT_DSL_STORE_IMPL_TYPE_DOC = "The default store implementation type used by DSL operators.";

    /** Accepted values for {@link #DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG}. */
    public static final String ROCKS_DB = "rocksDB";
    public static final String IN_MEMORY = "in_memory";

    // Excerpt: this define(...) call is chained onto the class's ConfigDef (the rest of the chain is not shown).
    // The default value is ROCKS_DB and the validator restricts the value to ROCKS_DB or IN_MEMORY.
    .define(DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG,
            Type.STRING,
            ROCKS_DB,
            in(ROCKS_DB, IN_MEMORY),
            Importance.LOW,
            DEFAULT_DSL_STORE_IMPL_TYPE_DOC)
}

In addition, we propose to extend the Materialized config object with a corresponding option to specify the store type:

...

Code Block
languagejava
/**
 * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
 * {@link org.apache.kafka.streams.KafkaStreams} or {@link KafkaStreamsNamedTopologyWrapper} constructors will
 * determine the defaults, which can then be overridden for specific topologies by passing them in when creating the
 * topology builders via the {@link org.apache.kafka.streams.StreamsBuilder()} method.
 */
public class TopologyConfig extends AbstractConfig {
    private static final ConfigDef CONFIG;
    static {
        CONFIG = new ConfigDef()
             .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
                     Type.INT,
                     null,
                     Importance.LOW,
                     BUFFERED_RECORDS_PER_PARTITION_DOC)
             .define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
                    Type.LONG,
                    null,
                    Importance.MEDIUM,
                    CACHE_MAX_BYTES_BUFFERING_DOC)
             .define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                    Type.CLASS,
                    null,
                    Importance.MEDIUM,
                    DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
             .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                     Type.CLASS,
                     null,
                     Importance.MEDIUM,
                     DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC)
             .define(MAX_TASK_IDLE_MS_CONFIG,
                     Type.LONG,
                     null,
                     Importance.MEDIUM,
                     MAX_TASK_IDLE_MS_DOC)
             .define(TASK_TIMEOUT_MS_CONFIG,
                     Type.LONG,
                     null,
                     Importance.MEDIUM,
                     TASK_TIMEOUT_MS_DOC);
             .define(DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG,                
					 Type.STRING,
		             ROCKS_DB,
         		     in(ROCKS_DB, IN_MEMORY),
		             Importance.LOW,
         		     DEFAULT_DSL_STORE_IMPL_TYPE_DOC);
    }
Code Block
languagejava
public class StreamsBuilder {

 	public TopologyConfig(final StreamsConfig public StreamsBuilder(globalAppConfigs) {}

	/**
     * Create  this(null, globalAppConfigs, new Properties());
    }

    public TopologyConfig(final String topologyName, final StreamsConfig globalAppConfigs, final Properties topologyOverrides) {}

	// check whether this is a named topology 
  	public boolean isNamedTopology(){}

	// get this task config with the topology config
	public TaskConfig getTaskConfig() {}

     



Code Block
languagejava
public class StreamsBuilder {
    public StreamsBuilder() {}

	/**
     * Create a {@code StreamsBuilder} instance.
	 *
	 * @param topologyConfigs    the streams configs that apply at the topology level. Please refer to {@link TopologyConfig} for more detail
     */
	public StreamsBuilder(final TopologyConfig topologyConfigs) {} // newly added

...

Code Block
languagejava
Properties props = new Properties();
...
// set the "default.dsl.store.impl.type" config to the in-memory store
props.put(StreamsConfig.DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG, StreamsConfig.IN_MEMORY);
StreamsConfig streamsConfig = new StreamsConfig(props);

// pass the topology-level configs to the builder so the store type applies to the built topology
StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(streamsConfig));
Topology topology = builder.build();

...

All stateful operators (like aggregate(), reduce(), count(), or join()) will use the state store type specified in the `default.dsl.store.impl.type` configuration. Of course, if the store is overridden for an individual operator via Materialized, the operator-level override will be used. The improved Materialized interface makes it easier to switch the store type without the need to specify a store name.

...