...

Kafka Streams supports RocksDB stores as well as in-memory stores out of the box. By default, all DSL operators use RocksDB stores. Currently, it is only possible to switch from RocksDB stores to in-memory stores on a per-operator basis, which is tedious if all stores should be switched (for example, in a Kubernetes deployment without local disks).

Furthermore, the current store interface via Materialized is very generic: it allows users to pass in any custom state store implementation, with the side effect that the user needs to name the store, which also makes it queryable via "Interactive Queries".

...

In addition, we propose to extend the Materialized config object with a corresponding option to specify the store type:

Code Block
public class Materialized<K, V, S extends StateStore> {

  public enum StoreType {
    ROCKS_DB,
    IN_MEMORY
  }

  /**
   * Materialize a {@link StateStore} with the given {@link StoreType}.
   *
   * @param storeType  the type of the state store
   * @param <K>        key type of the store
   * @param <V>        value type of the store
   * @param <S>        type of the {@link StateStore}
   * @return a new {@link Materialized} instance with the given store type
   */
  public static <K, V, S extends StateStore> Materialized<K, V, S> as(StoreType storeType);

  /**
   * Set the type of the materialized {@link StateStore}.
   *
   * @param storeType  the store type {@link StoreType} to use
   * @return itself
   */
  public Materialized<K, V, S> withStoreType(StoreType storeType);
}
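A minimal, self-contained model of the two Materialized entry points may help contrast the proposal with the current naming requirement. `MaterializedSketch` is hypothetical: it drops the `K`, `V`, `S` type parameters and every other Materialized setting, and it assumes that only an explicitly named store is exposed to Interactive Queries, following the motivation above.

```java
import java.util.Objects;

/**
 * Hypothetical stand-in for the proposed Materialized options.
 * Generics (K, V, S) and all other Materialized settings are omitted.
 */
final class MaterializedSketch {

    enum StoreType { ROCKS_DB, IN_MEMORY }

    private final String storeName; // null: the store name is generated internally
    private StoreType storeType;    // null: fall back to the configured default

    private MaterializedSketch(final String storeName, final StoreType storeType) {
        this.storeName = storeName;
        this.storeType = storeType;
    }

    /** Existing style: naming the store (assumed here) also makes it queryable. */
    static MaterializedSketch as(final String storeName) {
        return new MaterializedSketch(Objects.requireNonNull(storeName, "storeName"), null);
    }

    /** Proposed style: choose only the store type, no name required. */
    static MaterializedSketch as(final StoreType storeType) {
        return new MaterializedSketch(null, Objects.requireNonNull(storeType, "storeType"));
    }

    /** Proposed fluent setter; returns itself so calls can be chained. */
    MaterializedSketch withStoreType(final StoreType storeType) {
        this.storeType = Objects.requireNonNull(storeType, "storeType");
        return this;
    }

    StoreType storeType() {
        return storeType;
    }

    /** Assumption: only explicitly named stores are exposed to Interactive Queries. */
    boolean isQueryable() {
        return storeName != null;
    }
}
```

With this shape, `as(StoreType.IN_MEMORY)` selects the store type without forcing a name, while `as("counts").withStoreType(...)` keeps the existing named (and queryable) path available.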


We'll also add three static helper methods to Stores, so that the implementation can get a store supplier for a given store type.

Code Block
    /**
     * Create a {@link SessionBytesStoreSupplier} for the provided {@code storeType}.
     *
     * @param storeType         {@link org.apache.kafka.streams.kstream.Materialized.StoreType} of the store
     * @param storeName         name of the store (cannot be {@code null})
     * @param retentionPeriod   length of time to retain data in the store (cannot be negative)
     *                          (note that the retention period must be at least long enough to
     *                          contain the inactivity gap of the session and the entire grace period.)
     * @return an instance of a {@link SessionBytesStoreSupplier}
     */
    public static SessionBytesStoreSupplier sessionStoreSupplierByStoreType(final Materialized.StoreType storeType,
                                                                            final String storeName,
                                                                            final long retentionPeriod) {}


    /**
     * Create a {@link WindowBytesStoreSupplier} for the provided {@code storeType}.
     *
     * @param storeType             {@link org.apache.kafka.streams.kstream.Materialized.StoreType} of the store
     * @param storeName             name of the store (cannot be {@code null})
     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
     *                              (note that the retention period must be at least long enough to contain the
     *                              windowed data's entire life cycle, from window-start through window-end,
     *                              and for the entire grace period)
     * @param windowSize            size of the windows (cannot be negative)
     * @return an instance of {@link WindowBytesStoreSupplier}
     * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
     * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
     */
    public static WindowBytesStoreSupplier windowStoreSupplierByStoreType(final Materialized.StoreType storeType,
                                                                          final String storeName,
                                                                          final long retentionPeriod,
                                                                          final long windowSize) {}

    /**
     * Create a {@link KeyValueBytesStoreSupplier} for the provided {@code storeType}.
     *
     * @param storeType             {@link org.apache.kafka.streams.kstream.Materialized.StoreType} of the store
     * @param storeName             name of the store (cannot be {@code null})
     * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
     * to build a key-(timestamp/value) store of the given type
     */
    public static KeyValueBytesStoreSupplier keyValueStoreSupplierByStoreType(final Materialized.StoreType storeType,
                                                                              final String storeName) {}
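To illustrate how the three helpers could share a single dispatch on the store type, here is a minimal, self-contained sketch. `StoresSketch` and `SupplierSketch` are hypothetical stand-ins that only record what would be built (they do not create real Kafka suppliers), and the argument checks simply mirror the Javadoc above: names cannot be `null`, retention periods and window sizes cannot be negative, and a window store's retention period cannot be smaller than its window size.

```java
/**
 * Hypothetical sketch of store-type dispatch for the proposed Stores helpers.
 * SupplierSketch is a stand-in, not a Kafka store supplier.
 */
final class StoresSketch {

    enum StoreType { ROCKS_DB, IN_MEMORY }

    /** Stand-in for a bytes-store supplier; it only records what would be built. */
    static final class SupplierSketch {
        final String kind;
        final String name;

        SupplierSketch(final String kind, final String name) {
            this.kind = kind;
            this.name = name;
        }
    }

    static SupplierSketch keyValueStoreSupplierByStoreType(final StoreType storeType, final String storeName) {
        return new SupplierSketch(prefix(storeType) + "-key-value", requireName(storeName));
    }

    static SupplierSketch sessionStoreSupplierByStoreType(final StoreType storeType,
                                                          final String storeName,
                                                          final long retentionPeriod) {
        requireNonNegative(retentionPeriod, "retentionPeriod");
        return new SupplierSketch(prefix(storeType) + "-session", requireName(storeName));
    }

    static SupplierSketch windowStoreSupplierByStoreType(final StoreType storeType,
                                                         final String storeName,
                                                         final long retentionPeriod,
                                                         final long windowSize) {
        requireNonNegative(retentionPeriod, "retentionPeriod");
        requireNonNegative(windowSize, "windowSize");
        if (retentionPeriod < windowSize) {
            throw new IllegalArgumentException("retentionPeriod must not be smaller than windowSize");
        }
        return new SupplierSketch(prefix(storeType) + "-window", requireName(storeName));
    }

    // Single place that maps a store type to an implementation family.
    private static String prefix(final StoreType storeType) {
        if (storeType == null) {
            throw new NullPointerException("storeType cannot be null");
        }
        switch (storeType) {
            case ROCKS_DB:
                return "rocksdb";
            case IN_MEMORY:
                return "in-memory";
            default:
                throw new IllegalArgumentException("Unknown store type: " + storeType);
        }
    }

    private static String requireName(final String storeName) {
        if (storeName == null) {
            throw new NullPointerException("storeName cannot be null");
        }
        return storeName;
    }

    private static void requireNonNegative(final long value, final String param) {
        if (value < 0) {
            throw new IllegalArgumentException(param + " cannot be negative");
        }
    }
}
```

In a real implementation, each branch would presumably delegate to the existing `Stores` factories (for example `Stores.persistentTimestampedKeyValueStore` or `Stores.inMemoryKeyValueStore`); that delegation is an assumption, not something this section specifies.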



To leverage the new configuration, users will need to call the new API proposed in KAFKA-13281 to provide a topology config when constructing the StreamsBuilder. The internal implementation for topology configs is already in place; in this KIP, we'll make public the API that allows passing a topology config into the StreamsBuilder.
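Conceptually, the topology config lets the builder resolve a default store type once, and every operator that does not request a type explicitly falls back to it. The sketch below models only that resolution step with plain `java.util.Properties`; `TopologyConfigSketch`, the key `default.dsl.store`, and the values `rocksDB`/`in_memory` are assumptions for illustration, not definitions taken from this section. Falling back to RocksDB mirrors the current behavior that all DSL operators use RocksDB stores by default.

```java
import java.util.Properties;

/**
 * Hypothetical sketch: resolve a default store type once from configuration
 * and let every DSL operator without an explicit type use it.
 */
final class TopologyConfigSketch {

    enum StoreType { ROCKS_DB, IN_MEMORY }

    // Assumed key and values, chosen for illustration only.
    static final String DEFAULT_STORE_KEY = "default.dsl.store";
    static final String ROCKS_DB_VALUE = "rocksDB";
    static final String IN_MEMORY_VALUE = "in_memory";

    private final StoreType defaultStoreType;

    TopologyConfigSketch(final Properties props) {
        // Missing key: keep today's behavior and default to RocksDB.
        final String raw = props.getProperty(DEFAULT_STORE_KEY, ROCKS_DB_VALUE);
        this.defaultStoreType = parse(raw);
    }

    private static StoreType parse(final String raw) {
        if (ROCKS_DB_VALUE.equals(raw)) {
            return StoreType.ROCKS_DB;
        }
        if (IN_MEMORY_VALUE.equals(raw)) {
            return StoreType.IN_MEMORY;
        }
        throw new IllegalArgumentException("Invalid value for " + DEFAULT_STORE_KEY + ": " + raw);
    }

    /** Per-operator resolution: an explicit (non-null) type overrides the configured default. */
    StoreType storeTypeFor(final StoreType explicitTypeOrNull) {
        return explicitTypeOrNull != null ? explicitTypeOrNull : defaultStoreType;
    }
}
```

Resolving the default once and applying it per operator is what removes the need to touch every operator when, for example, a deployment without local disks should use in-memory stores throughout.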

...

Code Block
KStream<String, String> stream = ...

// current code to change from RocksDB to in-memory (the store must be named)
stream.groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("some-name")));

// new code, without the need to provide a name
stream.groupByKey().count(Materialized.as(Materialized.StoreType.IN_MEMORY));

...