Status
Current state: Accepted
Discussion thread: https://lists.apache.org/thread.html/600996d83d485f2b8daf45037de64a60cebdfac9b234bf3449b6b753@%3Cdev.kafka.apache.org%3E
JIRA: https://issues.apache.org/jira/browse/
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Code Block |
---|
public interface ConnectedStoreProvider { default Set<StoreBuilder> stores() { return null; } } |
...
The process method would add the StoreBuilders to the topology using builder.addStateStore() and connect each store to that processor, rather than requiring the user to do it themselves. To solve the problem of addStateStore potentially being called twice for the same store (because more than one Supplier specifies it), the check for duplicate stores in addStateStore will be relaxed to allow duplicates when the same StoreBuilder instance is added again for the same store name (compared by reference, not equals()).
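The relaxed duplicate check described above can be sketched as follows. This is a minimal illustration and not the actual Kafka Streams implementation: `RelaxedStoreRegistry` and `FakeStoreBuilder` are hypothetical stand-ins for the internal topology builder and for `StoreBuilder`, and the exception type is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the topology builder's store bookkeeping.
final class RelaxedStoreRegistry {

    // Hypothetical stand-in for StoreBuilder; only the store name matters here.
    static final class FakeStoreBuilder {
        private final String name;

        FakeStoreBuilder(String name) {
            this.name = name;
        }

        String name() {
            return name;
        }
    }

    private final Map<String, FakeStoreBuilder> storesByName = new HashMap<>();

    // Returns true if the store was newly registered, false if the very same
    // builder instance had already been registered (the call is then a no-op).
    boolean addStateStore(FakeStoreBuilder builder) {
        FakeStoreBuilder existing = storesByName.get(builder.name());
        if (existing == null) {
            storesByName.put(builder.name(), builder);
            return true;
        }
        // Compared by reference, not equals(): only the identical instance
        // is tolerated as a duplicate.
        if (existing == builder) {
            return false;
        }
        throw new IllegalStateException(
            "A different StoreBuilder is already registered for store " + builder.name());
    }

    int size() {
        return storesByName.size();
    }
}
```

With this check, two Suppliers that return the same builder instance can both be registered without error, while a second, distinct builder that reuses the store name is still rejected.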
Compatibility, Deprecation, and Migration Plan
Because the added interface methods are default
with a reasonable default, those additions are backwards compatible. However, given that now there would be two ways
A user may continue to "connect" state stores to a low level processor, we would have to specify how they behave together. I believe the correct decision is to enforce that for a given call of processor by passing stateStoreNames
when calling stream.process/transform(...)
or stream.transform(...)
it should only be possible to specify state stores one of the two ways, either through the stateStoreNames
argument or by implementing the stores
method of ConnectedStoreProvider
. Specifically: if ConnectedStoreProvider::stores
returns non-null and stateStoreNames
is passed, an exception will be thrown making it clear that the user should choose one way or the other. This may be used in combination with a Supplier
that provides its own state stores by implementing ConnectedStoreProvider::stores()
.
If a StoreBuilder that was manually added is also returned by a ConnectedStoreProvider, there is no issue since adding a state store will now be idempotent.
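To illustrate how the two mechanisms might combine, the sketch below computes the set of store names a processor ends up connected to. It assumes a hypothetical helper `ConnectedStoreNames.resolve` and a hypothetical `NamedStoreBuilder` stand-in for `StoreBuilder`; neither is part of Kafka Streams. A `null` result from `stores()` (the default) is treated as "no supplier-provided stores".

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper, for illustration only.
final class ConnectedStoreNames {

    // Hypothetical stand-in for StoreBuilder; only the name is needed.
    interface NamedStoreBuilder {
        String name();
    }

    // Union of the names passed explicitly as stateStoreNames and the names of
    // the stores returned by the supplier's stores() method (which may be null).
    static Set<String> resolve(Set<? extends NamedStoreBuilder> supplierStores,
                               String... stateStoreNames) {
        Set<String> names = new LinkedHashSet<>(Arrays.asList(stateStoreNames));
        if (supplierStores != null) {
            for (NamedStoreBuilder builder : supplierStores) {
                names.add(builder.name());
            }
        }
        return names;
    }
}
```

A store named both ways appears once in the result, which mirrors the idempotent behavior described above.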
...