Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleDSLStoreProvider.java
public interface/**
 * {@code DslStoreSuppliers} extendsdefines Configurable {

    KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params);

    WindowBytesStoreSupplier windowStore(final DslWindowParams params);

    SessionBytesStoreSupplier sessionStore(final DslSessionParams params);
}

// the below are all "struct"-like classes with the following fields
class DslKeyValueParams(String name);
class DslWindowParams(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates, EmitStrategy emitStrategy);
class DslSessionParams(String name, Duration retentionPeriod, EmitStrategy emitStrategy);
Info

Note on Evolving API: a concern raised on KIP-591 about having such an interface is that the increased API surface area would mean introducing new store implementations would cause custom state store implementations to throw compile time errors. Introducing the *Params classes will prevent such issues unless an entirely new store type is added.

If an entirely new state store type (beyond KV/Windowed/Session) is added - I think it is valid to have new store types have a default implementation that throws new UnsupportedOperationException() as it is unlikely that users that specify a custom state store as the default will want a different (e.g. ROCKS_DB) store created without them knowing. This also seems unlikely since these three have been there for many years and they've been the only three for that duration.

OOTB Store Type Specs

We will provide a default implementations of the above interfaces:

  • org.apache.kafka.streams.RocksDbDslStoreSuppliers
  • org.apache.kafka.streams.InMemoryDslStoreSuppliers

Configuration

These interfaces will be specified by means of a new config, defined below. This takes precedence over the old default.dsl.store config, which will be deprecated to provide clear signal on the preferred method. Further discussion on this point is under "Rejected Alternatives":

Code Block
public static final String DSL_STORE_SUPPLIERS_CLASS_CONFIG = "dsl.store.suppliers.class";
public static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store implementations to plug in to DSL operators. Must implement the <code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface.";
public static final Class DSL_STORE_SUPPLIERS_CLASS_DEFAULT = org.apache.kafka.streams.RocksDbDslStoreSuppliers.class;

Example usage of this configuration:

...

a grouping of factories to construct
 * stores for each of the types of state store implementations in Kafka
 * Streams. This allows configuration of a default store supplier beyond
 * the builtin defaults of RocksDB and In-Memory.
 *
 * <p>There are various ways that this configuration can be supplied to
 * the application (in order of precedence):
 * <ol>
 *     <li>Passed in directly to a DSL operator via either
 *     {@link org.apache.kafka.streams.kstream.Materialized#as(DslStoreSuppliers)},
 *     {@link org.apache.kafka.streams.kstream.Materialized#withStoreType(DslStoreSuppliers)}, or
 *     {@link org.apache.kafka.streams.kstream.StreamJoined#withDslStoreSuppliers(DslStoreSuppliers)}</li>
 *
 *     <li>Passed in via a Topology configuration override (configured in a
 *     {@link org.apache.kafka.streams.TopologyConfig} and passed into the
 *     {@link org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig)} constructor</li>
 *
 *     <li>Configured as a global default in {@link org.apache.kafka.streams.StreamsConfig} using
 *     the {@link org.apache.kafka.streams.StreamsConfig#DSL_STORE_SUPPLIERS_CLASS_CONFIG}</li>
 *     configuration.
 * </ol>
 *
 * <p>Kafka Streams is packaged with some pre-existing {@code DslStoreSuppliers}
 * that exist in {@link BuiltInDslStoreSuppliers}
 */
public interface DslStoreSuppliers extends Configurable {

    KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params);

    WindowBytesStoreSupplier windowStore(final DslWindowParams params);

    SessionBytesStoreSupplier sessionStore(final DslSessionParams params);
}

// the below are all "struct"-like classes with the following fields
class DslKeyValueParams(String name);
class DslWindowParams(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates, EmitStrategy emitStrategy, boolean isSlidingWindow, boolean isTimestamped);
class DslSessionParams(String name, Duration retentionPeriod, EmitStrategy emitStrategy);


Info

Note on Evolving API: a concern raised on KIP-591 about having such an interface is that the increased API surface area would mean introducing new store implementations would cause custom state store implementations to throw compile time errors. Introducing the *Params classes will prevent such issues unless an entirely new store type is added.

If an entirely new state store type (beyond KV/Windowed/Session) is added - I think it is valid to have new store types have a default implementation that throws new UnsupportedOperationException() as it is unlikely that users that specify a custom state store as the default will want a different (e.g. ROCKS_DB) store created without them knowing. This also seems unlikely since these three have been there for many years and they've been the only three for that duration.

OOTB Store Type Specs

We will provide a default implementations of the above interfaces:

  • org.apache.kafka.streams.RocksDbDslStoreSuppliers
  • org.apache.kafka.streams.InMemoryDslStoreSuppliers

Configuration

These interfaces will be specified by means of a new config, defined below. This takes precedence over the old default.dsl.store config, which will be deprecated to provide clear signal on the preferred method. Further discussion on this point is under "Rejected Alternatives":

Code Block
public static final String DSL_STORE_SUPPLIERS_CLASS_CONFIG = "dsl.store.suppliers.class";
public static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store implementations to plug in to DSL operators. Must implement the <code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface.";
public static final Class DSL_STORE_SUPPLIERS_CLASS_DEFAULT = org.apache.kafka.streams.RocksDbDslStoreSuppliers.class;

// we also add this constant for the old configuration so that it is easily referenceable
// across the code base
public static final String DEFAULT_DSL_STORE = ROCKS_DB;

Example usage of this configuration:

Code Block
dsl.store.suppliers.class = com.my.company.MyCustomStoreSuppliers


Info
titleBehavior Change for default.dsl.store

The new dsl.store.suppliers.class will respect the configurations when passed in via KafkaStreams#new(Topology, StreamsConfig)  (and other related constructors) instead of only being respected when passed in to the initial StoreBuilder#new(TopologyConfig), which is what the old default.dsl.store  configuration required.

TopologyConfig

We will expose the following method from TopologyConfig :

Code Block
    /**
     * @return the DslStoreSuppliers if the value was explicitly configured (either by
     *         {@link StreamsConfig#DEFAULT_DSL_STORE} or {@link StreamsConfig#DSL_STORE_SUPPLIERS_CLASS_CONFIG})
     */
    public Optional<DslStoreSuppliers> resolveDslStoreSuppliers() {
        if (isTopologyOverride(DSL_STORE_SUPPLIERS_CLASS_CONFIG, topologyOverrides) || globalAppConfigs.originals().containsKey(DSL_STORE_SUPPLIERS_CLASS_CONFIG)) {
            return Optional.of(Utils.newInstance(dslStoreSuppliers, DslStoreSuppliers.class));
        } else if (isTopologyOverride(DEFAULT_DSL_STORE_CONFIG, topologyOverrides) || globalAppConfigs.originals().containsKey(DEFAULT_DSL_STORE_CONFIG)) {
            return Optional.of(MaterializedInternal.parse(storeType));
        } else {
            return Optional.empty();
        }

This allows us to determine whether or not a DslStoreSuppliers was configured or not, and also respects the hierarchy of resolution described in the javadocs to DslStoreSuppliers.

Materialized API

In the existing code, users are able to override the default store type by leveraging the existing Materialized.as and Materialized.withStoreType  methods. We will change the signature of these methods (see below on how we maintain compatibility) to take in a DslStoreSuppliers  instead of the StoreType  enum:

...