...
Code Block |
---|
builder.stream("input.topic") .map(...) .filter(...) .process(MyStatefulProcessorSupplier.make(val -> businessLogic(val))); |
This allows for the same level "reads top to bottom" type of clarity as when using Processors
(and Transformers
) as when using the high-level DSL.
Public Interfaces
KStream
Deprecate methods that take String.. stateStoreNames
and replace them with methods that don't:
Add an interface StateStoreConnector
that allows the implementor to specify state stores that should be connected to this processor/transformer (defaulting to no stores).
StateStoreConnector
Code Block |
---|
public interface StateStoreConnector { |
Code Block |
@Deprecated <K1, V1> NewKStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier, default final String... stateStoreNames); <K1, V1> NewKStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier); @Deprecated <VR> NewKStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,List<StoreBuilder> stateStores() { final String... stateStoreNames); <VR> NewKStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier); @Deprecated <VR> NewKStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier, final String... stateStoreNames); <VR> NewKStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier); @Deprecated void process(final ProcessorSupplier<? super K, ? super V> processorSupplier, final String... stateStoreNames); void process(final ProcessorSupplier<? super K, ? super V> processorSupplier);return Collections.emptyList(); } } |
Change all Processor/TransformerSupplier
interfaces to extend from it:Add method to supplier interfaces allowing them to specify state stores they own and how to build them.
TransformerSupplier
Code Block |
---|
public interface TransformerSupplier<K, V, R> { Transformer<K, V, R> get(); default List<StoreBuilder> stateStores()extends StateStoreConnector { return Collections.emptyList(); }... } |
ValueTransformerSupplier
Code Block |
---|
public interface ValueTransformerSupplier<V, VR> { ValueTransformer<V, VR> get(); default List<StoreBuilder> stateStores()extends StateStoreConnector { return Collections.emptyList(); }... } |
ValueTransformerWithKeySupplier
Code Block |
---|
public interface ValueTransformerWithKeySupplier<K, V, VR> { ValueTransformerWithKey<K, V, VR> get(); default List<StoreBuilder> stateStores()extends StateStoreConnector { return Collections.emptyList(); }... } |
ProcessorSupplier
Code Block |
---|
public interface ProcessorSupplier<K, V> { Processor<K, V> get(); default List<StoreBuilder> stateStores()extends StateStoreConnector { return Collections.emptyList(); }... } |
Proposed Changes
The proposal is to change enhance the ProcessorSupplier
and TransformerSupplier
interfaces by allowing them to provide information about what state stores they own , eliminating (and deprecating) the need to pass in state store names when constructing a topology using StreamsBuilder
, KStream:process,
KStream::transform, and
KStream::transformValues
.
...
Compatibility, Deprecation, and Migration Plan
Deprecated methods will continue to behave as they previously have. Because the added interface methods are default
with a reasonable default, those additions are backwards compatible. Ideally, the deprecated methods would throw an exception if state stores are provided via *Supplier::stateStores
and the var args stateStoreNames
, to reduce confusionHowever, given that now there would be two ways to "connect" state stores to a low level processor, we would have to specify how they behave together. I believe the correct decision is to enforce that for a given call of stream.process(...)
or stream.transform(...)
it should only be possible to specify state stores one of the two ways, either through the stateStoreNames
argument or by implementing the stateStores
method of StateStoreConnector
. Attempting to do both should cause an exception to be thrown making it clear that the user should choose one way or the other.
No migration tools are required since it's a relatively minor library change.
Existing behavior (methods) could be removed at the next major release without a full year deprecation period, since KStream
is InterfaceStability.Evolving
. While InterfaceStability.Evolving
allows for a minor release breakage, it seems entirely unnecessary to rush the removal of those deprecated methods.
Alternatives
This section isn't called "rejected alternatives" yet like the template because I don't think they're necessarily rejected yet; they should be discussed.
Don't deprecate the existing KStream methods that take stateStoreNames
This may be desirable if users want to create StoreBuilders
or specify store names far away from the Processor
logic itself. I can't think of a major reason why this would be desired - as is evidenced by this KIP, I have the opposite problem. And I'm not sure that these changes were necessarily prevent that so much as just change how that would work. In general I favor deprecation just to keep the API smaller and so there aren't two ways of doing things.
Have the added method on the Supplier interfaces only return store names, not builders
...