Status

Current state: "Accepted"

Discussion thread:

1. [DISCUSS] KIP-591: Add Kafka Streams config to set default store type

2. discuss: https://lists.apache.org/thread/238rstw9zj49vyhzzoh67d8b74vz4nbm

3. vote: https://lists.apache.org/thread/qz8s06t2brjcv928bo26qqtz06ysg3ln


JIRA: KAFKA-9847

...

Code Block
public class Materialized<K, V, S extends StateStore> {

  public enum StoreType {
    ROCKS_DB,
    IN_MEMORY
  }

  /**
   * Materialize a {@link StateStore} with the given {@link StoreType}.
   *
   * @param storeType  the type of the state store
   * @param <K>        key type of the store
   * @param <V>        value type of the store
   * @param <S>        type of the {@link StateStore}
   * @return a new {@link Materialized} instance with the given store type
   */
  public static <K, V, S extends StateStore> Materialized<K, V, S> as(StoreType storeType);

  /**
   * Set the type of the materialized {@link StateStore}.
   *
   * @param storeType  the {@link StoreType} to use
   * @return itself
   */
  public Materialized<K, V, S> withStoreType(StoreType storeType);
}
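
The two methods above could be used roughly as follows. This is a minimal, self-contained sketch that does not depend on Kafka: `MaterializedSketch` is a hypothetical stand-in that mirrors only the static `as` factory and the `withStoreType` setter, and the `storeType()` accessor exists purely for illustration.

```java
// Hypothetical stand-in for Materialized; this is NOT the Kafka class.
final class MaterializedSketch {

    // Mirrors the store-type enum proposed above.
    enum StoreType { ROCKS_DB, IN_MEMORY }

    private StoreType storeType;

    private MaterializedSketch(final StoreType storeType) {
        this.storeType = storeType;
    }

    // Mirrors the proposed static factory: start from a store type.
    static MaterializedSketch as(final StoreType storeType) {
        return new MaterializedSketch(storeType);
    }

    // Mirrors the proposed setter: updates the type and returns the same instance.
    MaterializedSketch withStoreType(final StoreType storeType) {
        this.storeType = storeType;
        return this;
    }

    // Illustration-only accessor (an assumption, not part of the proposed API).
    StoreType storeType() {
        return storeType;
    }

    public static void main(final String[] args) {
        final MaterializedSketch m = MaterializedSketch.as(StoreType.IN_MEMORY);
        System.out.println(m.storeType());
        // The setter returns the same instance, so calls can be chained.
        System.out.println(m.withStoreType(StoreType.ROCKS_DB).storeType());
    }
}
```

Because the setter returns the instance itself, it can be chained with other fluent calls on the same object.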

We'll also add three static helper APIs to Stores, so that the implementation can obtain a store supplier by store type.

Code Block
languagejava
    /**
     * @deprecated Since 3.2.0 Use {@link #sessionStoreSupplier(StoreType, String, long)} instead.
     */
    public static SessionBytesStoreSupplier inMemorySessionStore(final String name, final Duration retentionPeriod) {}

    /**
     * @deprecated Since 3.2.0 Use {@link #sessionStoreSupplier(StoreType, String, long)} instead.
     */ 
    public static SessionBytesStoreSupplier persistentSessionStore(final String name,
                                                                   final Duration retentionPeriod) {}
    /**
     * Create a {@link SessionBytesStoreSupplier} by the provided {@code storeType}.
     *
     * @param storeType         {@link org.apache.kafka.streams.kstream.Materialized.StoreType} of the store
     * @param storeName         name of the store (cannot be {@code null})
     * @param retentionPeriod   length of time to retain data in the store (cannot be negative)
     *                          (note that the retention period must be at least long enough to
     *                          contain the inactivity gap of the session and the entire grace period)
     * @return an instance of a {@link SessionBytesStoreSupplier}
     */
    public static SessionBytesStoreSupplier sessionStoreSupplier(final StoreType storeType,
                                                                 final String storeName,
                                                                 final long retentionPeriod) {}



    /**
     * @deprecated Since 3.2.0 Use {@link #windowStoreSupplier(StoreType, String, Duration, Duration, boolean, boolean)} instead.
     */  
    public static WindowBytesStoreSupplier persistentTimestampedWindowStore(final String name,
                                                                            final Duration retentionPeriod,
                                                                            final Duration windowSize,
                                                                            final boolean retainDuplicates) {}

    /**
     * @deprecated Since 3.2.0 Use {@link #windowStoreSupplier(StoreType, String, Duration, Duration, boolean, boolean)} instead.
     */
    public static WindowBytesStoreSupplier persistentWindowStore(final String name,
                                                                 final Duration retentionPeriod,
                                                                 final Duration windowSize,
                                                                 final boolean retainDuplicates) {}

    /**
     * @deprecated Since 3.2.0 Use {@link #windowStoreSupplier(StoreType, String, Duration, Duration, boolean, boolean)} instead.
     */
    public static WindowBytesStoreSupplier inMemoryWindowStore(final String name,
                                                               final Duration retentionPeriod,
                                                               final Duration windowSize,
                                                               final boolean retainDuplicates) {}

    /**
     * Create a {@link WindowBytesStoreSupplier} for the provided {@code storeType}.
     *
     * @param storeType             {@link org.apache.kafka.streams.kstream.Materialized.StoreType} of the store
     * @param storeName             name of the store (cannot be {@code null})
     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
     *                              (note that the retention period must be at least long enough to contain the
     *                              windowed data's entire life cycle, from window-start through window-end,
     *                              and for the entire grace period)
     * @param windowSize            size of the windows (cannot be negative)
     * @param retainDuplicates      whether or not to retain duplicates. Turning this on will automatically disable
     *                              caching and means that null values will be ignored.
     * @param timestampedStore      whether or not to create a timestamped store. Only applies to the RocksDB store type.
     * @return an instance of {@link WindowBytesStoreSupplier}
     * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
     * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
     */
    public static WindowBytesStoreSupplier windowStoreSupplier(final Materialized.StoreType storeType,
                                                               final String storeName,
                                                               final Duration retentionPeriod,
                                                               final Duration windowSize,
                                                               final boolean retainDuplicates,
                                                               final boolean timestampedStore) {}

    /**
     * @deprecated Since 3.2.0 Use {@link #keyValueStoreSupplier(StoreType, String, boolean)} instead.
     */
    public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String name) {}

    /**
     * @deprecated Since 3.2.0 Use {@link #keyValueStoreSupplier(StoreType, String, boolean)} instead.
     */
    public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(final String name) {}

    /**
     * @deprecated Since 3.2.0 Use {@link #keyValueStoreSupplier(StoreType, String, boolean)} instead.
     */
    public static KeyValueBytesStoreSupplier inMemoryKeyValueStore(final String name) {}

    /**
     * Create a {@link KeyValueBytesStoreSupplier} for the provided {@code storeType}.
     *
     * @param storeType             {@link org.apache.kafka.streams.kstream.Materialized.StoreType} of the store
     * @param storeName             name of the store (cannot be {@code null})
     * @param timestampedStore      whether or not to create a timestamped store. Only applies to the RocksDB store type.
     * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
     * to build a key-(timestamp/value) store
     */
     */
    public static KeyValueBytesStoreSupplier keyValueStoreSupplier(final Materialized.StoreType storeType,
                                                                   final String storeName,
                                                                   final boolean timestampedStore) {}
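
One way a by-type helper such as the key-value one above could choose an implementation is a plain switch over the store type. The sketch below is self-contained and does not use Kafka: `StoreSupplierSketch` and `delegateFor` are hypothetical names, the method only returns the name of the existing factory it would delegate to, and it assumes (following the Javadoc above) that the persistent factories correspond to the RocksDB type and that `timestampedStore` is ignored for in-memory stores.

```java
// Hypothetical dispatch sketch; it does not create real Kafka store suppliers.
final class StoreSupplierSketch {

    enum StoreType { ROCKS_DB, IN_MEMORY }

    // Returns the name of the existing Stores factory a by-type helper might delegate to.
    static String delegateFor(final StoreType storeType, final boolean timestampedStore) {
        switch (storeType) {
            case IN_MEMORY:
                // Assumption: the timestamped flag has no effect for in-memory stores.
                return "inMemoryKeyValueStore";
            case ROCKS_DB:
                return timestampedStore
                    ? "persistentTimestampedKeyValueStore"
                    : "persistentKeyValueStore";
            default:
                throw new IllegalArgumentException("Unknown store type: " + storeType);
        }
    }

    public static void main(final String[] args) {
        System.out.println(delegateFor(StoreType.IN_MEMORY, true));
        System.out.println(delegateFor(StoreType.ROCKS_DB, true));
        System.out.println(delegateFor(StoreType.ROCKS_DB, false));
    }
}
```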


To use the new configuration, users will need to call the new API proposed in KAFKA-13281 to provide a topology-config when constructing the StreamsBuilder. The internal implementation of topology-config is already in place; in this KIP, we'll publish the API that allows passing the topology-config into StreamsBuilder.
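
As a rough illustration of the first half of that flow, the sketch below only builds the configuration. It is self-contained and does not touch Kafka classes. It assumes the config key is `default.dsl.store` with the value `in_memory` (neither name is stated in this section), and `DefaultStoreConfigSketch` and `buildProps` are hypothetical names. Handing the properties to the builder through a topology-config is only outlined in a comment.

```java
import java.util.Properties;

// Hypothetical helper; it only prepares the properties and does not call Kafka.
final class DefaultStoreConfigSketch {

    // Assumed config key and value; check them against the released StreamsConfig.
    static final String DEFAULT_DSL_STORE_KEY = "default.dsl.store";
    static final String IN_MEMORY_VALUE = "in_memory";

    static Properties buildProps() {
        final Properties props = new Properties();
        props.put(DEFAULT_DSL_STORE_KEY, IN_MEMORY_VALUE);
        return props;
    }

    public static void main(final String[] args) {
        final Properties props = buildProps();
        System.out.println(props.getProperty(DEFAULT_DSL_STORE_KEY));
        // Next step (not executed here, and an assumption about the final API):
        // wrap these properties in a topology-config and pass that to the
        // StreamsBuilder constructor, as proposed in KAFKA-13281.
    }
}
```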

...