Status

Current state: "Accepted"

Discussion thread:

1. [DISCUSS] KIP-591: Add Kafka Streams config to set default store type

2. discuss: https://lists.apache.org/thread/238rstw9zj49vyhzzoh67d8b74vz4nbm

3. vote: https://lists.apache.org/thread/qz8s06t2brjcv928bo26qqtz06ysg3ln


JIRA: KAFKA-9847

...

Code Block
public class Materialized<K, V, S extends StateStore> {

  public enum StoreType {
    ROCKS_DB,
    IN_MEMORY
  }

  /**
   * Materialize a {@link StateStore} with the given {@link StoreType}.
   *
   * @param storeType  the type of the state store
   * @param <K>        key type of the store
   * @param <V>        value type of the store
   * @param <S>        type of the {@link StateStore}
   * @return a new {@link Materialized} instance with the given storeType
   */
  public static <K, V, S extends StateStore> Materialized<K, V, S> as(StoreType storeType);

  /**
   * Set the type of the materialized {@link StateStore}.
   *
   * @param storeType  the {@link StoreType} to use
   * @return itself
   */
  public Materialized<K, V, S> withStoreType(StoreType storeType);
}
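
To make the intended call pattern concrete, here is a minimal, self-contained sketch of the two methods' behavior. `MaterializedSketch` is a hypothetical stand-in written for illustration only, not the Kafka Streams class; it drops the generics and every other `Materialized` setting, and the null checks are an assumption rather than something this KIP specifies.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for Materialized; not the Kafka Streams class.
final class MaterializedSketch {

    // Same two constants as the enum proposed above.
    enum StoreType { ROCKS_DB, IN_MEMORY }

    private StoreType storeType;

    private MaterializedSketch(final StoreType storeType) {
        this.storeType = storeType;
    }

    // Mirrors the proposed static factory: create an instance from a store type.
    // Rejecting null is an assumption of this sketch.
    static MaterializedSketch as(final StoreType storeType) {
        return new MaterializedSketch(Objects.requireNonNull(storeType, "storeType can't be null"));
    }

    // Mirrors the proposed setter: replace the store type and return itself for chaining.
    MaterializedSketch withStoreType(final StoreType storeType) {
        this.storeType = Objects.requireNonNull(storeType, "storeType can't be null");
        return this;
    }

    StoreType storeType() {
        return storeType;
    }

    public static void main(final String[] args) {
        final MaterializedSketch materialized = MaterializedSketch.as(StoreType.IN_MEMORY);
        System.out.println(materialized.storeType());
        System.out.println(materialized.withStoreType(StoreType.ROCKS_DB).storeType());
    }
}
```

In a real topology the same chain would presumably appear as an argument to a DSL operator that accepts a `Materialized`, for example an aggregation.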

We'll also add three static helper methods to Stores so that the implementation can obtain a store supplier for a given store type.

Code Block
    /**
     * Create a {@link SessionBytesStoreSupplier} by the provided {@code storeType}.
     *
     * @param storeType         {@link org.apache.kafka.streams.kstream.Materialized.StoreType} of the store
     * @param storeName         name of the store (cannot be {@code null})
     * @param retentionPeriod   length of time to retain data in the store (cannot be negative)
     *                          (note that the retention period must be at least long enough to
     *                          contain the inactivity gap of the session and the entire grace period)
     * @return an instance of a {@link SessionBytesStoreSupplier}
     */
    public static SessionBytesStoreSupplier sessionStoreSupplierByStoreType(final Materialized.StoreType storeType,
                                                                            final String storeName,
                                                                            final long retentionPeriod) {}


    /**
     * Create a {@link WindowBytesStoreSupplier} by the provided {@code storeType}.
     *
     * @param storeType             {@link org.apache.kafka.streams.kstream.Materialized.StoreType} of the store
     * @param storeName             name of the store (cannot be {@code null})
     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
     *                              (note that the retention period must be at least long enough to contain the
     *                              windowed data's entire life cycle, from window-start through window-end,
     *                              and for the entire grace period)
     * @param windowSize            size of the windows (cannot be negative)
     * @return an instance of {@link WindowBytesStoreSupplier}
     * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
     * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
     */
    public static WindowBytesStoreSupplier windowStoreSupplierByStoreType(final Materialized.StoreType storeType,
                                                                          final String storeName,
                                                                          final long retentionPeriod,
                                                                          final long windowSize) {}

    /**
     * Create a {@link KeyValueBytesStoreSupplier} by the provided {@code storeType}.
     *
     * @param storeType             {@link org.apache.kafka.streams.kstream.Materialized.StoreType} of the store
     * @param storeName             name of the store (cannot be {@code null})
     * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
     * to build a key-(timestamp/value) store of the given {@code storeType}
     */
    public static KeyValueBytesStoreSupplier keyValueStoreSupplierByStoreType(final Materialized.StoreType storeType,
                                                                              final String storeName) {}
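
The helper bodies are left empty above. As a rough illustration of the dispatch they imply, the sketch below switches on the store type and applies the argument checks described in the Javadoc. Everything in it is hypothetical: `SupplierSketch` stands in for the real supplier interfaces, and the `persistent` flag merely records which branch was taken. In Kafka Streams the branches would presumably delegate to existing factories such as `Stores.inMemoryKeyValueStore(...)` or the persistent equivalents.

```java
import java.util.Objects;

// Hypothetical illustration of dispatching on a store type; not Kafka Streams code.
final class StoreSupplierDispatchSketch {

    enum StoreType { ROCKS_DB, IN_MEMORY }

    // Stand-in for the *BytesStoreSupplier interfaces: records only the name and the kind chosen.
    static final class SupplierSketch {
        final String name;
        final boolean persistent;

        SupplierSketch(final String name, final boolean persistent) {
            this.name = name;
            this.persistent = persistent;
        }
    }

    static SupplierSketch keyValueStoreSupplierByStoreType(final StoreType storeType,
                                                           final String storeName) {
        Objects.requireNonNull(storeType, "storeType cannot be null");
        Objects.requireNonNull(storeName, "storeName cannot be null");
        switch (storeType) {
            case IN_MEMORY:
                return new SupplierSketch(storeName, false);
            case ROCKS_DB:
                return new SupplierSketch(storeName, true);
            default:
                throw new IllegalArgumentException("Unknown store type: " + storeType);
        }
    }

    static SupplierSketch windowStoreSupplierByStoreType(final StoreType storeType,
                                                         final String storeName,
                                                         final long retentionPeriod,
                                                         final long windowSize) {
        // Checks taken from the Javadoc: no negative values, retention must cover the window.
        if (retentionPeriod < 0 || windowSize < 0) {
            throw new IllegalArgumentException("retentionPeriod and windowSize cannot be negative");
        }
        if (retentionPeriod < windowSize) {
            throw new IllegalArgumentException("retentionPeriod cannot be smaller than windowSize");
        }
        // The type-specific choice is the same as for key-value stores in this sketch.
        return keyValueStoreSupplierByStoreType(storeType, storeName);
    }

    public static void main(final String[] args) {
        final SupplierSketch supplier = keyValueStoreSupplierByStoreType(StoreType.IN_MEMORY, "counts");
        System.out.println(supplier.name + " persistent=" + supplier.persistent);
    }
}
```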


To use the new configuration, users will need to call the API proposed in KAFKA-13281 and provide a topology config when constructing the StreamsBuilder. The internal implementation for the topology config is already in place. This KIP makes that API public so that the topology config can be passed into StreamsBuilder.
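
As a rough sketch of what this could look like from the user's side, the example below prepares the properties that would carry the default store type. The config key `default.dsl.store` and the value `in_memory` are assumptions based on released Kafka Streams versions, so check them against the version you use. The builder construction is shown only as a comment because it needs the kafka-streams library, and its exact constructor shape is assumed from this KIP and KAFKA-13281.

```java
import java.util.Properties;

// Hypothetical helper class for illustration; only the Properties handling is executable here.
final class DefaultStoreConfigSketch {

    // Assumed config key and value; verify against your Kafka Streams version.
    static final String DEFAULT_DSL_STORE_CONFIG = "default.dsl.store";
    static final String IN_MEMORY = "in_memory";

    // Build properties that request in-memory stores as the default.
    static Properties inMemoryDefaults() {
        final Properties props = new Properties();
        props.put(DEFAULT_DSL_STORE_CONFIG, IN_MEMORY);
        return props;
    }

    public static void main(final String[] args) {
        final Properties props = inMemoryDefaults();
        // With kafka-streams on the classpath (and the usual required settings such as
        // application.id and bootstrap.servers added), the properties would then be wrapped
        // in a topology config and handed to the builder, roughly:
        //   StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
        System.out.println(props.getProperty(DEFAULT_DSL_STORE_CONFIG));
    }
}
```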

...