...
Add an interface ConnectedStoreProvider that allows the implementor to specify state stores that should be connected to this processor/transformer (defaulting to no stores).
...
ConnectedStoreProvider
Code Block |
---|
public interface ConnectedStoreProvider { default Collection<StoreBuilder> stores() { return null; } } |
Change all Processor/TransformerSupplier
interfaces to extend from it:
...
Code Block |
---|
public interface TransformerSupplier<K, V, R> extends ConnectedStoreProvider { ... } |
ValueTransformerSupplier
Code Block |
---|
public interface ValueTransformerSupplier<V, VR> extends ConnectedStoreProvider { ... } |
ValueTransformerWithKeySupplier
Code Block |
---|
public interface ValueTransformerWithKeySupplier<K, V, VR> extends ConnectedStoreProvider { ... } |
ProcessorSupplier
Code Block |
---|
public interface ProcessorSupplier<K, V> extends ConnectedStoreProvider { ... } |
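To illustrate how an implementor might use the proposed interface, here is a minimal sketch. It does not depend on Kafka Streams: `StoreBuilder` and `ProcessorSupplier` are reduced stand-ins, and `CountingProcessorSupplier` and `StatelessProcessorSupplier` are hypothetical names introduced only for this example.

```java
import java.util.Collection;
import java.util.Collections;

// Stand-in for Kafka Streams' StoreBuilder, reduced to the one method this sketch needs.
interface StoreBuilder {
    String name();
}

// The interface as proposed above: a null return means "no stores declared".
interface ConnectedStoreProvider {
    default Collection<StoreBuilder> stores() {
        return null;
    }
}

// Stand-in for a supplier interface extending the proposed provider.
// The real get() method is omitted because it is not relevant here.
interface ProcessorSupplier<K, V> extends ConnectedStoreProvider {
}

// Hypothetical supplier that declares the store its processor needs.
class CountingProcessorSupplier implements ProcessorSupplier<String, Long> {
    private final StoreBuilder countsStore = () -> "counts";

    @Override
    public Collection<StoreBuilder> stores() {
        return Collections.singletonList(countsStore);
    }
}

// Hypothetical supplier that keeps the default and declares nothing.
class StatelessProcessorSupplier implements ProcessorSupplier<String, Long> {
}
```

Existing suppliers that do not override `stores()` behave like `StatelessProcessorSupplier` here, which is why the change is source compatible.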
Proposed Changes
The proposal is to enhance the ProcessorSupplier and TransformerSupplier interfaces by allowing them to provide information about what state stores they "own" when constructing a topology using StreamsBuilder, KStream::process, KStream::transform, KStream::transformValues, and Topology::addProcessor.
The public interface changes above directly imply what needs to be changed in KStream:
The process etc. methods would get state store names from the list of StoreBuilders that the supplier (which implements ConnectedStoreProvider) provides, rather than from the varargs stateStoreNames.
Additionally, the process method would add the StoreBuilders to the topology using builder.addStateStore() and connect the store to that processor, rather than requiring the user to do it themselves. To solve the problem of addStateStore potentially being called twice for the same store (because more than one Supplier specifies it), the check for duplicate stores in addStateStore will be relaxed to allow duplicates when the same StoreBuilder instance is added again under the same store name.
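The relaxed duplicate check could look roughly like the sketch below. This is an assumption about the shape of the logic, not the actual topology builder code: `StoreRegistry` is a hypothetical class, `StoreBuilder` is a reduced stand-in, and `IllegalStateException` stands in for whatever exception the real implementation throws.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for Kafka Streams' StoreBuilder, reduced to the one method this sketch needs.
interface StoreBuilder {
    String name();
}

// Hypothetical registry illustrating an idempotent addStateStore.
class StoreRegistry {
    private final Map<String, StoreBuilder> storesByName = new HashMap<>();

    // Returns true if the builder was newly registered,
    // false if the very same instance was already registered under that name.
    boolean addStateStore(StoreBuilder builder) {
        StoreBuilder existing = storesByName.get(builder.name());
        if (existing == null) {
            storesByName.put(builder.name(), builder);
            return true;
        }
        if (existing == builder) {
            // Same instance, same name: tolerated duplicate, nothing to do.
            return false;
        }
        // A different builder reusing the name is still an error.
        throw new IllegalStateException(
            "StateStore " + builder.name() + " is already added with a different builder.");
    }
}
```

Comparing by reference rather than by `equals` keeps the check strict: two distinct builders that happen to share a name are still rejected.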
...
Because the added interface method is a default method with a reasonable default, the additions are backwards compatible. However, given that there would now be two ways to "connect" state stores to a low-level processor, we have to specify how they behave together. I believe the correct decision is to enforce that, for a given call of stream.process(...) or stream.transform(...), it should only be possible to specify state stores in one of the two ways: either through the stateStoreNames argument or by implementing the stores method of ConnectedStoreProvider. Specifically: if ConnectedStoreProvider::stores returns non-null and stateStoreNames is passed, an exception will be thrown making it clear that the user should choose one way or the other.
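The rule above can be sketched as a small resolution step. This is only an illustration under assumptions: `StoreNameResolver` and its `resolve` method are hypothetical, the interfaces are reduced stand-ins for the Kafka Streams types, and the exception type is a placeholder.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Stand-in for Kafka Streams' StoreBuilder, reduced to the one method this sketch needs.
interface StoreBuilder {
    String name();
}

// The proposed interface: a null return means "no stores declared".
interface ConnectedStoreProvider {
    default Collection<StoreBuilder> stores() {
        return null;
    }
}

// Hypothetical helper deciding which store names a process/transform call connects.
class StoreNameResolver {
    static List<String> resolve(ConnectedStoreProvider supplier, String... stateStoreNames) {
        Collection<StoreBuilder> provided = supplier.stores();
        if (provided == null) {
            // Supplier declares nothing: fall back to the explicit names.
            return Arrays.asList(stateStoreNames);
        }
        if (stateStoreNames.length > 0) {
            // Both mechanisms used in one call: reject, as the proposal requires.
            throw new IllegalArgumentException(
                "Specify state stores either via stateStoreNames or via stores(), not both.");
        }
        List<String> names = new ArrayList<>();
        for (StoreBuilder builder : provided) {
            names.add(builder.name());
        }
        return names;
    }
}
```

Note that under this reading a supplier returning an empty, non-null collection still counts as having chosen the `stores()` mechanism.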
If a StoreBuilder
that was manually added is also returned by a ConnectedStoreProvider
, there is no issue since adding a state store will now be idempotent.
No migration tools are required since it's a relatively minor library change.
...