...

Add an interface ConnectedStoreProvider that allows the implementor to specify state stores that should be connected to this processor/transformer (defaulting to no stores).

...

ConnectedStoreProvider

Code Block
public interface ConnectedStoreProvider {
    default Collection<StoreBuilder> stores() {
        return null;
    }
}


Change all ProcessorSupplier and TransformerSupplier interfaces to extend it:

...

Code Block
public interface TransformerSupplier<K, V, R> extends ConnectedStoreProvider {
    ...
}

ValueTransformerSupplier

Code Block
public interface ValueTransformerSupplier<V, VR> extends ConnectedStoreProvider {
    ...
}

ValueTransformerWithKeySupplier

Code Block
public interface ValueTransformerWithKeySupplier<K, V, VR> extends ConnectedStoreProvider {
    ...
}

ProcessorSupplier

Code Block
public interface ProcessorSupplier<K, V> extends ConnectedStoreProvider {
    ...
}


Proposed Changes

The proposal is to enhance the ProcessorSupplier and TransformerSupplier interfaces by allowing them to provide information about what state stores they "own" when constructing a topology using KStream::process, KStream::transform, KStream::transformValues, and Topology::addProcessor.

The public interface changes above directly imply what needs to change in KStream: the process, transform, and transformValues methods would get state store names from the collection of StoreBuilders that the supplier (which implements ConnectedStoreProvider) provides, rather than from the varargs stateStoreNames.

The process method would add the StoreBuilders to the topology using builder.addStateStore() and connect each store to that processor, rather than requiring users to do it themselves. To solve the problem of addStateStore potentially being called twice for the same store (because more than one Supplier specifies it), the duplicate-store check in addStateStore will be relaxed to allow a duplicate when the same StoreBuilder instance is added again under the same store name.
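
The relaxed duplicate check described above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the actual Kafka Streams implementation: SketchStoreBuilder and SketchStoreRegistry are hypothetical stand-ins (the real StoreBuilder carries much more than a name), and the exception type and boolean return value are arbitrary choices made for the sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for StoreBuilder; only the store name matters here.
final class SketchStoreBuilder {
    private final String name;

    SketchStoreBuilder(String name) {
        this.name = name;
    }

    String name() {
        return name;
    }
}

// Hypothetical registry modeling the relaxed duplicate-store check.
final class SketchStoreRegistry {
    private final Map<String, SketchStoreBuilder> stores = new HashMap<>();

    // Returns true if the builder was newly registered, false if the
    // very same instance was already registered under that name.
    boolean addStateStore(SketchStoreBuilder builder) {
        SketchStoreBuilder existing = stores.get(builder.name());
        if (existing == null) {
            stores.put(builder.name(), builder);
            return true;
        }
        if (existing == builder) {
            // Same instance, same name: treat the call as an idempotent no-op.
            return false;
        }
        // A different builder under an already-used name is still an error.
        throw new IllegalStateException(
            "A different StoreBuilder is already registered as " + builder.name());
    }

    int size() {
        return stores.size();
    }
}
```

In this sketch, identity (==) rather than equals() decides whether a second registration is harmless, which mirrors the "same StoreBuilder instance" wording above.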

...

Because the added stores method is a default method with a reasonable default, the addition is backwards compatible. However, given that there would now be two ways to "connect" state stores to a low-level processor, we have to specify how they behave together. I believe the correct decision is to enforce that, for a given call of stream.process(...) or stream.transform(...), state stores can be specified in only one of the two ways: either through the stateStoreNames argument or by implementing the stores method of ConnectedStoreProvider. Specifically, if ConnectedStoreProvider::stores returns non-null and stateStoreNames is passed, an exception will be thrown making it clear that the user should choose one way or the other.
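
As a rough illustration of the rule above, the check might look like the following sketch. The names SketchConnectedStoreProvider and SketchStoreResolver are hypothetical, stores are represented by their names only, "stateStoreNames is passed" is assumed to mean a non-empty varargs array, and the exception type is an arbitrary choice; none of this is taken from the Kafka Streams code base.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for the proposed interface; a store is just its name here.
interface SketchConnectedStoreProvider {
    default Collection<String> stores() {
        return null; // null means "this supplier does not declare its own stores"
    }
}

final class SketchStoreResolver {
    // Decides which store names to connect for one process/transform call.
    static List<String> resolveStoreNames(SketchConnectedStoreProvider supplier,
                                          String... stateStoreNames) {
        Collection<String> provided = supplier.stores();
        if (provided != null && stateStoreNames.length > 0) {
            // Both mechanisms were used in the same call: reject it.
            throw new IllegalArgumentException(
                "Specify state stores either via stateStoreNames or via stores(), not both");
        }
        if (provided != null) {
            return new ArrayList<>(provided);
        }
        return new ArrayList<>(Arrays.asList(stateStoreNames));
    }
}
```

A supplier that keeps the default stores() behaves exactly as before and relies on stateStoreNames, which is what makes the change backwards compatible in this model.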

If a StoreBuilder that was manually added is also returned by a ConnectedStoreProvider, there is no issue since adding a state store will now be idempotent.

No migration tools are required since it's a relatively minor library change.

...