
Status

Current state: Under Discussion

Discussion thread: here 

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

  1. Supporting Custom Stores. We want to complete the work outlined in KIP-591, specifically extending the functionality to cover custom state stores. This KIP expands on, and makes feasible, the "Rejected Alternative" in KIP-591, which was rejected in favor of reducing scope. Without this KIP, DSL users must carefully examine the output of their topology description and wire in custom state stores (using the Materialized API) at every location that uses state. This is both tedious and error-prone: if Materialized is used in an incorrect location, an unnecessary state store may be created, while in the inverse scenario, a missing Materialized call would silently create a RocksDB store.
  2. Supporting Default Stores for Stream-Stream Joins. We would like to address a gap in the API for stream-stream joins (really 3 separate issues). For a quick refresher: these are a special case among DSL operators that cannot be configured via Materialized, and are instead the lone instance of an operator configuration accepting a StoreSupplier directly (rather than indirectly, e.g. via a StoreBuilder or Materialized). For this reason, they are completely missed by the default.dsl.store config, as it currently operates exclusively through the Materialized class. The only way to configure the underlying state store implementation is with the StreamJoined configuration class.
  3. Support Custom Stores for Stream-Stream Joins. As described in KAFKA-14976, some stream-stream join types now use an additional store which is not windowed but rather a KeyValueStore. This is not customizable and not exposed to the user in any way – it simply hardcodes a RocksDBStore or InMemoryStore depending on what the join's WindowStoreSupplier(s) return for the StateStore#persistent API. This means no matter what one does, there is no way to use all (and only) custom state store implementations for any application including these stream-stream joins!

The 2nd problem follows from the 1st: because you are forced to configure these stores through the StreamJoined class, which in turn requires a WindowStoreSupplier (or two), it's up to the user to figure out the correct values for all three parameters (windowSize, retentionPeriod, and retainDuplicates). Not only is there a single "correct" answer for all three (unlike other operators, which let users freely choose some or all of the store parameters), it is also difficult to determine what those expected values are. For example, retainDuplicates must always be true for stream-stream joins, while the retentionPeriod is fixed at windowSize + gracePeriod. Most frustrating of all, the windowSize itself is not what you might expect: if you used the old JoinWindows#of (or the new JoinWindows#ofTimeDifferenceAndNoGrace), the WindowStoreSupplier will actually require twice the "time difference" passed into your JoinWindows. The only ways to find out are trial and error, or checking the code after you get a runtime exception for passing in the wrong value(s).
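To make the parameter relationships above concrete, here is a small, self-contained sketch (the class and method names are hypothetical helpers, not Kafka Streams APIs) of the values a stream-stream join window store must be created with:

```java
import java.time.Duration;

// Hypothetical helper illustrating the store parameters a stream-stream
// join requires, per the relationships described above.
class JoinStoreParams {

    // The store's window size must be TWICE the JoinWindows "time difference".
    static Duration requiredWindowSize(Duration timeDifference) {
        return timeDifference.multipliedBy(2);
    }

    // The retention period is fixed at windowSize + gracePeriod.
    static Duration requiredRetention(Duration timeDifference, Duration grace) {
        return requiredWindowSize(timeDifference).plus(grace);
    }

    // retainDuplicates must always be true for stream-stream joins.
    static boolean requiredRetainDuplicates() {
        return true;
    }
}
```

For a JoinWindows with a 5-minute time difference and a 1-minute grace period, the store must be created with a 10-minute window size and an 11-minute retention period, which is exactly the kind of non-obvious arithmetic this KIP aims to hide from users.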

Public Interfaces

New Interfaces

In order to allow users to implement their own default stores, we will need to provide the following interface. This interface will be used when instantiating a new store when no StoreSupplier is provided (either via Materialized  or StoreSupplier  APIs):

DslStorePlugin.java
public interface DslStorePlugin {

    KeyValueBytesStoreSupplier keyValueStore(final KeyValuePluginParams params);

    WindowBytesStoreSupplier windowStore(final WindowPluginParams params);

    SessionBytesStoreSupplier sessionStore(final SessionPluginParams params);
}

// the below are all "struct"-like classes with the following fields
class KeyValuePluginParams(String name);
class WindowPluginParams(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates, EmitStrategy emitStrategy);
class SessionPluginParams(String name, Duration retentionPeriod, EmitStrategy emitStrategy);
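As a rough illustration of what a custom implementation would look like, here is a self-contained sketch using simplified stand-ins for the Kafka Streams types (the real KeyValueBytesStoreSupplier and friends live in org.apache.kafka.streams.state; all names below are hypothetical):

```java
// Stand-in for a bytes store supplier; the real interface has more methods.
interface StoreSupplierStub {
    String name();
}

// Mirror of the proposed DslStorePlugin, reduced to the KV store method.
interface DslStorePluginSketch {
    StoreSupplierStub keyValueStore(KeyValueParamsSketch params);
}

// "Struct"-like params class: new fields can be added here in the future
// without changing the interface method signature.
class KeyValueParamsSketch {
    private final String name;
    KeyValueParamsSketch(String name) { this.name = name; }
    String name() { return name; }
}

// A custom plugin simply maps each store type to its own supplier.
class MyCustomStorePlugin implements DslStorePluginSketch {
    @Override
    public StoreSupplierStub keyValueStore(KeyValueParamsSketch params) {
        return params::name; // would return a supplier for the custom store
    }
}
```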

Note on Evolving API: a concern raised on KIP-591 about such an interface is that any future change to the parameters a store requires would change the method signatures, causing existing custom implementations to break at compile time. Introducing the *Params classes prevents such issues (new fields can be added to a params class without touching the interface) unless an entirely new store type is added.

If an entirely new state store type (beyond KV/Windowed/Session) is ever added, it is reasonable for the corresponding new interface method to have a default implementation that throws an UnsupportedOperationException: users who specify a custom default store are unlikely to want a different store (e.g. RocksDB) created without their knowledge. This scenario also seems unlikely in practice, as these three store types have been the only ones for many years.
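The fail-loudly behavior described above can be sketched with a default method (the interface and the hypothetical "graph store" type below are illustrative, not part of the proposal):

```java
// Simplified stand-in for the plugin interface, showing how a method for a
// hypothetical future store type could default to failing loudly rather
// than silently falling back to RocksDB.
interface StorePluginWithDefaults {

    // Existing store type: implementers must provide this.
    String keyValueStore(String name);

    // Hypothetical future store type: a default implementation throws so
    // that older custom plugins fail visibly instead of getting a store
    // implementation they never asked for.
    default String graphStore(String name) {
        throw new UnsupportedOperationException(
            "graph stores are not supported by this plugin: " + name);
    }
}
```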

OOTB Store Plugins

We will provide a default implementation of the above interface in the class org.apache.kafka.streams.DefaultDslStorePlugin. This default implementation will use either RocksDB or in-memory stores, depending on the default.dsl.store configuration (see Rejected Alternatives for a discussion of this choice).

Configuration

A user can optionally specify a DslStorePlugin by means of a new config, defined below, which defaults to org.apache.kafka.streams.DefaultDslStorePlugin.

public static final String DSL_STORE_PLUGIN_CLASS_CONFIG = "dsl.store.plugin.class";
public static final String DSL_STORE_PLUGIN_CLASS_DOC = "Defines which store implementations to plug in to DSL operators. Must implement the <code>org.apache.kafka.streams.state.DslStorePlugin</code> interface.";

Example usage of this configuration:

dsl.store.plugin.class = com.my.company.MyCustomStorePlugin

If anything other than dsl.store.plugin.class = org.apache.kafka.streams.DefaultDslStorePlugin is configured, the default.dsl.store configuration will be ignored.
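The interaction between the two configs can be illustrated as follows (the plugin class name and this helper are hypothetical; the config keys are as proposed above):

```java
import java.util.Properties;

// Hypothetical helper showing how the proposed config would be set.
class PluginConfigExample {

    static Properties streamsProps() {
        Properties props = new Properties();
        // Plug in a custom store plugin:
        props.put("dsl.store.plugin.class", "com.my.company.MyCustomStorePlugin");
        // Because a non-default plugin is configured, default.dsl.store
        // is ignored; this line has no effect:
        props.put("default.dsl.store", "rocksDB");
        return props;
    }
}
```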

Materialized API

In the existing code, users can override the default store type via the existing Materialized.as and Materialized.withStoreType methods. We will change the signatures of these methods (see below for how we maintain compatibility) to take a DslStorePlugin instead of the StoreType enum:

Materialized.java
public class Materialized {

    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final DslStorePlugin storePlugin);

    public Materialized<K, V, S> withStoreType(final DslStorePlugin storePlugin);
}

In order to ensure code compatibility and smooth upgrade paths, we will leave the old enums in place and have them implement DslStorePlugin:

Materialized.java
package org.apache.kafka.streams.kstream;

public class Materialized {
	...
	public enum StoreType implements DslStorePlugin {
		// DefaultDslStorePlugin will itself be a delegate to two different implementations
		// which will be stored in static fields ROCKS_DB and IN_MEMORY, choosing which one
		// to delegate to depending on the default.dsl.store configuration
    	ROCKS_DB(DefaultDslStorePlugin.ROCKS_DB),
    	IN_MEMORY(DefaultDslStorePlugin.IN_MEMORY);
	
		private final DslStorePlugin delegatePlugin;

		StoreType(final DslStorePlugin delegate) {
			this.delegatePlugin = delegate;
		}

		// delegate all methods to delegatePlugin
	}
	...
}
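The enum-delegates-to-plugin pattern above can be demonstrated with simplified, self-contained stand-ins (all names below are hypothetical):

```java
// Stand-in for the plugin interface; one method keeps it a functional
// interface so the delegates can be written as lambdas.
interface PluginStub {
    String storeKind();
}

// Mirrors the proposed Materialized.StoreType: each enum constant wraps a
// delegate plugin and forwards every interface method to it, so existing
// code passing ROCKS_DB / IN_MEMORY keeps compiling unchanged.
enum StoreTypeStub implements PluginStub {
    ROCKS_DB(() -> "rocksdb"),
    IN_MEMORY(() -> "in-memory");

    private final PluginStub delegate;

    StoreTypeStub(PluginStub delegate) {
        this.delegate = delegate;
    }

    @Override
    public String storeKind() {
        return delegate.storeKind(); // delegate all methods
    }
}
```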

StreamJoined API

Finally, to address the three problems with configuring state stores for stream-stream joins today, we propose one last new method that will operate similarly to the new Materialized APIs but for StreamJoined:

StreamJoined.java
public class StreamJoined {

    /**
     * Creates a StreamJoined instance with the given {@link DslStorePlugin}. The store plugin
     * will be used to get all the state stores in this operation that do not otherwise have an
     * explicitly configured {@link org.apache.kafka.streams.state.DslStorePlugin}.
     *
     * @param storePlugin         the store plugin that will be used for state stores
     * @param <K>                 the key type
     * @param <V1>                this value type
     * @param <V2>                other value type
     * @return                    {@link StreamJoined} instance
     */
    public static <K, V1, V2> StreamJoined<K, V1, V2> with(final DslStorePlugin storePlugin);
    
    /**
     * Configure with the provided {@link DslStorePlugin} for all state stores that are not
     * configured with a {@link org.apache.kafka.streams.state.DslStorePlugin} already.
     * 
     * @param storePlugin  the store plugin to use for plugging in state stores
     * @return             a new {@link StreamJoined} configured with this store plugin
     */
    public StreamJoined<K, V1, V2> withStorePlugin(final DslStorePlugin storePlugin);
}  

Proposed Changes

There will be no functional changes as part of this KIP. Implementations for all of the above interfaces will be provided. Note the following discussion points:

  • Terminology: "Store Type" vs "Store Implementation". A store "type" is a top level DSL store type - there are currently only three: KV, Windowed and Session. A "store implementation" is the chosen implementation of the specified store type. Kafka Streams provides RocksDB and in-memory store implementations OOTB.
  • Versioned Tables, like timestamped tables, are an implementation detail of the chosen plugin when providing a store type (usually KV stores). In the future, the DefaultDslStorePlugin is free to switch its default implementation of the KV store to return a versioned implementation instead. This KIP does not propose promoting Versioned tables to the top level API here.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?

N/A

  • If we are changing behavior how will we phase out the older behavior?

N/A

  • If we need special migration tools, describe them here.

N/A

  • When will we remove the existing behavior?

N/A

Test Plan

Existing test coverage for the default.dsl.store configuration will be migrated to use the new configuration and ensure that the default behavior can be changed.

Rejected Alternatives

Deprecate Existing Config

We can deprecate the existing config entirely, and instead provide two out of the box implementations of dsl.store.plugin.class:

  • RocksDbStorePlugin
  • InMemoryStorePlugin

This makes it a little easier for users to reason about, as there is only a single config. It would also make it a bit easier to add new store implementations going forward without cluttering the DefaultDslStorePlugin class. The downside is that it is more cumbersome to configure (an enum is easy to remember and clear in the configs, as opposed to a class name). The other downside is that there would be a deprecation period where both configs are active, and users who ignore those deprecations would hit compile time errors when we remove the original config.

Add CUSTOM Enum to Existing Config

In the proposal above, dsl.store.plugin.class is resolved first, and the old default.dsl.store config is consulted only if the DefaultDslStorePlugin is in use. We could flip the semantics, requiring users to specify CUSTOM for default.dsl.store before the new configuration is checked. This option is slightly more cumbersome than the proposed design and has no clear advantages.

Deprecating StoreType Enum, Materialized.as(StoreType) and Materialized.withStoreType(StoreType) APIs

We could deprecate the Materialized.StoreType  enum and introduce new methods specific to the new store type spec:

public class Materialized {

	// new methods
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final DslStorePlugin storePlugin);
    public Materialized<K, V, S> withStorePlugin(final DslStorePlugin storePlugin);

	// deprecate old methods
	@Deprecated
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StoreType storeType);
	@Deprecated
    public Materialized<K, V, S> withStoreType(final StoreType storeType); 
}

This is a viable strategy, but rejected for two reasons:

  1. Using an enum is more ergonomic for users in the code (compare Materialized.withStoreType(ROCKS_DB) with Materialized.withStoreTypeSpec(new RocksDbStoreTypeSpec()))
  2. It causes code-level deprecation, which is annoying for users to migrate away from
