
Status

Current state: Accepted

Discussion thread:

  • discuss: https://lists.apache.org/thread/238rstw9zj49vyhzzoh67d8b74vz4nbm
  • vote: https://lists.apache.org/thread/qz8s06t2brjcv928bo26qqtz06ysg3ln


JIRA: KAFKA-9847

...

Motivation

Kafka Streams supports persistent RocksDB stores as well as in-memory stores out of the box. By default, all DSL operators use persistent RocksDB stores. Currently, it is only possible to switch from RocksDB stores to in-memory stores on a per-operator basis, which is tedious if all stores should be switched (for example, in a Kubernetes deployment without local disks).

Furthermore, the current store interface via Materialized is very generic: it allows passing in any custom state store implementation, with the side effect that the user needs to name the store, which in turn also makes it queryable via "Interactive Queries".

We propose to simplify switching the store type via a new Kafka Streams config that sets the default built-in store type used by the DSL for the whole program, as well as an improved API for Materialized.

...

We propose to add a new configuration parameter default.dsl.store that defines the default store type used by the DSL, accepting two values: "rocksDB" (default) and "in_memory".

Code Block
public class StreamsConfig {
    public static final String DEFAULT_DSL_STORE_CONFIG = "default.dsl.store";
    private static final String DEFAULT_DSL_STORE_DOC = "The default store type used by DSL operators.";

    public static final String ROCKS_DB = "rocksDB";
    public static final String IN_MEMORY = "in_memory";

    .define(DEFAULT_DSL_STORE_CONFIG,
            Type.STRING,
            ROCKS_DB,
            in(ROCKS_DB, IN_MEMORY),
            Importance.LOW,
            DEFAULT_DSL_STORE_DOC)
}

In addition, we propose to extend the Materialized config object with a corresponding option to specify the store type:

Code Block
languagejava
public class Materialized<K, V, S extends StateStore> {

  public enum StoreImplType {
    ROCKS_DB,
    IN_MEMORY
  }

  /**
   * Materialize a {@link StateStore} with the given {@link StoreImplType}.
   *
   * @param storeType  the type of the state store
   * @param <K>        key type of the store
   * @param <V>        value type of the store
   * @param <S>        type of the {@link StateStore}
   * @return a new {@link Materialized} instance with the given store type
   */
  public static <K, V, S extends StateStore> Materialized<K, V, S> as(StoreImplType storeType);

  /**
   * Set the type of the materialized {@link StateStore}.
   *
   * @param storeType  the store type {@link StoreImplType} to use.
   * @return itself
   */
  public Materialized<K, V, S> withStoreType(StoreImplType storeType);
}


In order to leverage the new configuration, users will need to call the API proposed in KAFKA-13281 to provide a topology-config while constructing the StreamsBuilder. The internal implementation for topology-config is already in place; in this KIP, we make the API public to allow passing the topology-config into the StreamsBuilder.


Code Block
languagejava
/**
 * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
 * {@link org.apache.kafka.streams.KafkaStreams} or {@link KafkaStreamsNamedTopologyWrapper} constructors will
 * determine the defaults, which can then be overridden for specific topologies by passing them in when creating the
 * topology builders via the {@link org.apache.kafka.streams.StreamsBuilder()} method.
 */
public class TopologyConfig extends AbstractConfig {
    private static final ConfigDef CONFIG;
    static {
        CONFIG = new ConfigDef()
             .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
                     Type.INT,
                     null,
                     Importance.LOW,
                     BUFFERED_RECORDS_PER_PARTITION_DOC)
             .define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
                     Type.LONG,
                     null,
                     Importance.MEDIUM,
                     CACHE_MAX_BYTES_BUFFERING_DOC)
             .define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                     Type.CLASS,
                     null,
                     Importance.MEDIUM,
                     DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
             .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                     Type.CLASS,
                     null,
                     Importance.MEDIUM,
                     DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC)
             .define(MAX_TASK_IDLE_MS_CONFIG,
                     Type.LONG,
                     null,
                     Importance.MEDIUM,
                     MAX_TASK_IDLE_MS_DOC)
             .define(TASK_TIMEOUT_MS_CONFIG,
                     Type.LONG,
                     null,
                     Importance.MEDIUM,
                     TASK_TIMEOUT_MS_DOC)
             .define(DEFAULT_DSL_STORE_CONFIG,
                     Type.STRING,
                     ROCKS_DB,
                     in(ROCKS_DB, IN_MEMORY),
                     Importance.LOW,
                     DEFAULT_DSL_STORE_DOC);
    }

    public TopologyConfig(final StreamsConfig globalAppConfigs) {
        this(null, globalAppConfigs, new Properties());
    }

    public TopologyConfig(final String topologyName, final StreamsConfig globalAppConfigs, final Properties topologyOverrides) {}

    // check whether this is a named topology
    public boolean isNamedTopology() {}

    // get the task config with the topology config
    public TaskConfig getTaskConfig() {}
}


Code Block
languagejava
public class StreamsBuilder {
    public StreamsBuilder() {}

    /**
     * Create a {@code StreamsBuilder} instance.
     *
     * @param topologyConfigs    the streams configs that apply at the topology level. Please refer to {@link TopologyConfig} for more detail
     */
    public StreamsBuilder(final TopologyConfig topologyConfigs) {} // newly added
}


Proposed Changes

To create a topology with a non-default store type, users can declare it via new StreamsBuilder(topologyConfig) like this:

Code Block
languagejava
Properties props = new Properties();
...
// set the "default.dsl.store" config to "in_memory"
props.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, StreamsConfig.IN_MEMORY);
StreamsConfig streamsConfig = new StreamsConfig(props);

builder = new StreamsBuilder(new TopologyConfig(streamsConfig));
topology = builder.build(); 


If not set, or when using the existing new StreamsBuilder() constructor, it defaults to the RocksDB store, which is the same as the current behavior, so this design is backward compatible.
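For example, a minimal sketch of the default path (topic name chosen arbitrarily):

Code Block
languagejava
// no TopologyConfig provided: all DSL operators fall back to the default,
// i.e., persistent RocksDB state stores, exactly as before this KIP
StreamsBuilder builder = new StreamsBuilder();
KTable<String, Long> counts = builder.<String, String>stream("input-topic")
                                     .groupByKey()
                                     .count(); // backed by a RocksDB store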


All stateful operators (like aggregate(), reduce(), count(), or join()) will use the store type from the `default.dsl.store` configuration. Of course, if the store is overwritten for an individual operator via Materialized, the operator-level overwrite is used. The improved Materialized interface allows switching the store type more easily, without the need to specify a store name.

Code Block
languagejava
KStream stream = ...

// current code to change from RocksDB to in-memory
stream.groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("some-name")));

// new code, without the need to provide a name
stream.groupByKey().count(Materialized.as(StoreImplType.IN_MEMORY));
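And if a store should both use a non-default type and stay queryable under a fixed name, the two options can be combined; a sketch (the store name "counts-store" is an arbitrary example):

Code Block
languagejava
// name the store for interactive queries and switch it to in-memory
stream.groupByKey()
      .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")
                         .withStoreType(StoreImplType.IN_MEMORY));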

Compatibility, Deprecation, and Migration Plan

Because the default value of the config is "rocksDB", there is no behavior change and thus there are no backward compatibility concerns.

Test Plan

Regular unit and integration testing is sufficient.

Rejected Alternatives

1. Use a store implementation class to allow a default custom store implementation.


We propose to add a new configuration parameter default.store.impl.class that defines the default store implementation class used by the DSL. It defaults to RocksDBPersistentStoreImplementation.class.

Code Block
public class StreamsConfig {
    public static final String DEFAULT_STORE_IMPLEMENTATION_CLASS_CONFIG = "default.store.impl.class";
    private static final String DEFAULT_STORE_IMPLEMENTATION_CLASS_DOC =
        "Store supplier implementation class to use. Default is set as RocksDBPersistentStoreImplementation. It can be overwritten dynamically during streaming operation.";

    .define(DEFAULT_STORE_IMPLEMENTATION_CLASS_CONFIG,
            Type.CLASS,
            RocksDBPersistentStoreImplementation.class.getName(),
            Importance.MEDIUM,
            DEFAULT_STORE_IMPLEMENTATION_CLASS_DOC)
}


The StoreImplementation interface:

Code Block
languagejava
/**
 * A state store supplier Implementation interface for all types of {@link StateStore}.
 */
public interface StoreImplementation {
    /**
     * Create a {@link KeyValueBytesStoreSupplier}.
     *
     * @param name  name of the store (cannot be {@code null})
     * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
     *         to build a persistent key-value store
     */
    KeyValueBytesStoreSupplier keyValueSupplier(String name);

    /**
     * Create a {@link WindowBytesStoreSupplier}.
     *
     * @param name                  name of the store (cannot be {@code null})
     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
     *                              (note that the retention period must be at least long enough to contain the
     *                              windowed data's entire life cycle, from window-start through window-end,
     *                              and for the entire grace period)
     * @param windowSize            size of the windows (cannot be negative)
     * @param retainDuplicates      whether or not to retain duplicates. Turning this on will automatically disable
     *                              caching and means that null values will be ignored
     * @return an instance of {@link WindowBytesStoreSupplier}
     * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
     * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
     */
    WindowBytesStoreSupplier windowBytesStoreSupplier(String name,
                                                      Duration retentionPeriod,
                                                      Duration windowSize,
                                                      boolean retainDuplicates);

    /**
     * Create a {@link SessionBytesStoreSupplier}.
     *
     * @param name              name of the store (cannot be {@code null})
     * @param retentionPeriod   length of time to retain data in the store (cannot be negative)
     *                          (note that the retention period must be at least long enough to
     *                          contain the inactivity gap of the session and the entire grace period)
     * @return an instance of a {@link SessionBytesStoreSupplier}
     */
    SessionBytesStoreSupplier sessionBytesStoreSupplier(String name,
                                                        Duration retentionPeriod);
}

In the Materialized class, we provide one more API to allow users to provide a StoreImplementation dynamically:

Code Block
languagejava
    /**
     * Materialize a {@link StateStore} with the store implementation.
     *
     * @param storeImplementation  store implementation used to materialize the store
     * @param <K>                  key type of the store
     * @param <V>                  value type of the store
     * @param <S>                  type of the {@link StateStore}
     * @return a new {@link Materialized} instance with the given storeImplementation
     */
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StoreImplementation storeImplementation)

In order to leverage the new configuration, users will need to call the API proposed in KAFKA-13281 to provide a topology-config while constructing the StreamsBuilder:

Code Block
languagejava
/**
 * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
 * {@link org.apache.kafka.streams.KafkaStreams} or {@link KafkaStreamsNamedTopologyWrapper} constructors will
 * determine the defaults, which can then be overridden for specific topologies by passing them in when creating the
 * topology builders via the {@link org.apache.kafka.streams.StreamsBuilder()} method.
 */
public class TopologyConfig extends AbstractConfig {
    private static final ConfigDef CONFIG;
    static {
        CONFIG = new ConfigDef()
             .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
                     Type.INT,
                     null,
                     Importance.LOW,
                     BUFFERED_RECORDS_PER_PARTITION_DOC)
             .define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
                     Type.LONG,
                     null,
                     Importance.MEDIUM,
                     CACHE_MAX_BYTES_BUFFERING_DOC)
             .define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                     Type.CLASS,
                     null,
                     Importance.MEDIUM,
                     DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
             .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                     Type.CLASS,
                     null,
                     Importance.MEDIUM,
                     DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC)
             .define(MAX_TASK_IDLE_MS_CONFIG,
                     Type.LONG,
                     null,
                     Importance.MEDIUM,
                     MAX_TASK_IDLE_MS_DOC)
             .define(TASK_TIMEOUT_MS_CONFIG,
                     Type.LONG,
                     null,
                     Importance.MEDIUM,
                     TASK_TIMEOUT_MS_DOC)
             .define(DEFAULT_STORE_IMPLEMENTATION_CLASS_CONFIG,
                     Type.CLASS,
                     RocksDBPersistentStoreImplementation.class.getName(),
                     Importance.MEDIUM,
                     DEFAULT_STORE_IMPLEMENTATION_CLASS_DOC);
    }
}


For the StoreImplementation, Kafka provides 3 built-in implementations:

RocksDBPersistentStoreImplementation.java (default)

Code Block
languagejava
/**
 * A RocksDB persistent state store supplier Implementation.
 */
public class RocksDBPersistentStoreImplementation implements StoreImplementation {
    @Override
    public KeyValueBytesStoreSupplier keyValueSupplier(final String name) {
        return Stores.persistentTimestampedKeyValueStore(name);
    }

    @Override
    public WindowBytesStoreSupplier windowBytesStoreSupplier(final String name,
                                                             final Duration retentionPeriod,
                                                             final Duration windowSize,
                                                             final boolean retainDuplicates) {
        return Stores.persistentTimestampedWindowStore(name, retentionPeriod, windowSize, retainDuplicates);
    }

    @Override
    public SessionBytesStoreSupplier sessionBytesStoreSupplier(final String name, final Duration retentionPeriod) {
        return Stores.persistentSessionStore(name, retentionPeriod);
    }
}


RocksDBStoreImplementation.java

Code Block
languagejava
/**
 * A RocksDB state store supplier Implementation.
 */
public class RocksDBStoreImplementation implements StoreImplementation {
    @Override
    public KeyValueBytesStoreSupplier keyValueSupplier(final String name) {
        return Stores.persistentKeyValueStore(name);
    }

    @Override
    public WindowBytesStoreSupplier windowBytesStoreSupplier(final String name,
                                                             final Duration retentionPeriod,
                                                             final Duration windowSize,
                                                             final boolean retainDuplicates) {
        return Stores.persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates);
    }

    @Override
    public SessionBytesStoreSupplier sessionBytesStoreSupplier(final String name, final Duration retentionPeriod) {
        return Stores.persistentSessionStore(name, retentionPeriod);
    }
}

InMemoryStoreImplementation.java

Code Block
languagejava
/**
 * An in-memory state store supplier Implementation.
 */
public class InMemoryStoreImplementation implements StoreImplementation {
    @Override
    public KeyValueBytesStoreSupplier keyValueSupplier(final String name) {
        return Stores.inMemoryKeyValueStore(name);
    }

    @Override
    public WindowBytesStoreSupplier windowBytesStoreSupplier(final String name,
                                                             final Duration retentionPeriod,
                                                             final Duration windowSize,
                                                             final boolean retainDuplicates) {
        return Stores.inMemoryWindowStore(name, retentionPeriod, windowSize, retainDuplicates);
    }

    @Override
    public SessionBytesStoreSupplier sessionBytesStoreSupplier(final String name, final Duration retentionPeriod) {
        return Stores.inMemorySessionStore(name, retentionPeriod);
    }
}

To create a topology with a non-default store implementation, users would declare it via new StreamsBuilder(topologyConfig) like this:

Code Block
languagejava
Properties props = new Properties();
...
// set the "default.store.impl.class" config to "InMemoryStoreImplementation"
props.put(StreamsConfig.DEFAULT_STORE_IMPLEMENTATION_CLASS_CONFIG, InMemoryStoreImplementation.class.getName());
StreamsConfig streamsConfig = new StreamsConfig(props);

builder = new StreamsBuilder(new TopologyConfig(streamsConfig));
topology = builder.build();

If not set, or when using the existing new StreamsBuilder() constructor, it would default to RocksDBPersistentStoreImplementation, which is the same as the current behavior, so this design would also be backward compatible. All stateful operators (like aggregate(), reduce(), count(), or join()) would use the store implementation from the "default.store.impl.class" configuration, and an individual operator could still be overwritten via Materialized:

Code Block
KStream stream = ...

// current code to change from RocksDB to in-memory
stream.groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("some-name")));

// new code
stream.groupByKey().count(Materialized.as(new InMemoryStoreImplementation()));








→ In the KIP, there's a trade-off regarding API complexity: with the store implementation class, we can support default custom stores but introduce more complexity for users, while with the enum types, users can configure the default built-in store types easily, but custom stores can't be used as defaults.

For now, I'm OK with narrowing down the scope and introducing the default built-in enum store types first. If there's further demand, we can consider a better way to support a default custom store implementation.
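For illustration only, a default custom store under the rejected class-based design could have looked like the following sketch; MyCustomStoreImplementation is a hypothetical class that simply delegates to the built-in in-memory suppliers:

Code Block
languagejava
// hypothetical sketch of a user-provided default store implementation,
// which would only have been possible under the rejected class-based design
public class MyCustomStoreImplementation implements StoreImplementation {
    @Override
    public KeyValueBytesStoreSupplier keyValueSupplier(final String name) {
        // a real implementation could return any custom supplier here
        return Stores.inMemoryKeyValueStore(name);
    }

    @Override
    public WindowBytesStoreSupplier windowBytesStoreSupplier(final String name,
                                                             final Duration retentionPeriod,
                                                             final Duration windowSize,
                                                             final boolean retainDuplicates) {
        return Stores.inMemoryWindowStore(name, retentionPeriod, windowSize, retainDuplicates);
    }

    @Override
    public SessionBytesStoreSupplier sessionBytesStoreSupplier(final String name, final Duration retentionPeriod) {
        return Stores.inMemorySessionStore(name, retentionPeriod);
    }
}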