...
Code Block | ||||
---|---|---|---|---|
| ||||
public interface/** * {@code DslStoreSuppliers} extendsdefines Configurable { KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params); WindowBytesStoreSupplier windowStore(final DslWindowParams params); SessionBytesStoreSupplier sessionStore(final DslSessionParams params); } // the below are all "struct"-like classes with the following fields class DslKeyValueParams(String name); class DslWindowParams(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates, EmitStrategy emitStrategy); class DslSessionParams(String name, Duration retentionPeriod, EmitStrategy emitStrategy); |
Info |
---|
Note on Evolving API: a concern raised on KIP-591 about having such an interface is that the increased API surface area would mean introducing new store implementations would cause custom state store implementations to throw compile time errors. Introducing the *Params classes will prevent such issues unless an entirely new store type is added. If an entirely new state store type (beyond KV/Windowed/Session) is added - I think it is valid to have new store types have a default implementation that throws |
OOTB Store Type Specs
We will provide a default implementations of the above interfaces:
- org.apache.kafka.streams.RocksDbDslStoreSuppliers
- org.apache.kafka.streams.InMemoryDslStoreSuppliers
Configuration
These interfaces will be specified by means of a new config, defined below. This takes precedence over the old default.dsl.store config, which will be deprecated to provide clear signal on the preferred method. Further discussion on this point is under "Rejected Alternatives":
Code Block |
---|
public static final String DSL_STORE_SUPPLIERS_CLASS_CONFIG = "dsl.store.suppliers.class";
public static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store implementations to plug in to DSL operators. Must implement the <code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface.";
public static final Class DSL_STORE_SUPPLIERS_CLASS_DEFAULT = org.apache.kafka.streams.RocksDbDslStoreSuppliers.class;
// we also add this constant for the old configuration so that it is easily referenceable
// across the code base
public static final String DEFAULT_DSL_STORE = ROCKS_DB; |
Example usage of this configuration:
Code Block |
---|
dsl.store.suppliers.class = com.my.company.MyCustomStoreSuppliers |
...
title | Behavior Change for default.dsl.store |
---|
a grouping of factories to construct
* stores for each of the types of state store implementations in Kafka
* Streams. This allows configuration of a default store supplier beyond
* the builtin defaults of RocksDB and In-Memory.
*
* <p>There are various ways that this configuration can be supplied to
* the application (in order of precedence):
* <ol>
* <li>Passed in directly to a DSL operator via either
* {@link org.apache.kafka.streams.kstream.Materialized#as(DslStoreSuppliers)},
* {@link org.apache.kafka.streams.kstream.Materialized#withStoreType(DslStoreSuppliers)}, or
* {@link org.apache.kafka.streams.kstream.StreamJoined#withDslStoreSuppliers(DslStoreSuppliers)}</li>
*
* <li>Passed in via a Topology configuration override (configured in a
* {@link org.apache.kafka.streams.TopologyConfig} and passed into the
* {@link org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig)} constructor</li>
*
* <li>Configured as a global default in {@link org.apache.kafka.streams.StreamsConfig} using
* the {@link org.apache.kafka.streams.StreamsConfig#DSL_STORE_SUPPLIERS_CLASS_CONFIG}</li>
* configuration.
* </ol>
*
* <p>Kafka Streams is packaged with some pre-existing {@code DslStoreSuppliers}
* that exist in {@link BuiltInDslStoreSuppliers}
*/
public interface DslStoreSuppliers extends Configurable {
KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params);
WindowBytesStoreSupplier windowStore(final DslWindowParams params);
SessionBytesStoreSupplier sessionStore(final DslSessionParams params);
}
// the below are all "struct"-like classes with the following fields
class DslKeyValueParams(String name);
class DslWindowParams(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates, EmitStrategy emitStrategy);
class DslSessionParams(String name, Duration retentionPeriod, EmitStrategy emitStrategy); |
Info |
---|
Note on Evolving API: a concern raised on KIP-591 about having such an interface is that the increased API surface area would mean introducing new store implementations would cause custom state store implementations to throw compile time errors. Introducing the *Params classes will prevent such issues unless an entirely new store type is added. If an entirely new state store type (beyond KV/Windowed/Session) is added - I think it is valid to have new store types have a default implementation that throws |
OOTB Store Type Specs
We will provide a default implementations of the above interfaces:
- org.apache.kafka.streams.RocksDbDslStoreSuppliers
- org.apache.kafka.streams.InMemoryDslStoreSuppliers
Configuration
These interfaces will be specified by means of a new config, defined below. This takes precedence over the old default.dsl.store config, which will be deprecated to provide clear signal on the preferred method. Further discussion on this point is under "Rejected Alternatives":
Code Block |
---|
public static final String DSL_STORE_SUPPLIERS_CLASS_CONFIG = "dsl.store.suppliers.class";
public static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store implementations to plug in to DSL operators. Must implement the <code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface.";
public static final Class DSL_STORE_SUPPLIERS_CLASS_DEFAULT = org.apache.kafka.streams.RocksDbDslStoreSuppliers.class;
// we also add this constant for the old configuration so that it is easily referenceable
// across the code base
public static final String DEFAULT_DSL_STORE = ROCKS_DB; |
Example usage of this configuration:
Code Block |
---|
dsl.store.suppliers.class = com.my.company.MyCustomStoreSuppliers |
Info | ||
---|---|---|
| ||
The new |
TopologyConfig
We will expose the following method from TopologyConfig
:
Code Block |
---|
/**
* @return the DslStoreSuppliers if the value was explicitly configured (either by
* {@link StreamsConfig#DEFAULT_DSL_STORE} or {@link StreamsConfig#DSL_STORE_SUPPLIERS_CLASS_CONFIG})
*/
public Optional<DslStoreSuppliers> resolveDslStoreSuppliers() {
if (isTopologyOverride(DSL_STORE_SUPPLIERS_CLASS_CONFIG, topologyOverrides) || globalAppConfigs.originals().containsKey(DSL_STORE_SUPPLIERS_CLASS_CONFIG)) {
return Optional.of(Utils.newInstance(dslStoreSuppliers, DslStoreSuppliers.class));
} else if (isTopologyOverride(DEFAULT_DSL_STORE_CONFIG, topologyOverrides) || globalAppConfigs.originals().containsKey(DEFAULT_DSL_STORE_CONFIG)) {
return Optional.of(MaterializedInternal.parse(storeType));
} else {
return Optional.empty();
} |
This allows us to determine whether or not a DslStoreSuppliers was configured or not, and also respects the hierarchy of resolution described in the javadocs to DslStoreSuppliers
...
.
Materialized API
In the existing code, users are able to override the default store type by leveraging the existing Materialized.as
and Materialized.withStoreType
methods. We will change the signature of these methods (see below on how we maintain compatibility) to take in a DslStoreSuppliers
instead of the StoreType
enum:
...