Versions Compared

...

JIRA: KAFKA-9847

Released: 2.6 (target)

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

We propose to simplify switching the store type via a new Kafka Streams config that sets the default built-in or custom store implementation for the whole program, as well as an improved API for Materialized.

Public Interfaces

We propose to add a new configuration parameter, default.store.impl.class, that defines the default store implementation class used by the DSL. It defaults to RocksDBPersistentStoreImplementation.class.

Code Block
public class StreamsConfig extends AbstractConfig {
    public static final String DEFAULT_STORE_IMPLEMENTATION_CLASS_CONFIG = "default.store.impl.class";
    private static final String DEFAULT_STORE_IMPLEMENTATION_CLASS_DOC =
        "Store supplier implementation class to use. Defaults to RocksDBPersistentStoreImplementation. It can be overridden for individual operators via Materialized.";

    static {
        CONFIG = new ConfigDef()
            // ... existing config definitions ...
            .define(DEFAULT_STORE_IMPLEMENTATION_CLASS_CONFIG,
                    Type.CLASS,
                    RocksDBPersistentStoreImplementation.class.getName(),
                    Importance.MEDIUM,
                    DEFAULT_STORE_IMPLEMENTATION_CLASS_DOC);
    }
}
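Because the config is declared as Type.CLASS, its value is a class name that has to be turned into an instance at runtime. The following is a minimal, dependency-free sketch of that lookup, assuming the configured class has a public no-arg constructor and falling back to the proposed default when the key is absent. StoreImplementation, RocksDBPersistentStoreImplementation and InMemoryStoreImplementation here are simplified stand-ins (they return a label instead of store suppliers), and DefaultStoreResolver is a hypothetical helper, not part of the proposal.

```java
import java.util.Properties;

// Simplified stand-in for the proposed StoreImplementation interface.
// The real interface returns store suppliers; this one only reports a label.
interface StoreImplementation {
    String label();
}

// Stand-in for the proposed default, RocksDBPersistentStoreImplementation.
class RocksDBPersistentStoreImplementation implements StoreImplementation {
    @Override
    public String label() {
        return "rocksdb-persistent";
    }
}

// Stand-in for the proposed InMemoryStoreImplementation.
class InMemoryStoreImplementation implements StoreImplementation {
    @Override
    public String label() {
        return "in-memory";
    }
}

// Hypothetical helper that turns the configured class name into an instance.
final class DefaultStoreResolver {
    static final String DEFAULT_STORE_IMPLEMENTATION_CLASS_CONFIG = "default.store.impl.class";

    private DefaultStoreResolver() { }

    static StoreImplementation resolve(final Properties props) {
        // When the key is absent, fall back to the proposed default (current behavior).
        final String className = props.getProperty(
            DEFAULT_STORE_IMPLEMENTATION_CLASS_CONFIG,
            RocksDBPersistentStoreImplementation.class.getName());
        try {
            // Load the class, require that it implements StoreImplementation,
            // and instantiate it through its no-arg constructor.
            return Class.forName(className)
                .asSubclass(StoreImplementation.class)
                .getDeclaredConstructor()
                .newInstance();
        } catch (final ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException(
                "Invalid value for " + DEFAULT_STORE_IMPLEMENTATION_CLASS_CONFIG + ": " + className, e);
        }
    }
}
```

In Kafka itself, AbstractConfig#getConfiguredInstance(String, Class) already performs this class-name-to-instance step for Type.CLASS configs, so a real implementation would likely reuse it rather than hand-rolled reflection.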


The StoreImplementation interface:

Code Block
languagejava
import java.time.Duration;

import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

/**
 * A state store supplier implementation interface for all types of {@link StateStore}.
 */
public interface StoreImplementation {
    /**
     * Create a {@link KeyValueBytesStoreSupplier}.
     *
     * @param name  name of the store (cannot be {@code null})
     * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
     *         to build a key-value store
     */
    KeyValueBytesStoreSupplier keyValueSupplier(String name);

    /**
     * Create a {@link WindowBytesStoreSupplier}.
     *
     * @param name                  name of the store (cannot be {@code null})
     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
     *                              (note that the retention period must be at least long enough to contain the
     *                              windowed data's entire life cycle, from window-start through window-end,
     *                              and for the entire grace period)
     * @param windowSize            size of the windows (cannot be negative)
     * @param retainDuplicates      whether or not to retain duplicates. Turning this on will automatically disable
     *                              caching and means that null values will be ignored.
     * @return an instance of {@link WindowBytesStoreSupplier}
     * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
     * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
     */
    WindowBytesStoreSupplier windowBytesStoreSupplier(String name,
                                                      Duration retentionPeriod,
                                                      Duration windowSize,
                                                      boolean retainDuplicates);

    /**
     * Create a {@link SessionBytesStoreSupplier}.
     *
     * @param name              name of the store (cannot be {@code null})
     * @param retentionPeriod   length of time to retain data in the store (cannot be negative)
     *                          (note that the retention period must be at least long enough to
     *                          contain the inactivity gap of the session and the entire grace period.)
     * @return an instance of a {@link SessionBytesStoreSupplier}
     */
    SessionBytesStoreSupplier sessionBytesStoreSupplier(String name,
                                                        Duration retentionPeriod);
}
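The @throws clauses on windowBytesStoreSupplier describe argument checks an implementation is expected to enforce; the built-in implementations get them from the Stores factories they delegate to. Below is a minimal, dependency-free sketch of those checks using only java.time.Duration, useful when writing a custom implementation. The class WindowStoreArgs and its method names are hypothetical, not part of the proposal.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical helper mirroring the documented preconditions of
// StoreImplementation#windowBytesStoreSupplier(name, retentionPeriod, windowSize, retainDuplicates).
final class WindowStoreArgs {

    private WindowStoreArgs() { }

    // Converts a Duration to milliseconds, rejecting null, negative, and unrepresentable values.
    static long toValidMillis(final Duration duration, final String label) {
        Objects.requireNonNull(duration, label + " cannot be null");
        final long millis;
        try {
            millis = duration.toMillis(); // throws ArithmeticException if the value overflows a long
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(label + " can't be represented as long milliseconds", e);
        }
        if (millis < 0) {
            throw new IllegalArgumentException(label + " cannot be negative");
        }
        return millis;
    }

    // Applies all documented checks and returns {retentionMs, windowSizeMs}.
    static long[] validate(final String name, final Duration retentionPeriod, final Duration windowSize) {
        Objects.requireNonNull(name, "name cannot be null");
        final long retentionMs = toValidMillis(retentionPeriod, "retentionPeriod");
        final long windowSizeMs = toValidMillis(windowSize, "windowSize");
        if (retentionMs < windowSizeMs) {
            throw new IllegalArgumentException("retentionPeriod is smaller than windowSize");
        }
        return new long[] {retentionMs, windowSizeMs};
    }
}
```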


For StoreImplementation, Kafka provides three built-in implementations:

RocksDBPersistentStoreImplementation.java (default)

Code Block
languagejava
import java.time.Duration;

import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

/**
 * A RocksDB persistent state store supplier implementation.
 */
public class RocksDBPersistentStoreImplementation implements StoreImplementation {
    @Override
    public KeyValueBytesStoreSupplier keyValueSupplier(final String name) {
        return Stores.persistentTimestampedKeyValueStore(name);
    }

    @Override
    public WindowBytesStoreSupplier windowBytesStoreSupplier(final String name,
                                                             final Duration retentionPeriod,
                                                             final Duration windowSize,
                                                             final boolean retainDuplicates) {
        return Stores.persistentTimestampedWindowStore(name, retentionPeriod, windowSize, retainDuplicates);
    }

    @Override
    public SessionBytesStoreSupplier sessionBytesStoreSupplier(final String name, final Duration retentionPeriod) {
        return Stores.persistentSessionStore(name, retentionPeriod);
    }
}


RocksDBStoreImplementation.java

Code Block
languagejava
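import java.time.Duration;

import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

/**
 * A RocksDB state store supplier implementation.
 */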
public class RocksDBStoreImplementation implements StoreImplementation {
    @Override
    public KeyValueBytesStoreSupplier keyValueSupplier(final String name) {
        return Stores.persistentKeyValueStore(name);
    }

    @Override
    public WindowBytesStoreSupplier windowBytesStoreSupplier(final String name,
                                                             final Duration retentionPeriod,
                                                             final Duration windowSize,
                                                             final boolean retainDuplicates) {
        return Stores.persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates);
    }

    @Override
    public SessionBytesStoreSupplier sessionBytesStoreSupplier(final String name, final Duration retentionPeriod) {
        return Stores.persistentSessionStore(name, retentionPeriod);
    }
}


InMemoryStoreImplementation.java

Code Block
languagejava
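import java.time.Duration;

import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

/**
 * An in-memory state store supplier implementation.
 */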
public class InMemoryStoreImplementation implements StoreImplementation {
    @Override
    public KeyValueBytesStoreSupplier keyValueSupplier(final String name) {
        return Stores.inMemoryKeyValueStore(name);
    }

    @Override
    public WindowBytesStoreSupplier windowBytesStoreSupplier(final String name,
                                                             final Duration retentionPeriod,
                                                             final Duration windowSize,
                                                             final boolean retainDuplicates) {
        return Stores.inMemoryWindowStore(name, retentionPeriod, windowSize, retainDuplicates);
    }

    @Override
    public SessionBytesStoreSupplier sessionBytesStoreSupplier(final String name, final Duration retentionPeriod) {
        return Stores.inMemorySessionStore(name, retentionPeriod);
    }
}
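Since the config accepts any class that implements StoreImplementation, users are not limited to the three built-ins. The sketch below shows a hypothetical HybridStoreImplementation that keeps key-value stores in memory while window and session stores stay on RocksDB. To stay dependency-free it uses stand-in types: SupplierSketch is a hypothetical record of which Stores factory would be called, and the comments show the Stores call that real code would return instead, as in the built-in classes.

```java
import java.time.Duration;

// Stand-in for a store supplier: records which Stores factory would be used and for which store.
final class SupplierSketch {
    final String factory;
    final String storeName;

    SupplierSketch(final String factory, final String storeName) {
        this.factory = factory;
        this.storeName = storeName;
    }
}

// Simplified stand-in for the proposed StoreImplementation interface.
interface StoreImplementation {
    SupplierSketch keyValueSupplier(String name);

    SupplierSketch windowBytesStoreSupplier(String name, Duration retentionPeriod,
                                            Duration windowSize, boolean retainDuplicates);

    SupplierSketch sessionBytesStoreSupplier(String name, Duration retentionPeriod);
}

// Hypothetical custom implementation: in-memory key-value stores, persistent window/session stores.
class HybridStoreImplementation implements StoreImplementation {
    @Override
    public SupplierSketch keyValueSupplier(final String name) {
        // Real code: return Stores.inMemoryKeyValueStore(name);
        return new SupplierSketch("inMemoryKeyValueStore", name);
    }

    @Override
    public SupplierSketch windowBytesStoreSupplier(final String name, final Duration retentionPeriod,
                                                   final Duration windowSize, final boolean retainDuplicates) {
        // Real code: return Stores.persistentTimestampedWindowStore(name, retentionPeriod, windowSize, retainDuplicates);
        return new SupplierSketch("persistentTimestampedWindowStore", name);
    }

    @Override
    public SupplierSketch sessionBytesStoreSupplier(final String name, final Duration retentionPeriod) {
        // Real code: return Stores.persistentSessionStore(name, retentionPeriod);
        return new SupplierSketch("persistentSessionStore", name);
    }
}
```

Such a class would then be selected like the built-ins, by setting default.store.impl.class to its fully qualified class name.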


In the Materialized class, we propose one more API that allows users to provide a StoreImplementation dynamically.

Code Block
languagejava
    /**
     * Materialize a {@link StateStore} with the store implementation.
     *
     * @param storeImplementation  store implementation used to materialize the store
     * @param <K>       key type of the store
     * @param <V>       value type of the store
     * @param <S>       type of the {@link StateStore}
     * @return a new {@link Materialized} instance with the given storeImplementation
     */
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StoreImplementation storeImplementation);


Proposed Changes

To leverage the new configuration, users need to call the new API proposed in KAFKA-13281 to provide a TopologyConfig when constructing the StreamsBuilder.

Code Block
languagejava
Properties props = new Properties();
...
props.put(StreamsConfig.DEFAULT_STORE_IMPLEMENTATION_CLASS_CONFIG, InMemoryStoreImplementation.class.getName());
StreamsConfig streamsConfig = new StreamsConfig(props);

StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(streamsConfig));
Topology topology = builder.build();


If the config is not set, it defaults to RocksDBPersistentStoreImplementation, which matches the current behavior, so this design is backward compatible.


All stateful operators (like aggregate(), reduce(), count(), or join()) will use the store implementation set in the "default.store.impl.class" configuration, for example the built-in RocksDB or in-memory implementations. If the store is overridden for an individual operator via Materialized, the operator-level override is used. The improved Materialized interface makes it easier to switch the store type without specifying a store name.

Code Block
KStream<String, String> stream = ...;

// current code to change from RocksDB to in-memory
stream.groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("some-name")));

// new code
stream.groupByKey().count(Materialized.as(new InMemoryStoreImplementation()));
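The precedence described above (an operator-level override via Materialized first, then the configured default) can be sketched as a small lookup. This is a hypothetical, dependency-free model: StoreImplementation is a simplified stand-in that returns a label instead of a supplier, MaterializedSketch stands in for Materialized and only carries the optional implementation, and the generatedStoreName parameter models the internal store name the DSL would supply when the user gives none.

```java
import java.util.Optional;

// Simplified stand-in for the proposed interface; the real one returns store suppliers.
interface StoreImplementation {
    String keyValueSupplier(String name);
}

class RocksDBPersistentStoreImplementation implements StoreImplementation {
    @Override
    public String keyValueSupplier(final String name) {
        return "rocksdb:" + name;
    }
}

class InMemoryStoreImplementation implements StoreImplementation {
    @Override
    public String keyValueSupplier(final String name) {
        return "in-memory:" + name;
    }
}

// Stand-in for Materialized: only models the optional operator-level StoreImplementation.
final class MaterializedSketch {
    final StoreImplementation storeImplementation; // null when the operator does not override

    private MaterializedSketch(final StoreImplementation storeImplementation) {
        this.storeImplementation = storeImplementation;
    }

    // Mirrors the proposed Materialized.as(StoreImplementation): no store name required.
    static MaterializedSketch as(final StoreImplementation storeImplementation) {
        return new MaterializedSketch(storeImplementation);
    }

    // Models an operator called without any Materialized override.
    static MaterializedSketch none() {
        return new MaterializedSketch(null);
    }
}

// Hypothetical resolution step a stateful operator could perform.
final class StoreResolution {
    private StoreResolution() { }

    static String keyValueSupplierFor(final MaterializedSketch materialized,
                                      final StoreImplementation configuredDefault,
                                      final String generatedStoreName) {
        // The operator-level override wins; otherwise use the default.store.impl.class instance.
        final StoreImplementation impl = Optional.ofNullable(materialized.storeImplementation)
                                                 .orElse(configuredDefault);
        return impl.keyValueSupplier(generatedStoreName);
    }
}
```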

Compatibility, Deprecation, and Migration Plan

Because the default value of the config is RocksDBPersistentStoreImplementation, there is no behavior change and thus there are no backward compatibility concerns.

Test Plan

Regular unit and integration testing is sufficient.

...

In addition, we propose to extend the Materialized config object with a corresponding option to specify the store type:

Code Block
public class Materialized<K, V, S extends StateStore> {

  public enum StoreType {
    ROCKS_DB,
    IN_MEMORY
  }

  public static <K, V, S extends StateStore> Materialized<K, V, S> as(StoreType storeType);

  public Materialized<K, V, S> withStoreType(StoreType storeType);
}

Finally, we propose to deprecate the StreamsBuilder#build() method.

Code Block
public class StreamsBuilder {
  @Deprecated
  public synchronized Topology build();
}



In order to leverage the new configuration, users will need to call StreamsBuilder#build(Properties) instead of StreamsBuilder#build(). This is an already established pattern that users need to follow to enable topology optimization. Because we expect the DSL to need access to the config more often in the future, we also propose to deprecate the StreamsBuilder#build() method, so that users no longer need to know which of the two methods to use.

...