Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-15215

...

Code Block
languagejava
titleDSLStoreProvider.java
public interface StoreTypeSpec {

    KeyValueBytesStoreSupplier keyValueStore(final String name);

    WindowBytesStoreSupplier windowStore(final String name, final Duration retentionPeriod, final Duration windowSize, final boolean retainDuplicates);

    SessionBytesStoreSupplier sessionStore(final String name, final Duration retentionPeriod);
}  
Info

Note on Evolving API: a concern raised on KIP-591 about having such an interface is that the increased API surface area would mean introducing new store types (beyond KVS/Window/Session) would cause custom state store implementations to throw compile-time errors. I think it is valid to have new store types have a default implementation that throws new UnsupportedOperationException(), as it is unlikely that users who specify a custom state store as the default will want a different (e.g. ROCKS_DB) store created without their knowledge.

OOTB Store Type Specs

We will provide default implementations of the above interface in the following classes:

/**
 * {@code DslStoreSuppliers} defines a grouping of factories to construct
 * stores for each of the types of state store implementations in Kafka
 * Streams. This allows configuration of a default store supplier beyond
 * the builtin defaults of RocksDB and In-Memory.
 *
 * <p>There are various ways that this configuration can be supplied to
 * the application (in order of precedence):
 * <ol>
 *     <li>Passed in directly to a DSL operator via either
 *     {@link org.apache.kafka.streams.kstream.Materialized#as(DslStoreSuppliers)},
 *     {@link org.apache.kafka.streams.kstream.Materialized#withStoreType(DslStoreSuppliers)}, or
 *     {@link org.apache.kafka.streams.

...

Configuration

These interfaces will be specified by means of a new config, defined below. This takes precedence over the old default.dsl.store config, which will be deprecated to provide a clear signal on the preferred method. Further discussion on this point is under "Rejected Alternatives":

Code Block
public static final String DEFAULT_DSL_STORE_TYPE_SPEC_CONFIG = "default.dsl.store.type.spec";
public static final String DEFAULT_DSL_STORE_TYPE_SPEC_DOC = "Default state store specification used by DSL operators. Must implement the <code>org.apache.kafka.streams.state.StoreTypeSpec</code> interface.";

@Deprecated
public static final String DEFAULT_DSL_STORE_CONFIG = "default.dsl.store";

Example usage of this configuration:

Code Block
// old configuration examples
default.dsl.store = ROCKS_DB
default.dsl.store = IN_MEMORY

// new configuration examples
default.dsl.store.type.spec = org.apache.kafka.streams.RocksDbStoreTypeSpec
default.dsl.store.type.spec = org.apache.kafka.streams.InMemoryStoreTypeSpec

// not possible before
default.dsl.store.type.spec = com.my.company.MyCustomStoreTypeSpec

Materialized API

In the existing code, users are able to override the default store type by leveraging the existing Materialized.as and Materialized.withStoreType methods. We will change the signature of these methods (see below on how we maintain compatibility) to take in a StoreTypeSpec instead of the StoreType enum:

Code Block
languagejava
titleMaterialized.java
public class Materialized {

    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StoreTypeSpec typeSpec);

    public Materialized<K, V, S> withStoreType(final StoreTypeSpec typeSpec);
}

In order to ensure code compatibility and smooth upgrade paths, we will leave the old enums in place and have them implement StoreTypeSpec:

Code Block
languagejava
titleStores.java
package org.apache.kafka.streams.kstream;

public class Materialized {
	...
	public enum StoreType implements StoreTypeSpec {
    	ROCKS_DB(new RocksDbStoreTypeSpec()),
    	IN_MEMORY(new InMemoryStoreTypeSpec());
	
		private final StoreTypeSpec delegateSpec;

		StoreType(final StoreTypeSpec delegateSpec) {
			this.delegateSpec = delegateSpec;
		}

		// delegate all methods to delegateSpec
	}
	...
}

StreamJoined API

Finally, to address the three problems with configuring state stores for stream-stream joins today, we propose one last new method that will operate similarly to the new Materialized APIs but for StreamJoined:

Code Block
languagejava
titleStreamJoined.java
public class StreamJoined {

    /**
     * Creates a StreamJoined instance with the given {@link StoreTypeSpec}. The store provider
     * will be used to get all the state stores in this operation that do not otherwise have an
     * explicitly configured {@link org.apache.kafka.streams.state.StoreSupplier}.
     *
     * @param storeTypeSpec       the store type specification that will be used for all unconfigured state stores
     * @param <K>                 the key type
     * @param <V1>                this value type
     * @param <V2>                other value type
     * @return                    {@link StreamJoined} instance
     */
    public static <K, V1, V2> StreamJoined<K, V1, V2> with(final StoreTypeSpec storeTypeSpec);
    
    /**
     * Configure with the provided {@link StoreTypeSpec} for all state stores that are not
     * configured with a {@link org.apache.kafka.streams.state.StoreSupplier} already.
     * 
     * @param storeProvider  the store provider to use for all unconfigured state stores
     * @return            a new {@link StreamJoined} configured with this store provider
     */
    public StreamJoined<K, V1, V2> withStoreProvider(final StoreTypeSpec storeProvider);
}  

Proposed Changes

...

kstream.StreamJoined#withDslStoreSuppliers(DslStoreSuppliers)}</li>
 *
 *     <li>Passed in via a Topology configuration override (configured in a
 *     {@link org.apache.kafka.streams.TopologyConfig} and passed into the
 *     {@link org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig)} constructor)</li>
 *
 *     <li>Configured as a global default in {@link org.apache.kafka.streams.StreamsConfig} using
 *     the {@link org.apache.kafka.streams.StreamsConfig#DSL_STORE_SUPPLIERS_CLASS_CONFIG}
 *     configuration.</li>
 * </ol>
 *
 * <p>Kafka Streams is packaged with some pre-existing {@code DslStoreSuppliers}
 * that exist in {@link BuiltInDslStoreSuppliers}
 */
public interface DslStoreSuppliers extends Configurable {

    KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params);

    WindowBytesStoreSupplier windowStore(final DslWindowParams params);

    SessionBytesStoreSupplier sessionStore(final DslSessionParams params);
}

// the below are all "struct"-like classes with the following fields
class DslKeyValueParams(String name);
class DslWindowParams(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates, EmitStrategy emitStrategy, boolean isSlidingWindow, boolean isTimestamped);
class DslSessionParams(String name, Duration retentionPeriod, EmitStrategy emitStrategy);


Info

Note on Evolving API: a concern raised on KIP-591 about having such an interface is that the increased API surface area would mean introducing new store implementations would cause custom state store implementations to throw compile-time errors. Introducing the *Params classes will prevent such issues unless an entirely new store type is added.

If an entirely new state store type (beyond KV/Windowed/Session) is added, I think it is valid for the new store type to have a default implementation that throws new UnsupportedOperationException(), as it is unlikely that users who specify a custom state store as the default will want a different (e.g. ROCKS_DB) store created without their knowledge. Such an addition also seems unlikely, since these three have been the only store types for many years.
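
The evolution argument above can be illustrated with a small, self-contained sketch. The types below (SketchWindowParams, SketchSuppliers, SketchCustomSuppliers) and the vectorStore method are hypothetical stand-ins invented for illustration, not Kafka Streams classes, and store suppliers are reduced to plain strings. The sketch only shows that a params object keeps the supplier method signature stable, and that a later store type added as a throwing default method leaves an existing custom implementation compiling.

```java
import java.time.Duration;

// Hypothetical stand-in for a *Params class: new fields can be added here
// without changing the signature of the supplier method that receives it.
final class SketchWindowParams {
    private final String name;
    private final Duration retentionPeriod;

    SketchWindowParams(final String name, final Duration retentionPeriod) {
        this.name = name;
        this.retentionPeriod = retentionPeriod;
    }

    String name() {
        return name;
    }

    Duration retentionPeriod() {
        return retentionPeriod;
    }
}

// Hypothetical stand-in for the supplier interface. Suppliers are modelled
// as strings so that the sketch needs no Kafka dependency.
interface SketchSuppliers {
    String windowStore(SketchWindowParams params);

    // A store type added later (invented for this sketch). The throwing
    // default keeps older implementations source-compatible.
    default String vectorStore(final String name) {
        throw new UnsupportedOperationException(
            "vectorStore is not supported by " + getClass().getSimpleName());
    }
}

// A custom implementation written before vectorStore existed.
final class SketchCustomSuppliers implements SketchSuppliers {
    @Override
    public String windowStore(final SketchWindowParams params) {
        return "custom-window:" + params.name() + ":"
            + params.retentionPeriod().toMinutes() + "m";
    }
}
```

Calling vectorStore on SketchCustomSuppliers fails loudly at runtime rather than silently creating a store of a different implementation, which is the behavior argued for above.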

OOTB Store Type Specs

We will provide default implementations of the above interface:

  • org.apache.kafka.streams.RocksDbDslStoreSuppliers
  • org.apache.kafka.streams.InMemoryDslStoreSuppliers

Configuration

These interfaces will be specified by means of a new config, defined below. This takes precedence over the old default.dsl.store config, which will be deprecated to provide a clear signal on the preferred method. Further discussion on this point is under "Rejected Alternatives":

Code Block
public static final String DSL_STORE_SUPPLIERS_CLASS_CONFIG = "dsl.store.suppliers.class";
public static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store implementations to plug in to DSL operators. Must implement the <code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface.";
public static final Class DSL_STORE_SUPPLIERS_CLASS_DEFAULT = org.apache.kafka.streams.RocksDbDslStoreSuppliers.class;

// we also add this constant for the old configuration so that it is easily referenceable
// across the code base
public static final String DEFAULT_DSL_STORE = ROCKS_DB;

Example usage of this configuration:

Code Block
dsl.store.suppliers.class = com.my.company.MyCustomStoreSuppliers
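
When the application is configured programmatically, the same setting might be supplied through a Properties object, roughly as sketched below. The config key and the custom class name are taken from this KIP; the helper class name DslStoreConfigSketch and the application.id and bootstrap.servers values are placeholders chosen for illustration.

```java
import java.util.Properties;

final class DslStoreConfigSketch {
    // Config key as defined in this KIP.
    static final String DSL_STORE_SUPPLIERS_CLASS_CONFIG = "dsl.store.suppliers.class";

    // Builds the properties that would later be handed to the Streams application.
    static Properties customStoreProps() {
        final Properties props = new Properties();
        props.put("application.id", "my-app");            // placeholder value
        props.put("bootstrap.servers", "localhost:9092"); // placeholder value
        props.put(DSL_STORE_SUPPLIERS_CLASS_CONFIG, "com.my.company.MyCustomStoreSuppliers");
        return props;
    }
}
```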


Info
titleBehavior Change for default.dsl.store

The new dsl.store.suppliers.class configuration will be respected when passed in via KafkaStreams#new(Topology, StreamsConfig) (and other related constructors), rather than only when passed in to the initial StreamsBuilder#new(TopologyConfig), which is what the old default.dsl.store configuration required.

TopologyConfig

We will expose the following method from TopologyConfig:

Code Block
    /**
     * @return the DslStoreSuppliers if the value was explicitly configured (either by
     *         {@link StreamsConfig#DEFAULT_DSL_STORE} or {@link StreamsConfig#DSL_STORE_SUPPLIERS_CLASS_CONFIG})
     */
    public Optional<DslStoreSuppliers> resolveDslStoreSuppliers() {
        if (isTopologyOverride(DSL_STORE_SUPPLIERS_CLASS_CONFIG, topologyOverrides) || globalAppConfigs.originals().containsKey(DSL_STORE_SUPPLIERS_CLASS_CONFIG)) {
            return Optional.of(Utils.newInstance(dslStoreSuppliers, DslStoreSuppliers.class));
        } else if (isTopologyOverride(DEFAULT_DSL_STORE_CONFIG, topologyOverrides) || globalAppConfigs.originals().containsKey(DEFAULT_DSL_STORE_CONFIG)) {
            return Optional.of(MaterializedInternal.parse(storeType));
        } else {
            return Optional.empty();
        }
    }

This allows us to determine whether a DslStoreSuppliers was explicitly configured, and also respects the hierarchy of resolution described in the Javadoc for DslStoreSuppliers.
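
The resolution hierarchy can be sketched in isolation as follows. This is a simplified model under stated assumptions, not the proposed implementation: configs are plain maps, suppliers are represented by their class names, the legacy default.dsl.store fallback is omitted, and the class and method names (SuppliersResolutionSketch, resolveConfigured, resolve) are hypothetical.

```java
import java.util.Map;
import java.util.Optional;

final class SuppliersResolutionSketch {
    static final String KEY = "dsl.store.suppliers.class";
    // Built-in default class name taken from the Configuration section.
    static final String DEFAULT = "org.apache.kafka.streams.RocksDbDslStoreSuppliers";

    // Same shape as resolveDslStoreSuppliers(): a topology override wins over
    // the global config, and "nothing configured" is reported as empty.
    static Optional<String> resolveConfigured(final Map<String, String> topologyOverrides,
                                              final Map<String, String> globalConfigs) {
        if (topologyOverrides.containsKey(KEY)) {
            return Optional.of(topologyOverrides.get(KEY));
        } else if (globalConfigs.containsKey(KEY)) {
            return Optional.of(globalConfigs.get(KEY));
        }
        return Optional.empty();
    }

    // Full hierarchy: operator-level argument, then configuration, then default.
    static String resolve(final Optional<String> operatorLevel,
                          final Map<String, String> topologyOverrides,
                          final Map<String, String> globalConfigs) {
        if (operatorLevel.isPresent()) {
            return operatorLevel.get();
        }
        return resolveConfigured(topologyOverrides, globalConfigs).orElse(DEFAULT);
    }
}
```

Returning an Optional from the configuration step, instead of eagerly substituting the default, is what lets a caller distinguish "explicitly configured" from "fell through to the default".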

Materialized API

In the existing code, users are able to override the default store type by leveraging the existing Materialized.as and Materialized.withStoreType methods. We will change the signature of these methods (see below on how we maintain compatibility) to take in a DslStoreSuppliers instead of the StoreType enum:

Code Block
languagejava
titleMaterialized.java
public class Materialized {

    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final DslStoreSuppliers storeSuppliers);

    public Materialized<K, V, S> withStoreType(final DslStoreSuppliers storeSuppliers);
}

In order to ensure code compatibility and smooth upgrade paths, we will leave the old enums in place and have them implement DslStoreSuppliers:

Code Block
languagejava
titleStores.java
package org.apache.kafka.streams.kstream;

public class Materialized {
	...
	public enum StoreType implements DslStoreSuppliers {
		// DefaultDslStoreSuppliers will itself be a delegate to two different implementations
		// which will be stored in static fields ROCKS_DB and IN_MEMORY, choosing which one
		// to delegate to depending on the default.dsl.store configuration
    	ROCKS_DB(DefaultDslStoreSuppliers.ROCKS_DB),
    	IN_MEMORY(DefaultDslStoreSuppliers.IN_MEMORY);
	
		private final DslStoreSuppliers delegate;

		StoreType(final DslStoreSuppliers delegate) {
			this.delegate = delegate;
		}

		// delegate all methods to delegate
	}
	...
}
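
The delegation pattern used by the enum can be shown as a minimal, compilable sketch. The interface below is a hypothetical one-method stand-in for DslStoreSuppliers (returning a string instead of a store supplier), and the names SketchStoreSuppliers and SketchStoreType are invented for illustration.

```java
// Hypothetical stand-in for DslStoreSuppliers, reduced to a single method.
interface SketchStoreSuppliers {
    String keyValueStore(String name);
}

// Each enum constant wraps a delegate and forwards every call to it, so
// existing code that passes the enum keeps compiling against the new API.
enum SketchStoreType implements SketchStoreSuppliers {
    ROCKS_DB(name -> "rocksdb:" + name),
    IN_MEMORY(name -> "in-memory:" + name);

    private final SketchStoreSuppliers delegate;

    // An enum constructor carries the enum's own name.
    SketchStoreType(final SketchStoreSuppliers delegate) {
        this.delegate = delegate;
    }

    @Override
    public String keyValueStore(final String name) {
        return delegate.keyValueStore(name);
    }
}
```

Because the enum constants are themselves instances of the interface, a call site that accepts the interface type also accepts SketchStoreType.IN_MEMORY without an overload for the enum.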

StreamJoined API

Finally, to address the three problems with configuring state stores for stream-stream joins today, we propose one last new method that will operate similarly to the new Materialized APIs but for StreamJoined:

Code Block
languagejava
titleStreamJoined.java
public class StreamJoined {

    /**
     * Creates a StreamJoined instance with the given {@link DslStoreSuppliers}. The store plugin
     * will be used to get all the state stores in this operation that do not otherwise have an
     * explicitly configured {@link org.apache.kafka.streams.state.DslStoreSuppliers}.
     *
     * @param storeSuppliers         the store plugin that will be used for state stores
     * @param <K>                 the key type
     * @param <V1>                this value type
     * @param <V2>                other value type
     * @return                    {@link StreamJoined} instance
     */
    public static <K, V1, V2> StreamJoined<K, V1, V2> with(final DslStoreSuppliers storeSuppliers);
    
    /**
     * Configure with the provided {@link DslStoreSuppliers} for all state stores that are not
     * configured with a {@link org.apache.kafka.streams.state.DslStoreSuppliers} already.
     * 
     * @param storeSuppliers  the store plugin to use for plugging in state stores
     * @return             a new {@link StreamJoined} configured with this store plugin
     */
    public StreamJoined<K, V1, V2> withDslStoreSuppliers(final DslStoreSuppliers storeSuppliers);
}  

Proposed Changes

There will be no functional changes as part of this KIP. Implementations for all of the above interfaces will be provided. Note the following discussion points:

  • Terminology: "Store Type" vs "Store Implementation". A store "type" is a top level DSL store type - there are currently only three: KV, Windowed and Session. A "store implementation" is the chosen implementation of the specified store type. Kafka Streams provides RocksDB and in-memory store implementations OOTB.
  • Versioned Tables, like timestamped tables, are an implementation detail of the chosen plugin when providing a store type (usually KV stores). In the future, the DefaultDslStoreSuppliers is free to switch its default implementation of the KV store to return a versioned implementation instead. This KIP does not propose promoting Versioned tables to the top level API here.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?

Existing users will see deprecation warnings if they are using the old default.dsl.store configuration in their code. When 4.0 is released this configuration will no longer be respected.

  • If we are changing behavior how will we phase out the older behavior?

N/A

  • If we need special migration tools, describe them here.

N/A

  • When will we remove the existing behavior?

4.0

Test Plan

Existing test coverage for the default.dsl.store configuration will be migrated to use the new configuration and ensure that the default behavior can be changed.

Rejected Alternatives

Support Both Configs

Don't deprecate the old default.dsl.store config and instead maintain it alongside the new config. The obvious advantage is that we minimize disruption to anyone already using this config. However, I believe the existing userbase is likely to be quite small, as in its current form this feature is only useful to those who (a) have so many state stores that overriding them separately is cumbersome, and (b) also wish to override all (or most) of their stores to be in-memory specifically. This seems to be a relatively uncommon pattern, not least because it's a recipe for OOM disaster unless the application is extremely small and/or low traffic. In the absence of any evidence that this config is already in use despite such a niche application, it seems best to take this opportunity to deprecate it now, before it gains any more traction, and have everyone converge on a single configuration going forward. Whenever two configs affect the same underlying feature, there will inevitably be confusion over their interaction/hierarchy, which can often lead to misconfiguration. Deprecating the old config is a clear sign to users which of the two should be preferred and will take precedence.

Add CUSTOM Enum to Existing Config

Instead of positioning the new config as a full replacement for default.dsl.store, we could instead introduce it as a complementary config for custom stores specifically. For example, add a 3rd StoreType to the enum definition, such as CUSTOM. If (and only if) the CUSTOM StoreType is used, the value of the new default.dsl.store.type.spec config will be looked at and used to obtain the actual StoreSupplier instances of this custom type. This option has the same advantages as the above, though I would qualify it in the same way. In general, having two configs that do the same thing or are tied to the same feature will be confusing to new (or old!) users. However, between these two alternatives I would personally advocate for this one as the better option, as it at least solves the concern over potential misconfiguration due to unclear interaction between the two configs.

...

Code Block
public class Materialized {

	// new methods
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final DslStoreSuppliers storeSuppliers);
    public Materialized<K, V, S> withStoreSuppliers(final DslStoreSuppliers storeSuppliers);

	// deprecate old methods
	@Deprecated
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StoreType storeType);
	@Deprecated
    public Materialized<K, V, S> withStoreType(final StoreType storeType); 
}

...