

Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread.html/600996d83d485f2b8daf45037de64a60cebdfac9b234bf3449b6b753@%3Cdev.kafka.apache.org%3E

JIRA: https://issues.apache.org/jira/browse/KAFKA-7523

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When writing stateful low-level Processors and Transformers with Kafka Streams, the processors (or transformers; I'll use "processors" to refer to both for brevity) often want to "own" one or more state stores whose details are not important to the business logic of the application. However, when incorporating these into a topology defined by the high-level DSL using KStream::process, you're forced to specify the state store names so that the topology is wired up correctly. This creates a clumsy pattern where the "owned" state store's name must be passed alongside the TransformerSupplier, even though the supplier itself could just as easily supply that information on its own.

...

Code Block
builder.stream("input.topic")
        .map(...)
        .filter(...)
        .process(MyStatefulProcessorSupplier.make(val -> businessLogic(val)));


This gives code that uses Processors (and Transformers) the same "reads top to bottom" clarity as code written with the high-level DSL.

Public Interfaces

Add an interface ConnectedStoreProvider that allows the implementor to specify state stores that should be connected to its processor/transformer (defaulting to no stores).

ConnectedStoreProvider

Code Block
import java.util.Set;

import org.apache.kafka.streams.state.StoreBuilder;

public interface ConnectedStoreProvider {
    /**
     * @return the StoreBuilders of the stores this supplier "owns"; they are added to the
     *         topology and connected to the processor/transformer automatically.
     *         The default, null, means that no stores are connected.
     */
    default Set<StoreBuilder<?>> stores() {
        return null;
    }
}
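To make the intent concrete, here is a minimal, self-contained sketch of a supplier that owns its store. It deliberately avoids Kafka dependencies: StoreSpec is a hypothetical stand-in for StoreBuilder (only the name matters here), CountingSupplier is a hypothetical example, and ConnectedStoreProvider is re-declared locally to mirror the interface above rather than imported from Kafka.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical stand-in for StoreBuilder: only the store name matters in this sketch.
final class StoreSpec {
    private final String name;

    StoreSpec(String name) {
        this.name = name;
    }

    String name() {
        return name;
    }
}

// Local mirror of the proposed interface; returning null means "no stores".
interface ConnectedStoreProvider {
    default Set<StoreSpec> stores() {
        return null;
    }
}

// Hypothetical supplier that owns a "counts" store. Callers only hand over the
// supplier; the store's name and configuration stay an internal detail.
final class CountingSupplier implements ConnectedStoreProvider {
    // Held in a field so that every call returns the same builder instance.
    private final StoreSpec countsStore = new StoreSpec("counts");

    @Override
    public Set<StoreSpec> stores() {
        return Collections.singleton(countsStore);
    }
}
```

Keeping the builder in a field, rather than creating a new one per call, means repeated calls return the same instance, which matters for the by-reference duplicate check described under Proposed Changes.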


Change all Processor/TransformerSupplier interfaces to extend from it:


TransformerSupplier

Code Block
public interface TransformerSupplier<K, V, R> extends ConnectedStoreProvider {
    Transformer<K, V, R> get();
    // inherits the default stores() from ConnectedStoreProvider
}

ValueTransformerSupplier

Code Block
public interface ValueTransformerSupplier<V, VR> extends ConnectedStoreProvider {
    ValueTransformer<V, VR> get();
    // inherits the default stores() from ConnectedStoreProvider
}

ValueTransformerWithKeySupplier

Code Block
public interface ValueTransformerWithKeySupplier<K, V, VR> extends ConnectedStoreProvider {
    ValueTransformerWithKey<K, V, VR> get();
    // inherits the default stores() from ConnectedStoreProvider
}

ProcessorSupplier

Code Block
public interface ProcessorSupplier<K, V> extends ConnectedStoreProvider {
    Processor<K, V> get();
    // inherits the default stores() from ConnectedStoreProvider
}


Proposed Changes

The proposal is to enhance the ProcessorSupplier and TransformerSupplier interfaces by allowing them to provide information about what state stores they own, eliminating the need to pass in state store names when constructing a topology using KStream::process, KStream::transform, KStream::transformValues, and Topology::addProcessor.

The public interface changes above directly imply what needs to change in KStream: the process, transform, and transformValues methods would get state store names from the StoreBuilders that the supplier (which implements ConnectedStoreProvider) provides, instead of requiring them to be passed as the varargs stateStoreNames.
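A rough model of that wiring, under stated assumptions: ToyTopology, its process(...) method and StoreSpec are hypothetical stand-ins, not Kafka's Topology or StoreBuilder, and duplicate-name checking is omitted for brevity. The sketch shows the supplier's stores being registered and connected on the user's behalf, with any explicitly passed names merged in.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for StoreBuilder: only the store name matters here.
final class StoreSpec {
    final String name;

    StoreSpec(String name) {
        this.name = name;
    }
}

// Local mirror of the proposed ConnectedStoreProvider (null means "no stores").
interface ConnectedStoreProvider {
    default Set<StoreSpec> stores() {
        return null;
    }
}

// Toy model of the wiring a process(...) call would perform; this is not Kafka's Topology.
final class ToyTopology {
    final Map<String, StoreSpec> stores = new LinkedHashMap<>();
    final Map<String, Set<String>> connections = new LinkedHashMap<>();

    void process(String processorName, ConnectedStoreProvider supplier, String... stateStoreNames) {
        Set<String> toConnect = new LinkedHashSet<>();
        Set<StoreSpec> owned = supplier.stores();
        if (owned != null) {
            for (StoreSpec spec : owned) {
                // Register the owned store on the user's behalf (duplicate checks omitted).
                stores.putIfAbsent(spec.name, spec);
                toConnect.add(spec.name);
            }
        }
        // Explicitly passed names are still honoured and merged with the owned ones.
        for (String name : stateStoreNames) {
            toConnect.add(name);
        }
        connections.put(processorName, toConnect);
    }
}
```

With a supplier whose stores() returns a "counts" builder, calling process("counter", supplier) registers and connects that store without the caller ever naming it.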

The process method would add the StoreBuilders to the topology using builder.addStateStore() and connect each store to that processor, rather than requiring the user to do it themselves. Because addStateStore could then be called twice for the same store (when more than one Supplier specifies it), the duplicate-store check in addStateStore will be relaxed to allow a duplicate if it is the same StoreBuilder instance for the same store name (compared by reference, not with equals()).
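A minimal sketch of the relaxed check, assuming a simple name-to-builder map; StoreRegistry and StoreSpec are hypothetical and not Kafka's internal classes. StoreSpec defines equals() by name on purpose, so the example shows that an equal but distinct instance is still rejected while re-adding the very same instance is a no-op.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for StoreBuilder. equals() compares names on purpose, to show
// that the relaxed duplicate check ignores equals() and compares references instead.
final class StoreSpec {
    final String name;

    StoreSpec(String name) {
        this.name = name;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof StoreSpec && ((StoreSpec) o).name.equals(name);
    }

    @Override
    public int hashCode() {
        return name.hashCode();
    }
}

// Toy store registry illustrating the relaxed check; not Kafka's internal code.
final class StoreRegistry {
    private final Map<String, StoreSpec> byName = new HashMap<>();

    void addStateStore(StoreSpec spec) {
        StoreSpec existing = byName.get(spec.name);
        if (existing == null) {
            byName.put(spec.name, spec);
        } else if (existing != spec) {
            // Same name but a different builder instance (even an equal one): reject.
            throw new IllegalStateException("State store " + spec.name + " is already added");
        }
        // Otherwise the very same instance was added again, e.g. by two suppliers: no-op.
    }

    int size() {
        return byName.size();
    }
}
```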

Compatibility, Deprecation, and Migration Plan

Existing methods will continue to behave as they previously have. Because the added interface method is a default method with a reasonable default, the addition is backwards compatible.
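One practical consequence of the default method is that suppliers written as lambdas keep compiling. The sketch below illustrates this with simplified, hypothetical stand-ins for Processor and ProcessorSupplier (Kafka's real interfaces have more members). The same reasoning applies to the real supplier interfaces as long as get() remains their only abstract method.

```java
import java.util.Set;

// Hypothetical stand-in for StoreBuilder.
final class StoreSpec {
    final String name;

    StoreSpec(String name) {
        this.name = name;
    }
}

// Local mirror of the proposed interface.
interface ConnectedStoreProvider {
    default Set<StoreSpec> stores() {
        return null;
    }
}

// Simplified, hypothetical stand-ins for Processor and ProcessorSupplier.
interface Processor<K, V> {
    void process(K key, V value);
}

interface ProcessorSupplier<K, V> extends ConnectedStoreProvider {
    Processor<K, V> get(); // still the only abstract method
}

final class LegacyUsage {
    // Code written before the change, which builds a supplier from a lambda, still
    // compiles: stores() has a default body, so get() is the single abstract method.
    static ProcessorSupplier<String, String> recordingSupplier(StringBuilder sink) {
        return () -> (key, value) -> sink.append(key).append('=').append(value);
    }
}
```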

A user may continue to "connect" stores to a processor by passing stateStoreNames when calling stream.process/transform(...). This may be used in combination with a Supplier that provides its own state stores by implementing ConnectedStoreProvider::stores().

If a StoreBuilder that was manually added is also returned by a ConnectedStoreProvider, there is no issue, since adding the same StoreBuilder instance more than once will now be idempotent.

No migration tools are required since it's a relatively minor library change.

Alternatives

...

This section isn't called "Rejected Alternatives" yet, as the template suggests, because I don't think these alternatives are necessarily rejected; they should be discussed.

Don't deprecate the existing KStream methods that take stateStoreNames

...

Have the added method on the Supplier interfaces only return store names, not builders

This solves the original issue only partially, but with perhaps less "API risk."  The String... stateStoreNames argument would no longer be needed on the KStream methods, but the user would still need to manually add the StoreBuilders to the Topology.  The downside is we don't achieve the full reduction of "wiring up" code required when building the topology (the user still needs to know to call topology.addStateStore()), but the upside is that the StoreBuilder is less coupled to the *Supplier.  I don't consider this upside significant, but perhaps there are other use cases I'm not considering.

Do nothing

This is a "quality of life" API improvement, and nothing more, so maybe it's unneeded churn.  I favor doing something (obviously) because I think that while small, this change can be a major usability improvement for the low-level API.

...