...
Code Block | ||||
---|---|---|---|---|
| ||||
public interface DslStoreSuppliers { KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params); WindowBytesStoreSupplier windowStore(final DslWindowParams params); SessionBytesStoreSupplier sessionStore(final DslSessionParams params); } /* the below are all "struct"-like classes with the following fields */ class DslKeyValueParams(String name); class DslWindowParams(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates, EmitStrategy emitStrategy); class DslSessionParams(String name, Duration retentionPeriod, EmitStrategy emitStrategy); |
...
We will provide default implementations of the above interface:
- org.apache.kafka.streams.RocksDbDslStoreSuppliers
- org.apache.kafka.streams.InMemoryDslStoreSuppliers
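To illustrate why the supplier methods receive a parameter object rather than just a store name, here is a minimal, self-contained sketch of a custom implementation that inspects those parameters. Everything in it is a local stand-in and an assumption made for illustration: `WindowParamsSketch`, `WindowSuppliersSketch` and `RetentionAwareSuppliers` are hypothetical names, the parameter class is reduced to two fields, and the method returns a descriptive string instead of a real `WindowBytesStoreSupplier`.

```java
import java.time.Duration;

// Local stand-in for the proposed window parameter class, reduced to two fields.
// NOT the Kafka Streams type.
final class WindowParamsSketch {
    final String name;
    final Duration retentionPeriod;

    WindowParamsSketch(final String name, final Duration retentionPeriod) {
        this.name = name;
        this.retentionPeriod = retentionPeriod;
    }
}

// Local stand-in for the proposed interface, reduced to the window store method.
// It returns a label rather than a real store supplier.
interface WindowSuppliersSketch {
    String windowStore(WindowParamsSketch params);
}

// Hypothetical custom implementation: short-lived windows go to an in-memory
// store, everything else goes to RocksDB.
final class RetentionAwareSuppliers implements WindowSuppliersSketch {
    private final Duration inMemoryThreshold;

    RetentionAwareSuppliers(final Duration inMemoryThreshold) {
        this.inMemoryThreshold = inMemoryThreshold;
    }

    @Override
    public String windowStore(final WindowParamsSketch params) {
        final boolean shortLived = params.retentionPeriod.compareTo(inMemoryThreshold) <= 0;
        return (shortLived ? "in-memory:" : "rocksdb:") + params.name;
    }
}
```

With a threshold of five minutes, a store with a one-minute retention period would resolve to the in-memory label and a store with a one-hour retention period to the RocksDB label.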
Configuration
These interfaces will be specified by means of a new config, defined below. This takes precedence over the old default.dsl.store config, which will be deprecated to provide a clear signal on the preferred method. Further discussion on this point is under "Rejected Alternatives":
Code Block |
---|
public static final String DSL_STORE_SUPPLIERS_CLASS_CONFIG = "dsl.store.suppliers.class"; public static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store implementations to plug in to DSL operators. Must implement the <code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface."; public static final Class DSL_STORE_SUPPLIERS_CLASS_DEFAULT = org.apache.kafka.streams.RocksDbDslStoreSuppliers.class; |
Example usage of this configuration:
Code Block |
---|
dsl.store.suppliers.class = com.my.company.MyCustomStoreSuppliers |
If dsl.store.suppliers.class is set to anything other than the default, the default.dsl.store configuration will be ignored.
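The precedence between the two configs can be sketched as follows. This is a simplified illustration and not the Kafka Streams implementation: `DslStoreConfigResolver` is a hypothetical helper, it works on class names as plain strings, and it assumes that the legacy config is consulted only when the new config is unset and that `in_memory` is the legacy value that selects the in-memory stores.

```java
import java.util.Properties;

// Hypothetical helper, NOT part of Kafka Streams: illustrates the precedence
// between the new config and the deprecated one using plain string properties.
final class DslStoreConfigResolver {
    static final String SUPPLIERS_CLASS_CONFIG = "dsl.store.suppliers.class";
    static final String LEGACY_STORE_CONFIG = "default.dsl.store";

    // Class names of the two default implementations listed in this KIP.
    static final String ROCKS_DB = "org.apache.kafka.streams.RocksDbDslStoreSuppliers";
    static final String IN_MEMORY = "org.apache.kafka.streams.InMemoryDslStoreSuppliers";

    private DslStoreConfigResolver() {}

    /** Returns the name of the suppliers class that would be chosen for the given properties. */
    static String resolve(final Properties props) {
        final String explicit = props.getProperty(SUPPLIERS_CLASS_CONFIG);
        if (explicit != null) {
            // The new config takes precedence; the legacy config is ignored.
            return explicit;
        }
        // Assumption: only when the new config is unset does the legacy value matter.
        final String legacy = props.getProperty(LEGACY_STORE_CONFIG);
        if ("in_memory".equalsIgnoreCase(legacy)) {
            return IN_MEMORY;
        }
        return ROCKS_DB;
    }
}
```

Under these assumptions, an application that sets both configs gets the class named by dsl.store.suppliers.class, regardless of what default.dsl.store says.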
Materialized API
In the existing code, users are able to override the default store type by leveraging the existing Materialized.as and Materialized.withStoreType methods. We will change the signature of these methods (see below on how we maintain compatibility) to take in a DslStoreSuppliers instead of the StoreType enum:
Code Block | ||||
---|---|---|---|---|
| ||||
public class Materialized { public static <K, V, S extends StateStore> Materialized<K, V, S> as(final DslStoreSuppliers storeSuppliers); public Materialized<K, V, S> withStoreType(final DslStoreSuppliers storeSuppliers); } |
In order to ensure code compatibility and smooth upgrade paths, we will leave the old enums in place and have them implement DslStoreSuppliers:
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.streams.kstream; public class Materialized { ... public enum StoreType implements DslStoreSuppliers { /* DefaultDslStoreSuppliers will itself be a delegate to two different implementations which will be stored in static fields ROCKS_DB and IN_MEMORY, choosing which one to delegate to depending on the default.dsl.store configuration */ ROCKS_DB(DefaultDslStoreSuppliers.ROCKS_DB), IN_MEMORY(DefaultDslStoreSuppliers.IN_MEMORY); private final DslStoreSuppliers delegate; StoreType(final DslStoreSuppliers delegate) { this.delegate = delegate; } /* delegate all methods to delegate */ } ... } |
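The delegation pattern above can be seen in isolation in the following self-contained sketch. The types are local stand-ins and not the Kafka Streams classes: `SuppliersSketch`, `DefaultSuppliersSketch` and `StoreTypeSketch` are hypothetical names, and the interface is reduced to a single method that returns a label instead of a store supplier.

```java
// Stand-in for the proposed interface, reduced to one method returning a label
// so that the sketch is self-contained. NOT the real Kafka Streams type.
interface SuppliersSketch {
    String keyValueStore(String name);
}

// Stand-in for the class holding the two built-in implementations in static fields.
final class DefaultSuppliersSketch {
    static final SuppliersSketch ROCKS_DB = name -> "rocksdb:" + name;
    static final SuppliersSketch IN_MEMORY = name -> "in-memory:" + name;

    private DefaultSuppliersSketch() {}
}

// The enum keeps its existing constants but implements the interface by
// delegation, so a constant can be passed wherever the interface is expected.
enum StoreTypeSketch implements SuppliersSketch {
    ROCKS_DB(DefaultSuppliersSketch.ROCKS_DB),
    IN_MEMORY(DefaultSuppliersSketch.IN_MEMORY);

    private final SuppliersSketch delegate;

    StoreTypeSketch(final SuppliersSketch delegate) {
        this.delegate = delegate;
    }

    @Override
    public String keyValueStore(final String name) {
        return delegate.keyValueStore(name);
    }
}
```

Because the enum is itself an implementation of the interface, existing call sites that pass an enum constant keep compiling against a method that now accepts the interface type.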
StreamJoined API
Finally, to address the three problems with configuring state stores for stream-stream joins today, we propose one last new method that will operate similarly to the new Materialized APIs but for StreamJoined:
Code Block | ||||
---|---|---|---|---|
| ||||
public class StreamJoined { /** * Creates a StreamJoined instance with the given {@link DslStoreSuppliers}. The store plugin * will be used to get all the state stores in this operation that do not otherwise have an * explicitly configured {@link org.apache.kafka.streams.state.DslStoreSuppliers}. * * @param storeSuppliers the store plugin that will be used for state stores * @param <K> the key type * @param <V1> this value type * @param <V2> other value type * @return {@link StreamJoined} instance */ public static <K, V1, V2> StreamJoined<K, V1, V2> with(final DslStoreSuppliers storeSuppliers); /** * Configure with the provided {@link DslStoreSuppliers} for all state stores that are not * configured with a {@link org.apache.kafka.streams.state.DslStoreSuppliers} already. * * @param storeSuppliers the store plugin to use for plugging in state stores * @return a new {@link StreamJoined} configured with this store plugin */ public StreamJoined<K, V1, V2> withStoreSuppliers(final DslStoreSuppliers storeSuppliers); } |
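The fallback behavior described in the Javadoc above (an explicitly configured store wins, otherwise the operator-level suppliers are used) might be pictured as below. This is a sketch under stated assumptions and not the proposed implementation: `StreamJoinedSketch` is a hypothetical class, stores and suppliers are represented by plain strings, only one side of the join is modeled, and falling back to a configured default as the last step is an assumption.

```java
// Hypothetical, simplified model of the resolution order for one join store.
// Stores and suppliers are plain strings; NOT the Kafka Streams StreamJoined class.
final class StreamJoinedSketch {
    private final String explicitThisStore; // null when no store was set explicitly
    private final String storeSuppliers;    // null when no suppliers were set on the operator

    private StreamJoinedSketch(final String explicitThisStore, final String storeSuppliers) {
        this.explicitThisStore = explicitThisStore;
        this.storeSuppliers = storeSuppliers;
    }

    static StreamJoinedSketch with(final String storeSuppliers) {
        return new StreamJoinedSketch(null, storeSuppliers);
    }

    // Returns a new instance, leaving this one unchanged.
    StreamJoinedSketch withThisStoreSupplier(final String supplier) {
        return new StreamJoinedSketch(supplier, storeSuppliers);
    }

    // Returns a new instance, leaving this one unchanged.
    StreamJoinedSketch withStoreSuppliers(final String suppliers) {
        return new StreamJoinedSketch(explicitThisStore, suppliers);
    }

    // Resolution order: explicit store, then operator-level suppliers,
    // then (assumption) the default taken from the configuration.
    String resolveThisStore(final String configuredDefault) {
        if (explicitThisStore != null) {
            return explicitThisStore;
        }
        if (storeSuppliers != null) {
            return storeSuppliers;
        }
        return configuredDefault;
    }
}
```

In this model, calling `withThisStoreSupplier` on an instance created through `with` overrides the suppliers for that one store only; any store left unconfigured still comes from the operator-level suppliers.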
Proposed Changes
There will be no functional changes as part of this KIP. Implementations for all of the above interfaces will be provided. Note the following discussion points:
- Terminology: "Store Type" vs "Store Implementation". A store "type" is a top level DSL store type - there are currently only three: KV, Windowed and Session. A "store implementation" is the chosen implementation of the specified store type. Kafka Streams provides RocksDB and in-memory store implementations OOTB.
- Versioned Tables, like timestamped tables, are an implementation detail of the chosen plugin when providing a store type (usually KV stores). In the future, the DefaultDslStoreSuppliers is free to switch its default implementation of the KV store to return a versioned implementation instead. This KIP does not propose promoting Versioned tables to the top level API here.
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
Existing users will see deprecation warnings if they are using the old default.dsl.store configuration in their code. When 4.0 is released, this configuration will no longer be respected.
- If we are changing behavior how will we phase out the older behavior?
N/A
- If we need special migration tools, describe them here.
N/A
- When will we remove the existing behavior?
...
4.0
Test Plan
Existing test coverage for the default.dsl.store configuration will be migrated to use the new configuration and ensure that the default behavior can be changed.
Rejected Alternatives
Deprecate Existing Config
We can deprecate the existing config entirely, and instead provide two out of the box implementations of dsl.store.plugin.class:
- RocksDbStorePlugin
- InMemoryStorePlugin
This makes it a little easier for users to reason about, as there is only a single config. It would also make it a bit easier to add new store implementations going forward without cluttering the DefaultStorePlugin class. The downside is that it is more cumbersome to configure (an enum is easy to remember and clear in the configs, as opposed to a class). The other downside is that there will be a deprecation period where both are active, and if users ignore those deprecations they will have compile time errors when we remove the original config.
...
Support Both Configs
Don't deprecate the old default.dsl.store config and instead maintain it alongside the new config. The advantage here is obviously that we minimize disruption to anyone using this config already. However, I believe the existing userbase is likely to be quite small, as in its current form this feature is only useful to those who (a) have so many state stores that overriding them separately is cumbersome, and (b) also wish to override all (or most) of their stores to be in-memory specifically. This seems to be a relatively uncommon pattern already, not least because it's a recipe for OOM disaster unless their applications are extremely small and/or low traffic. In the absence of any evidence that this config is already in use despite such a niche application, it seems best to take this opportunity to deprecate it now, before it gains any more traction, and have everyone converge on the same, single configuration going forward. Whenever there are two configs affecting the same underlying feature, it's inevitable there will be confusion over their interaction/hierarchy, which can often lead to misconfiguration. Deprecating the old config is a clear sign to users which of the two should be preferred and will take precedence.
Add CUSTOM Enum to Existing Config
Instead of positioning the new config as a full replacement for default.dsl.store, we could instead introduce it as a complementary config for custom stores specifically. For example, add a 3rd StoreType to the enum definition, such as CUSTOM. If (and only if) the CUSTOM StoreType is used, the value of the new default.dsl.store.type.spec config will be looked at and used to obtain the actual StoreSupplier instances of this custom type. This option has the same advantages as the above, though I would qualify it in the same way. In general, having two configs that do the same thing or are tied to the same feature will be confusing to new (or old!) users. However, between these two alternatives I would personally advocate for this one as the better option, as it at least solves the concern over potential misconfiguration due to unclear interaction between the two configs.
Deprecating StoreType Enum, Materialized.as(StoreType) and Materialized.withStoreType(StoreType) APIs
...
Code Block |
---|
public class Materialized { /* new methods */ public static <K, V, S extends StateStore> Materialized<K, V, S> as(final DslStoreSuppliers storeSuppliers); public Materialized<K, V, S> withStoreSuppliers(final DslStoreSuppliers storeSuppliers); /* deprecate old methods */ @Deprecated public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StoreType storeType); @Deprecated public Materialized<K, V, S> withStoreType(final StoreType storeType); } |
...