...

Code Block
builder.stream("input.topic")
        .map(...)
        .filter(...)
        .process(MyStatefulProcessorSupplier.make(val -> businessLogic(val)));


This gives code that uses Processors (and Transformers) the same "reads top to bottom" clarity as code that uses only the high-level DSL.

Public Interfaces

KStream

Deprecate the methods that take String... stateStoreNames and replace them with overloads that don't:

Code Block
@Deprecated
<K1, V1> KStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
                                   final String... stateStoreNames);

<K1, V1> KStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier);

@Deprecated
<VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
                                    final String... stateStoreNames);

<VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier);

@Deprecated
<VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
                                    final String... stateStoreNames);

<VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier);

@Deprecated
void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
             final String... stateStoreNames);

void process(final ProcessorSupplier<? super K, ? super V> processorSupplier);

StateStoreConnector

Add an interface StateStoreConnector that allows the implementor to specify the state stores that should be connected to this processor/transformer (defaulting to no stores).

Code Block
import java.util.Collections;
import java.util.List;
import org.apache.kafka.streams.state.StoreBuilder;

public interface StateStoreConnector {
    // Stores returned here are added to the topology and connected to the processor/transformer.
    // The default connects no stores, so existing implementations are unaffected.
    default List<StoreBuilder<?>> stateStores() {
        return Collections.emptyList();
    }
}


Change all Processor/Transformer supplier interfaces to extend StateStoreConnector, so that each supplier can specify the state stores it owns and how to build them:


TransformerSupplier

Code Block
public interface TransformerSupplier<K, V, R> extends StateStoreConnector {
    Transformer<K, V, R> get();
}

ValueTransformerSupplier

Code Block
public interface ValueTransformerSupplier<V, VR> extends StateStoreConnector {
    ValueTransformer<V, VR> get();
}

ValueTransformerWithKeySupplier

Code Block
public interface ValueTransformerWithKeySupplier<K, V, VR> extends StateStoreConnector {
    ValueTransformerWithKey<K, V, VR> get();
}

ProcessorSupplier

Code Block
public interface ProcessorSupplier<K, V> extends StateStoreConnector {
    Processor<K, V> get();
}
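The sketch below shows the kind of supplier the motivating example has in mind: a hypothetical MyStatefulProcessorSupplier.make(...) factory whose supplier declares the store it owns, so the call site never repeats a store name. To keep it self-contained, StubStoreBuilder, StubProcessor and StubProcessorSupplier are simplified stand-ins (assumptions, not the Kafka API) for StoreBuilder, Processor and ProcessorSupplier, and the store name "my-store" is made up.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified stand-in for Kafka's StoreBuilder: only the name matters here.
interface StubStoreBuilder {
    String name();
}

// Hypothetical, simplified stand-in for Kafka's Processor.
interface StubProcessor<V> {
    void process(V value);
}

// The proposed interface, using the stand-in builder type.
interface StateStoreConnector {
    default List<StubStoreBuilder> stateStores() {
        return Collections.emptyList();
    }
}

// Hypothetical stand-in for ProcessorSupplier, extending the proposed interface.
interface StubProcessorSupplier<V> extends StateStoreConnector {
    StubProcessor<V> get();
}

// Hypothetical factory: the supplier declares the store it needs,
// so the topology code does not have to name it again.
final class MyStatefulProcessorSupplier {
    static final String STORE_NAME = "my-store"; // assumed store name

    private MyStatefulProcessorSupplier() {}

    static <V> StubProcessorSupplier<V> make(final Function<V, ?> businessLogic) {
        return new StubProcessorSupplier<V>() {
            @Override
            public StubProcessor<V> get() {
                // A real processor would look up STORE_NAME when it is initialized.
                return value -> businessLogic.apply(value);
            }

            @Override
            public List<StubStoreBuilder> stateStores() {
                // The supplier owns exactly one store.
                return Collections.<StubStoreBuilder>singletonList(() -> STORE_NAME);
            }
        };
    }
}
```

With the proposal in place, builder.stream("input.topic").process(MyStatefulProcessorSupplier.make(...)) would be enough to both create and connect "my-store", while a supplier that does not override stateStores() keeps the empty default.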


Proposed Changes

The proposal is to enhance the ProcessorSupplier and TransformerSupplier interfaces by allowing them to provide information about which state stores they own, eliminating (and deprecating) the need to pass in state store names when constructing a topology using KStream::process, KStream::transform, and KStream::transformValues.
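As a rough illustration of the mechanism, here is how a store-name-free process(...) or transform(...) might consume stateStores() while building the topology: register each returned builder, then connect its name to the new processor node. ToyTopology and StubStoreBuilder are hypothetical simplifications, not Kafka's actual topology builder, and skipping a store already registered under the same name is an assumption, since the text here does not say how duplicates are handled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Kafka's StoreBuilder: only the store name matters here.
interface StubStoreBuilder {
    String name();
}

// The proposed interface, using the stand-in builder type.
interface StateStoreConnector {
    default List<StubStoreBuilder> stateStores() {
        return Collections.emptyList();
    }
}

// Toy topology: records registered stores and the stores each processor is connected to.
final class ToyTopology {
    final Map<String, StubStoreBuilder> stores = new LinkedHashMap<>();
    final Map<String, List<String>> connections = new LinkedHashMap<>();

    // Roughly what a process(supplier) without stateStoreNames would do with the supplier.
    void addProcessor(final String processorName, final StateStoreConnector supplier) {
        final List<String> connected = new ArrayList<>();
        for (final StubStoreBuilder builder : supplier.stateStores()) {
            // Register each store once (assumption: a repeated name is skipped, not rejected).
            stores.putIfAbsent(builder.name(), builder);
            // Connect the store to this processor by name.
            connected.add(builder.name());
        }
        connections.put(processorName, connected);
    }
}
```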

...

Compatibility, Deprecation, and Migration Plan

Deprecated methods will continue to behave as they previously have. Because the added interface method has a reasonable default implementation, the addition is backwards compatible. However, since there are now two ways to "connect" state stores to a low-level processor, we have to specify how they behave together. I believe the correct decision is to enforce that, for a given call of stream.process(...) or stream.transform(...), state stores can be specified in only one of the two ways: either through the stateStoreNames argument or by implementing the stateStores method of StateStoreConnector. Attempting to do both (that is, returning stores from *Supplier::stateStores while also passing stateStoreNames to one of the deprecated methods) should throw an exception that makes it clear the user should choose one way or the other, which reduces confusion.
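The "one way or the other" rule amounts to a simple check at topology-construction time. In this minimal sketch, StoreConnectionRules.checkSingleSource is a hypothetical helper, StubStoreBuilder stands in for Kafka's StoreBuilder, and IllegalArgumentException is an assumed choice of exception type; the proposal only says that an exception should be thrown.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for Kafka's StoreBuilder.
interface StubStoreBuilder {
    String name();
}

// The proposed interface, using the stand-in builder type.
interface StateStoreConnector {
    default List<StubStoreBuilder> stateStores() {
        return Collections.emptyList();
    }
}

final class StoreConnectionRules {
    private StoreConnectionRules() {}

    // Rejects a call that names stores via the deprecated varargs argument
    // while the supplier also declares stores through stateStores().
    static void checkSingleSource(final StateStoreConnector supplier, final String... stateStoreNames) {
        final boolean hasNames = stateStoreNames != null && stateStoreNames.length > 0;
        if (hasNames && !supplier.stateStores().isEmpty()) {
            throw new IllegalArgumentException(
                "State stores were given both via stateStoreNames and via StateStoreConnector#stateStores(); "
                    + "use only one of the two.");
        }
    }
}
```

Either source on its own passes the check; only the combination is rejected.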

No migration tools are required since it's a relatively minor library change.

The deprecated methods could be removed at the next major release without a full year-long deprecation period, since KStream is annotated InterfaceStability.Evolving. Although InterfaceStability.Evolving even allows breaking changes in a minor release, there seems to be no need to rush the removal of the deprecated methods.

Alternatives

Unlike in the template, this section isn't called "Rejected Alternatives" yet, because I don't think these alternatives are necessarily rejected; they should be discussed.

Don't deprecate the existing KStream methods that take stateStoreNames

This may be desirable if users want to create StoreBuilders or specify store names far away from the Processor logic itself. I can't think of a major reason why this would be desired; as this KIP shows, I have the opposite problem. I'm also not sure these changes would necessarily prevent that, so much as change how it would work. In general I favor deprecation, to keep the API smaller and avoid having two ways of doing the same thing.

Have the added method on the Supplier interfaces only return store names, not builders

...