You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 2 Next »

Status

Current state"Under Discussion"

Discussion thread: [DISCUSS] KIP-591: Add Kafka Streams config to set default store type

JIRA: Unable to render Jira issues macro, execution error.

Released: 2.6 (target)

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka Streams supports persistent RocksDB stores as well as in-memory stores out of the box. By default all DSL operators use persistent RocksDB stores. Currently, it is only possible to switch out RocksDB stores with in-memory store on a per operator basis what is tedious if all stores should be switched (for example in a Kubernetes deployment without local disks). Furthermore, the current store interface via Materialized is very generic as it allows to pass in any custom state store implementation with the side effect that a user needs to name the store and thus also makes it queryable via "Interactive Queries".

We propose to simplify switching the store type via a new config to Kafka Streams that allows to set the default store type for the whole program as well as an improved API for Materialized.

Public Interfaces

We propose to add a new configuration parameter default.dsl.store.type that defines the default store type used by the DSL accepting two values "rocksDB"  (default) and "in_memory" .

public class StreamsConfig {
    public static final String DEFAULT_DSL_STORE_TYPE_CONFIG = "default.dsl.store.type";
    private static final String DEFAULT_DSL_STORE_TYPE_DOC = "The default store type used by DSL operators.";

    public static final ROCKS_DB = "rocksDB";
    public static final IN_MEMORY = "in_memory";
    
    .define(DEFAULT_DSL_STORE_TYPE_CONFIG,
            Type.STRING,
            ROCKS_DB,
            in(ROCKS_DB, IN_MEMORY),
            Importance.LOW,
            DEFAULT_DSL_STORE_TYPE_DOC)
}

In addition, we propose to extend Materialized config object with a corresponding option to specify the store type:

public class Materialized<K, V, S extends StateStore> {

  public enum StoreType {
    ROCKS_DB,
    IN_MEMORY
  }

  public static <K, V, S extends StateStore> Materialized<K, V, S> as(StoryType storeType);

  public Materialized<K, V, S> withStoreType(StoryType storeType);
}

Finally, we propose to deprecate the StreamsBuilder#build() method.

public class StreamsBuilder {
  @Deprecate
  public synchronized Topology build();
}

Proposed Changes

In order to leverage the new configuration, users will need to call StreamsBuilder#build(Properties) instead of StreamsBuilder#build(). This is an already established pattern that users need to follow to enable topology optimization. As we predict that we might need to access the config in the DSL more often in the future, we also propose to deprecate StreamsBuilder#build() method to lift the mental burden for users to know which of both methods that should use.

All stateful operators (like aggregate()reduce(),  count(), or join(), will use either the RocksDB or in-memory store depending on the configuration. Of course, if the store is overwritten for an individual operator via Materialized the operator overwrite will be used. The improved Materialized interface allows to switch the store type more easily without the need to specify a store name.

KStream stream = ...

// current code to change from RocksDB to in-memory
stream.groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("some-name"));

// new code
stream.groupByKey().count(Materialized.as(StoreType.IN_MEMORY));

Compatibility, Deprecation, and Migration Plan

Because the default value of the config is "rocksDB"  there is no behavior change and thus there are not backward compatibility concerns. The deprecates method StreamsBuilder#build() can be removed in the next major release 3.0.0.

Test Plan

Regular unit and integration testing is sufficient.

Rejected Alternatives

None.

  • No labels