...

Code Block
public class StreamsConfig {
    public static final String DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG = "default.dsl.store.impl.type";
    private static final String DEFAULT_DSL_STORE_IMPL_TYPE_DOC = "The default store implementation type used by DSL operators.";

    public static final String ROCKS_DB = "rocksDB";
    public static final String IN_MEMORY = "in_memory";

    // added to the existing ConfigDef chain
    .define(DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG,
            Type.STRING,
            ROCKS_DB,
            in(ROCKS_DB, IN_MEMORY),
            Importance.LOW,
            DEFAULT_DSL_STORE_IMPL_TYPE_DOC)
}
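
The default and the `in(ROCKS_DB, IN_MEMORY)` validator above can be illustrated with a minimal, self-contained sketch. It deliberately avoids Kafka's `ConfigDef`; the class `StoreImplTypeConfigSketch` and its `resolve` method are hypothetical names introduced only for illustration, and the sketch throws a JDK `IllegalArgumentException` where Kafka's validator would report a `ConfigException`.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the proposed config; it does not use Kafka's ConfigDef.
final class StoreImplTypeConfigSketch {
    static final String DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG = "default.dsl.store.impl.type";
    static final String ROCKS_DB = "rocksDB";
    static final String IN_MEMORY = "in_memory";

    // The only values the validator accepts.
    private static final Set<String> VALID = Set.of(ROCKS_DB, IN_MEMORY);

    // Returns the configured store type, falling back to ROCKS_DB when the key is absent.
    static String resolve(final Map<String, String> props) {
        final String value = props.getOrDefault(DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG, ROCKS_DB);
        if (!VALID.contains(value)) {
            // Kafka's own validator reports a ConfigException; this sketch uses a JDK exception.
            throw new IllegalArgumentException(
                "Invalid value " + value + " for " + DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG);
        }
        return value;
    }
}
```

An application that never sets the key keeps RocksDB stores, which is why the default preserves existing behavior.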

In addition, we propose to extend the Materialized config object with a corresponding option to specify the store type:

Code Block
public class Materialized<K, V, S extends StateStore> {

  public enum StoreImplType {
    ROCKS_DB,
    IN_MEMORY
  }

  public static <K, V, S extends StateStore> Materialized<K, V, S> as(StoreImplType storeType);

  public Materialized<K, V, S> withStoreType(StoreImplType storeType);
}
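
One way to picture how the per-operator option and the config default could interact is the sketch below. It assumes, without confirmation from this page, that an explicit choice on `Materialized` takes precedence over the configured default. The class `StoreTypeResolutionSketch` and its methods are hypothetical and are not Kafka internals; only the enum constants and the two config strings come from the proposal.

```java
import java.util.Optional;

// Hypothetical sketch; the names below are illustrative and not Kafka's internals.
final class StoreTypeResolutionSketch {
    enum StoreImplType { ROCKS_DB, IN_MEMORY }

    // Maps the string config value ("rocksDB" or "in_memory") to the enum.
    static StoreImplType fromConfig(final String configValue) {
        switch (configValue) {
            case "rocksDB":
                return StoreImplType.ROCKS_DB;
            case "in_memory":
                return StoreImplType.IN_MEMORY;
            default:
                throw new IllegalArgumentException("Unknown store type: " + configValue);
        }
    }

    // Assumed precedence: an explicit per-operator choice beats the configured default.
    static StoreImplType effective(final Optional<StoreImplType> perOperator,
                                   final String configValue) {
        return perOperator.orElseGet(() -> fromConfig(configValue));
    }
}
```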


To leverage the new configuration, users will need to call the new API proposed in KAFKA-13281 and provide a topology config when constructing the StreamsBuilder. The internal implementation of the topology config is already in place. In this KIP, we'll publish the API that allows passing the topology config into StreamsBuilder.

...

Code Block
/**
 * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
 * {@link org.apache.kafka.streams.KafkaStreams} or {@link KafkaStreamsNamedTopologyWrapper} constructors will
 * determine the defaults, which can then be overridden for specific topologies by passing them in when creating the
 * topology builders via the {@link org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig)} constructor.
 */
public class TopologyConfig extends AbstractConfig {
    private static final ConfigDef CONFIG;
    static {
        CONFIG = new ConfigDef()
             .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
                     Type.INT,
                     null,
                     Importance.LOW,
                     BUFFERED_RECORDS_PER_PARTITION_DOC)
             .define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
                    Type.LONG,
                    null,
                    Importance.MEDIUM,
                    CACHE_MAX_BYTES_BUFFERING_DOC)
             .define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                    Type.CLASS,
                    null,
                    Importance.MEDIUM,
                    DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
             .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                     Type.CLASS,
                     null,
                     Importance.MEDIUM,
                     DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC)
             .define(MAX_TASK_IDLE_MS_CONFIG,
                     Type.LONG,
                     null,
                     Importance.MEDIUM,
                     MAX_TASK_IDLE_MS_DOC)
             .define(TASK_TIMEOUT_MS_CONFIG,
                     Type.LONG,
                     null,
                     Importance.MEDIUM,
                     TASK_TIMEOUT_MS_DOC)
             .define(DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG,
                     Type.STRING,
                     ROCKS_DB,
                     in(ROCKS_DB, IN_MEMORY),
                     Importance.LOW,
                     DEFAULT_DSL_STORE_IMPL_TYPE_DOC);
    }
}



Code Block
public class StreamsBuilder {
    public StreamsBuilder() {}

    /**
     * Create a {@code StreamsBuilder} instance.
     *
     * @param topologyConfigs    the streams configs that apply at the topology level. Please refer to {@link TopologyConfig} for more detail
     */
    public StreamsBuilder(final TopologyConfig topologyConfigs) {} // newly added
}

...

Code Block
Properties props = new Properties();
...
// set the "default.dsl.store.impl.type" config to the in-memory store
props.put(StreamsConfig.DEFAULT_DSL_STORE_IMPL_TYPE_CONFIG, StreamsConfig.IN_MEMORY);
StreamsConfig streamsConfig = new StreamsConfig(props);

StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(streamsConfig));
Topology topology = builder.build();
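
The layering described in the `TopologyConfig` Javadoc, where application-wide values act as defaults that an individual topology may override, can be sketched without any Kafka dependency. The class `TopologyOverrideSketch` is a hypothetical stand-in that uses plain maps; it is not how `TopologyConfig` is implemented.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for layered config lookup; not Kafka's TopologyConfig.
final class TopologyOverrideSketch {
    private final Map<String, String> globalConfigs;
    private final Map<String, String> topologyOverrides;

    TopologyOverrideSketch(final Map<String, String> globalConfigs,
                           final Map<String, String> topologyOverrides) {
        // Defensive copies so later changes by the caller do not leak in.
        this.globalConfigs = new HashMap<>(globalConfigs);
        this.topologyOverrides = new HashMap<>(topologyOverrides);
    }

    // A topology-level value wins; otherwise the application-wide value applies;
    // "rocksDB" is the fallback when neither level sets the key.
    String storeImplType() {
        final String key = "default.dsl.store.impl.type";
        final String global = globalConfigs.getOrDefault(key, "rocksDB");
        return topologyOverrides.getOrDefault(key, global);
    }
}
```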

...

Code Block
KStream stream = ...

// current code to change from RocksDB to in-memory
stream.groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("some-name")));

// new code, using the static factory proposed above
stream.groupByKey().count(Materialized.as(Materialized.StoreImplType.IN_MEMORY));

Compatibility, Deprecation, and Migration Plan

Because the default value of the config is "ROCKS_DB", there is no behavior change and thus there are no backward compatibility concerns.

...