Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleStores.java
package org.apache.kafka.streams.kstream;

public class Materialized {
	...
	public enum StoreType implements DslStorePlugin {
		// DefaultDslStorePlugin will itself be a delegate to two different implementations
		// which will be stored in static fields ROCKS_DB and IN_MEMORY, choosing which one
		// to delegate to depending on the default.dsl.store configuration
    	ROCKS_DB(DefaultDslStorePlugin.ROCKS_DB),
    	IN_MEMORY(DefaultDslStorePlugin.IN_MEMORY);
	
		private final DslStorePlugin delegateSpecdelegatePlugin;

		StoreTypeSpec(final StoreTypeSpecDslStorePlugin delegateSpecdelegate) {
			this.delegateSpecdelegatePlugin = delegateSpecdelegate;
		}

		// delegate all methods to delegateSpecdelegatePlugin
	}
	...
}

StreamJoined API

Finally, to address the three problems with configuring state stores for stream-stream joins today, we propose one last new method that will operate similarly to the new Materialized APIs but for StreamJoined:

Code Block
languagejava
titleStreamJoined.java
public class StreamJoined {

    /**
     * Creates a StreamJoined instance with the given {@link DslStorePlugin}. The store plugin
     * will be used to get all the state stores in this operation that do not otherwise have an
     * explicitly configured {@link org.apache.kafka.streams.state.DslStorePlugin}.
     *
     * @param storePlugin         the store plugin that will be used for state stores
     * @param <K>                 the key type
     * @param <V1>                this value type
     * @param <V2>                other value type
     * @return                    {@link StreamJoined} instance
     */
    public static <K, V1, V2> StreamJoined<K, V1, V2> with(final DslStorePlugin storePlugin);
    
    /**
     * Configure with the provided {@link DslStorePlugin} for all state stores that are not
     * configured with a {@link org.apache.kafka.streams.state.DslStorePlugin} already.
     * 
     * @param storePlugin  the store plugin to use for plugging in state stores
     * @return             a new {@link StreamJoined} configured with this store plugin
     */
    public StreamJoined<K, V1, V2> withStoreTypeSpecwithStorePlugin(final DslStorePlugin storePlugin);
}  

...

  • What impact (if any) will there be on existing users?

Existing users will see deprecation warnings if they are using the old default.dsl.store configuration in their code. When 4.0 is released this configuration will no longer be respected.N/A

  • If we are changing behavior how will we phase out the older behavior?

...

Existing test coverage for the default.dsl.store configuration will be migrated to use the new configuration and ensure that the default behavior can be changed.

Rejected Alternatives

Deprecate Existing Config

We can deprecate the existing config entirely, and instead provide two out of the box implementations of dsl.store.plugin.class:

  • RocksDbStorePlugin
  • InMemoryStorePlugin

This makes it a little easier for users to reason about as there is only a single config. It also would make it a bit easier to aadd new store implementations going forward without cluttering the DefaultStorePlugin class. The downside is that it is more cumbersome to configure (an enum is easy to remember and clear in the configs - as opposed to a class).The main alternative to consider here is how to handle the original default.dsl.store config. As noted it's impossible to expand this config as-is without either changing how it's defined or introducing a new config. But there are a few possible approaches with their own merits and flaws:

Add CUSTOM Enum to Existing Config

Instead of positioning the new config as a full replacement for default.dsl.store, we could instead introduce it as a complementary config for custom stores specifically. For example, add a 3rd StoreType to the enum definition, such as CUSTOM. If (and only if) the CUSTOM StoreType is used, the value of the new In the proposal above, the dsl.store.plugin.class is the first to be resolved, and then only if DefaultDslStorePlugin is used the old default.dsl.store .type.spec config will be looked at and used to obtain the actual StoreSupplier instances of this custom typeconfig is used. We could flip the semantics, and require users to specify CUSTOM as the default.dsl.store and only then check the new configuration. This option is slightly more cumbersome over the proposed design, and doesn't have any clear advantages.

...