KIP-591: Add Kafka Streams config to set default store type

Status

Current state"Under DiscussionAccepted"

Discussion thread: https://lists.apache.org/thread/238rstw9zj49vyhzzoh67d8b74vz4nbm

Vote thread: https://lists.apache.org/thread/qz8s06t2brjcv928bo26qqtz06ysg3ln


JIRA: KAFKA-9847

...

Kafka Streams supports RocksDB stores as well as in-memory stores out of the box. By default, all DSL operators use RocksDB stores. Currently, it is only possible to switch from RocksDB stores to in-memory stores on a per-operator basis, which is tedious if all stores should be switched (for example, in a Kubernetes deployment without local disks).

Furthermore, the current store interface via Materialized is very generic: it allows passing in any custom state store implementation, with the side effect that the user needs to name the store, which in turn also makes it queryable via "Interactive Queries".

...

We propose to add a new configuration parameter default.dsl.store.type that defines the default store type used by the DSL, accepting the two values "rocksDB" (default) and "in_memory".

Code Block
public class StreamsConfig {
    public static final String DEFAULT_DSL_STORE_TYPE_CONFIG = "default.dsl.store.type";
    private static final String DEFAULT_DSL_STORE_TYPE_DOC = "The default store implementation type used by DSL operators.";

    public static final String ROCKS_DB = "rocksDB";
    public static final String IN_MEMORY = "in_memory";

    // added to the existing ConfigDef
    .define(DEFAULT_DSL_STORE_TYPE_CONFIG,
            Type.STRING,
            ROCKS_DB,
            in(ROCKS_DB, IN_MEMORY),
            Importance.LOW,
            DEFAULT_DSL_STORE_TYPE_DOC)
}
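The in(ROCKS_DB, IN_MEMORY) validator means that any other value is rejected when the configuration is parsed. A minimal sketch of the resulting behavior, assuming the config is registered as proposed above (the invalid value "lsm_tree" and the application name are made up for illustration):

Code Block
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// accepted: one of the two defined values
props.put(StreamsConfig.DEFAULT_DSL_STORE_TYPE_CONFIG, StreamsConfig.IN_MEMORY);
new StreamsConfig(props); // ok

// rejected by the in(ROCKS_DB, IN_MEMORY) validator
props.put(StreamsConfig.DEFAULT_DSL_STORE_TYPE_CONFIG, "lsm_tree");
new StreamsConfig(props); // throws org.apache.kafka.common.config.ConfigException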

In addition, we propose to extend the Materialized config object with a corresponding option to specify the store type:

Code Block
public class Materialized<K, V, S extends StateStore> {

  public enum StoreImplType {
    ROCKS_DB,
    IN_MEMORY
  }

  /**
   * Materialize a {@link StateStore} with the given {@link StoreImplType}.
   *
   * @param storeType the type of the state store
   * @param <K>       key type of the store
   * @param <V>       value type of the store
   * @param <S>       type of the {@link StateStore}
   * @return a new {@link Materialized} instance with the given storeType
   */
  public static <K, V, S extends StateStore> Materialized<K, V, S> as(StoreImplType storeType);

  /**
   * Set the type of the materialized {@link StateStore}.
   *
   * @param storeType  the store type {@link StoreImplType} to use.
   * @return itself
   */
  public Materialized<K, V, S> withStoreType(StoreImplType storeType);
}


In order to leverage the new configuration, users will need to call the new API proposed in KAFKA-13281 to provide a topology-config when constructing the StreamsBuilder. The internal implementation for topology-config is already in place; in this KIP, we publish the API that allows passing the topology-config into StreamsBuilder.


Code Block
/**
 * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
 * {@link org.apache.kafka.streams.KafkaStreams} or {@link KafkaStreamsNamedTopologyWrapper} constructors will
 * determine the defaults, which can then be overridden for specific topologies by passing them in when creating the
 * topology builders via the {@link org.apache.kafka.streams.StreamsBuilder()} method.
 */
public class TopologyConfig extends AbstractConfig {
    private static final ConfigDef CONFIG;
    static {
        CONFIG = new ConfigDef()
             .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
                     Type.INT,
                     null,
                     Importance.LOW,
                     BUFFERED_RECORDS_PER_PARTITION_DOC)
             .define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
                     Type.LONG,
                     null,
                     Importance.MEDIUM,
                     CACHE_MAX_BYTES_BUFFERING_DOC)
             .define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                     Type.CLASS,
                     null,
                     Importance.MEDIUM,
                     DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
             .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                     Type.CLASS,
                     null,
                     Importance.MEDIUM,
                     DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC)
             .define(MAX_TASK_IDLE_MS_CONFIG,
                     Type.LONG,
                     null,
                     Importance.MEDIUM,
                     MAX_TASK_IDLE_MS_DOC)
             .define(TASK_TIMEOUT_MS_CONFIG,
                     Type.LONG,
                     null,
                     Importance.MEDIUM,
                     TASK_TIMEOUT_MS_DOC)
             .define(DEFAULT_DSL_STORE_TYPE_CONFIG,
                     Type.STRING,
                     ROCKS_DB,
                     in(ROCKS_DB, IN_MEMORY),
                     Importance.LOW,
                     DEFAULT_DSL_STORE_TYPE_DOC);
    }

    public TopologyConfig(final StreamsConfig globalAppConfigs) {
        this(null, globalAppConfigs, new Properties());
    }

    public TopologyConfig(final String topologyName, final StreamsConfig globalAppConfigs, final Properties topologyOverrides) {}

    // check whether this is a named topology
    public boolean isNamedTopology() {}

    // get the task config with the topology config
    public TaskConfig getTaskConfig() {}
}
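For illustration, a sketch of the per-topology override described in the javadoc above, assuming the config is wired into TopologyConfig as proposed (the topology name "reporting" is a made-up example):

Code Block
Properties appProps = new Properties();
appProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
appProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// application-wide default: in-memory stores
appProps.put(StreamsConfig.DEFAULT_DSL_STORE_TYPE_CONFIG, StreamsConfig.IN_MEMORY);
final StreamsConfig globalAppConfigs = new StreamsConfig(appProps);

// per-topology override: this one topology keeps RocksDB stores
final Properties topologyOverrides = new Properties();
topologyOverrides.put(StreamsConfig.DEFAULT_DSL_STORE_TYPE_CONFIG, StreamsConfig.ROCKS_DB);

final StreamsBuilder builder =
    new StreamsBuilder(new TopologyConfig("reporting", globalAppConfigs, topologyOverrides));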



Code Block
public class StreamsBuilder {
    public StreamsBuilder() {}

    /**
     * Create a {@code StreamsBuilder} instance.
     *
     * @param topologyConfigs    the streams configs that apply at the topology level. Please refer to {@link TopologyConfig} for more details.
     */
    public StreamsBuilder(final TopologyConfig topologyConfigs) {} // newly added
}

...

Code Block
Properties props = new Properties();
...
// set the "default.dsl.store.type" config to "in_memory"
props.put(StreamsConfig.DEFAULT_DSL_STORE_TYPE_CONFIG, StreamsConfig.IN_MEMORY);
StreamsConfig streamsConfig = new StreamsConfig(props);

builder = new StreamsBuilder(new TopologyConfig(streamsConfig));
topology = builder.build(); 

...

All stateful operators (like aggregate(), reduce(), count(), or join()) will use the store type set via the `default.dsl.store.type` configuration. Of course, if the store is overwritten for an individual operator via Materialized, the operator-level overwrite is used. The improved Materialized interface allows switching the store type more easily, without the need to specify a store name.

Code Block
KStream stream = ...

// current code to change from RocksDB to in-memory
stream.groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("some-name")));

// new code, without the need to provide a name
stream.groupByKey().count(Materialized.as(StoreImplType.IN_MEMORY));
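To make the precedence rule concrete, a short sketch (topic name is made up): even with default.dsl.store.type set to "in_memory", an individual operator can still be pinned to RocksDB via Materialized:

Code Block
Properties props = new Properties();
// application-wide default: in-memory stores
props.put(StreamsConfig.DEFAULT_DSL_STORE_TYPE_CONFIG, StreamsConfig.IN_MEMORY);
StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));

KStream<String, String> stream = builder.stream("input-topic");

// uses the configured default: in-memory store
stream.groupByKey().count();

// operator-level override wins: this store uses RocksDB
stream.groupByKey().count(Materialized.as(StoreImplType.ROCKS_DB));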

Compatibility, Deprecation, and Migration Plan

Because the default value of the config is "rocksDB" (ROCKS_DB), there is no behavior change and thus there are no backward compatibility concerns.

...