Status
Current state: Under Discussion
Discussion thread: https://lists.apache.org/thread.html/600996d83d485f2b8daf45037de64a60cebdfac9b234bf3449b6b753@%3Cdev.kafka.apache.org%3E
JIRA: https://issues.apache.org/jira/browse/KAFKA-7523
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
When writing stateful low-level Processors and Transformers with Kafka Streams, the processors (or transformers; I'll use "processors" to refer to both for brevity) often want to "own" one or more state stores, the details of which are not important to the business logic of the application. However, when incorporating these into a topology defined by the high-level DSL using KStream::process, you're forced to specify the state store names so the topology is wired up correctly. This creates a clumsy pattern where the "owned" state store's name must be passed alongside the TransformerSupplier, when the supplier itself could just as easily supply that information on its own.
An example of the clumsiness:
String stateStoreName = "my-store";
StoreBuilder<KeyValueStore> storeBuilder = Stores.keyValueStoreBuilder(
    Stores.inMemoryKeyValueStore(stateStoreName), keySerde, valSerde);
topology.addStateStore(storeBuilder);
ProcessorSupplier processorSupplier =
    new MyStatefulProcessorSupplier(stateStoreName, val -> businessLogic(val));

builder.stream("input.topic")
    .map(...)
    .filter(...)
    .process(processorSupplier, stateStoreName);
Both the main topology definition (the chained, high-level DSL calls on StreamsBuilder, KStream, and KTable) and the internal implementation of MyStatefulProcessorSupplier need to know the state store name, when it should really only be MyStatefulProcessorSupplier that cares. Additionally, the creation of the StoreBuilder and the call to topology.addStateStore(storeBuilder) are required, both of which ought to be implicit when using MyStatefulProcessorSupplier. Ultimately, because KStream::process requires store names as a separate argument, all of this "wiring" code is necessary alongside or near the actual business logic.
Ideally, it would be reducible to something like:
builder.stream("input.topic")
    .map(...)
    .filter(...)
    .process(MyStatefulProcessorSupplier.make(val -> businessLogic(val)));
This gives topologies that use Processors (and Transformers) the same level of clarity as those that use only the high-level DSL.
Public Interfaces
KStream
Deprecate the methods that take String... stateStoreNames and replace them with methods that don't:
@Deprecated
<K1, V1> KStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
                                   final String... stateStoreNames);

<K1, V1> KStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier);

@Deprecated
<VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
                                    final String... stateStoreNames);

<VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier);

@Deprecated
<VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
                                    final String... stateStoreNames);

<VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier);

@Deprecated
void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
             final String... stateStoreNames);

void process(final ProcessorSupplier<? super K, ? super V> processorSupplier);
Add a method to the supplier interfaces that lets them specify the state stores they own and how to build them.
TransformerSupplier
public interface TransformerSupplier<K, V, R> {
    Transformer<K, V, R> get();

    default List<StoreBuilder> stateStores() {
        return Collections.emptyList();
    }
}
ValueTransformerSupplier
public interface ValueTransformerSupplier<V, VR> {
    ValueTransformer<V, VR> get();

    default List<StoreBuilder> stateStores() {
        return Collections.emptyList();
    }
}
ValueTransformerWithKeySupplier
public interface ValueTransformerWithKeySupplier<K, V, VR> {
    ValueTransformerWithKey<K, V, VR> get();

    default List<StoreBuilder> stateStores() {
        return Collections.emptyList();
    }
}
ProcessorSupplier
public interface ProcessorSupplier<K, V> {
    Processor<K, V> get();

    default List<StoreBuilder> stateStores() {
        return Collections.emptyList();
    }
}
Proposed Changes
The proposal is to change the ProcessorSupplier and TransformerSupplier interfaces so that they can provide information about the state stores they own, eliminating (and deprecating) the need to pass in state store names when constructing a topology using StreamsBuilder, KStream::process, KStream::transform, and KStream::transformValues.
The public interface changes above directly imply what needs to change in KStream. The process, transform, and transformValues methods would get state store names from the list of StoreBuilders that the supplier provides, rather than from the varargs stateStoreNames. Additionally, each of these methods would add the StoreBuilders to the topology using builder.addStateStore(), rather than requiring the user to do it themselves.
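The wiring described above can be sketched in plain Java. This is a simplified model, not the Kafka Streams implementation: every Sketch* type, SketchTopologyBuilder, and CountingProcessorSupplier is a hypothetical stand-in introduced here so the example compiles on its own, and only stateStores() and addStateStore() mirror names from this proposal.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-in for StoreBuilder; only the store name matters for wiring.
interface SketchStoreBuilder {
    String name();
}

// Stand-in for Processor; the business logic is irrelevant to the wiring.
interface SketchProcessor<K, V> {
    void process(K key, V value);
}

// Stand-in for ProcessorSupplier, including the default method proposed in this KIP.
interface SketchProcessorSupplier<K, V> {
    SketchProcessor<K, V> get();

    default List<SketchStoreBuilder> stateStores() {
        return Collections.emptyList();
    }
}

// Hypothetical stand-in for the part of the DSL that connects stores to a processor node.
class SketchTopologyBuilder {
    // Store builders registered so far, keyed by store name.
    final Map<String, SketchStoreBuilder> stores = new LinkedHashMap<>();

    void addStateStore(SketchStoreBuilder storeBuilder) {
        stores.put(storeBuilder.name(), storeBuilder);
    }

    // Models the proposed process(supplier): registers the supplier's stores and
    // returns the names of the stores the new processor node is connected to.
    <K, V> List<String> process(SketchProcessorSupplier<K, V> supplier) {
        List<String> connectedStoreNames = new ArrayList<>();
        for (SketchStoreBuilder storeBuilder : supplier.stateStores()) {
            addStateStore(storeBuilder);
            connectedStoreNames.add(storeBuilder.name());
        }
        return connectedStoreNames;
    }
}

// A supplier that owns its store: the store name never leaves this class.
class CountingProcessorSupplier implements SketchProcessorSupplier<String, String> {
    private static final String STORE_NAME = "my-store";

    @Override
    public SketchProcessor<String, String> get() {
        // A real processor would look up STORE_NAME from its context here.
        return (key, value) -> { };
    }

    @Override
    public List<SketchStoreBuilder> stateStores() {
        SketchStoreBuilder storeBuilder = () -> STORE_NAME;
        return Collections.singletonList(storeBuilder);
    }
}
```

In this model, calling new SketchTopologyBuilder().process(new CountingProcessorSupplier()) registers the store and connects it without the caller ever naming it. A supplier that does not override stateStores() is wired with no stores, which is why the default method keeps existing suppliers working.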
Compatibility, Deprecation, and Migration Plan
Deprecated methods will continue to behave as they previously have. Because the added interface methods are default methods with a reasonable default implementation, those additions are backwards compatible. Ideally, the deprecated methods would throw an exception if state stores are provided via both *Supplier::stateStores and the varargs stateStoreNames, to reduce confusion.
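One way the deprecated overloads could enforce this is sketched below. The class and method names are hypothetical, and IllegalArgumentException is an assumption made for illustration; this proposal does not specify which exception type would be thrown.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the Kafka Streams API.
class DeprecatedOverloadGuard {
    // Rejects calls where stores are named both by the supplier and by the varargs.
    static void checkSingleSource(List<String> supplierStoreNames, String... stateStoreNames) {
        if (!supplierStoreNames.isEmpty() && stateStoreNames.length > 0) {
            throw new IllegalArgumentException(
                "State stores were provided both by the supplier " + supplierStoreNames
                    + " and by name " + Arrays.asList(stateStoreNames)
                    + "; use only one of the two");
        }
    }
}
```

Callers that use only one of the two mechanisms pass the check unchanged, so existing code that relies on the deprecated varargs keeps working.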
No migration tools are required since it's a relatively minor library change.
Existing behavior (methods) could be removed at the next major release without a full-year deprecation period, since KStream is InterfaceStability.Evolving. While InterfaceStability.Evolving allows breakage in a minor release, it seems entirely unnecessary to rush the removal of those deprecated methods.
Alternatives
This section isn't called "rejected alternatives" yet like the template because I don't think they're necessarily rejected yet; they should be discussed.
Don't deprecate the existing KStream methods that take stateStoreNames
This may be desirable if users want to create StoreBuilders or specify store names far away from the Processor logic itself. I can't think of a major reason why this would be desired; as is evidenced by this KIP, I have the opposite problem. And I'm not sure that these changes would necessarily prevent that so much as just change how it would work. In general I favor deprecation just to keep the API smaller and so there aren't two ways of doing things.
Have the added method on the Supplier interfaces only return store names, not builders
This solves the original issue only partially, but with perhaps less "API risk." The String... stateStoreNames argument would no longer be needed on the KStream methods, but the user would still need to manually add the StoreBuilders to the Topology. The downside is that we don't achieve the full reduction of "wiring up" code required when building the topology (the user still needs to know to call topology.addStateStore()), but the upside is that the StoreBuilder is less coupled to the *Supplier. I don't consider this upside significant, but perhaps there are other use cases I'm not considering.
Do nothing
This is a "quality of life" API improvement, and nothing more, so maybe it's unneeded churn. I favor doing something (obviously) because I think that while small, this change can be a major usability improvement for the low-level API.