Status

Current state: "Under Discussion"

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka Streams supports persistent RocksDB stores as well as in-memory stores out of the box. By default, all DSL operators use persistent RocksDB stores. Currently, it is only possible to replace RocksDB stores with in-memory stores on a per-operator basis, which is tedious if all stores should be switched (for example, in a Kubernetes deployment without local disks). Furthermore, the current store interface via Materialized is very generic: it accepts any custom state store implementation, with the side effect that the user must name the store, which also makes it queryable via "Interactive Queries".

We propose to simplify switching the store type via a new Kafka Streams config that sets the default built-in or custom store implementation for the whole program, together with an improved API for Materialized.

Public Interfaces

We propose to add a new configuration parameter default.store.impl.class that defines the default store implementation class used by the DSL. It defaults to RocksDBPersistentStoreImplementation.class.
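
As a rough illustration of how the proposed parameter might be set, the sketch below builds a plain java.util.Properties object. It assumes the key stays default.store.impl.class as proposed here. The value uses the simple class name InMemoryStoreImplementation as a placeholder because this proposal does not state its package, and the application id and bootstrap server are hypothetical values. No Kafka classes are used, so this only shows the shape of the configuration, not the final API.

```java
import java.util.Properties;

public class DefaultStoreConfigSketch {
    // Config key as proposed in this KIP (assumption: the name is not final).
    static final String DEFAULT_STORE_IMPL_CLASS_CONFIG = "default.store.impl.class";

    // Placeholder value: the proposal names the class but not its package.
    static final String IN_MEMORY_IMPL = "InMemoryStoreImplementation";

    // Builds a config that would switch every DSL store to the in-memory implementation.
    static Properties inMemoryStreamsProps() {
        final Properties props = new Properties();
        props.put("application.id", "store-impl-demo");   // hypothetical application id
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        props.put(DEFAULT_STORE_IMPL_CLASS_CONFIG, IN_MEMORY_IMPL);
        return props;
    }

    public static void main(final String[] args) {
        final Properties props = inMemoryStreamsProps();
        System.out.println(DEFAULT_STORE_IMPL_CLASS_CONFIG + " = "
                + props.getProperty(DEFAULT_STORE_IMPL_CLASS_CONFIG));
    }
}
```

Leaving the key unset would keep the RocksDB default described above, so existing applications would not need to change their configuration.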

...

Code Block
language: java
public class StreamsBuilder {
    public StreamsBuilder() {}

    /**
     * Create a {@code StreamsBuilder} instance.
     *
     * @param topologyConfigs the streams configs that apply at the topology level. Please refer to {@link TopologyConfig} for more detail
     */
    public StreamsBuilder(final TopologyConfig topologyConfigs) {} // newly added
}


Proposed Changes

To create a topology with a non-default store implementation, users can declare it via new StreamsBuilder(topologyConfig) like this:

...

Code Block
KStream stream = ...

// current code to change from RocksDB to in-memory
stream.groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("some-name")));

// new code
stream.groupByKey().count(Materialized.as(new InMemoryStoreImplementation()));

Compatibility, Deprecation, and Migration Plan

Because the default value of the config is "RocksDBPersistentStoreImplementation", there is no behavior change and thus there are no backward compatibility concerns.

Test Plan

Regular unit and integration testing is sufficient.

Rejected Alternatives

1. Use an enum to set the default store type

...