Status
Current state: Accepted [VOTE] KIP-120: Cleanup Kafka Streams builder API
Discussion thread: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API
JIRA: KAFKA-3856
Released: 1.0.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
- org.apache.kafka.streams.processor.TopologyBuilder
- org.apache.kafka.streams.kstream.KStreamBuilder
and add them again (with new names and in a different package):
- org.apache.kafka.streams.Topology
- org.apache.kafka.streams.StreamsBuilder
Furthermore, we add a new interface to get a full description of a topology (to compensate for some internal methods that get removed):
- org.apache.kafka.streams.TopologyDescription
TopologyDescription will have the following public interfaces:
- Subtopology
- GlobalStore
- Node
- Source
- Sink
- Processor
The following methods will not be available in the newly added classes:
- TopologyBuilder -> Topology: setApplicationId, connectSourceStoreAndTopic, connectProcessors, addInternalTopic, copartitionSources, nodeGroups, build, buildGlobalStateTopology, globalStateStores, topicGroups, earliestResetTopicPattern, latestResetTopicPattern, stateStoreNamesToSourceTopics, copartitionGroups, sourceTopicPattern, updateSubscriptions
- KStreamBuilder -> StreamsBuilder: all methods from TopologyBuilder (as KStreamBuilder does not inherit from TopologyBuilder anymore), except addStateStore and addGlobalStore (which are both added explicitly to StreamsBuilder), plus newName
- Methods highlighted in the TopologyBuilder list are methods that actually belong to the DSL abstraction
...
New org.apache.kafka.streams.Topology class:

```java
public final class Topology { // make class final compared to old TopologyBuilder

    // existing public API from current TopologyBuilder class that will not be changed

    public enum AutoOffsetReset { EARLIEST, LATEST }

    public Topology();

    public synchronized Topology addSource(String name, String... topics);
    public synchronized Topology addSource(String name, Pattern topicPattern);
    public synchronized Topology addSource(AutoOffsetReset offsetReset, String name, String... topics);
    public synchronized Topology addSource(AutoOffsetReset offsetReset, String name, Pattern topicPattern);
    public synchronized Topology addSource(String name, Deserializer keyDeserializer, Deserializer valueDeserializer, String... topics);
    public synchronized Topology addSource(String name, Deserializer keyDeserializer, Deserializer valueDeserializer, Pattern topicPattern);
    public synchronized Topology addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valueDeserializer, String... topics);
    public synchronized Topology addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valueDeserializer, Pattern topicPattern);

    public synchronized Topology addSink(String name, String topic, String... parentNames);
    public synchronized Topology addSink(String name, String topic, Serializer keySerializer, Serializer valueSerializer, String... parentNames);
    public synchronized Topology addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames);
    public synchronized <K, V> Topology addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames);

    public synchronized Topology addProcessor(String name, ProcessorSupplier supplier, String... parentNames);

    public synchronized Topology addStateStore(StateStoreSupplier supplier, String... processorNames);

    public synchronized Topology addGlobalStore(final StateStore store,
                                                final String sourceName,
                                                final Deserializer keyDeserializer,
                                                final Deserializer valueDeserializer,
                                                final String topic,
                                                final String processorName,
                                                final ProcessorSupplier stateUpdateSupplier);

    public synchronized Topology connectProcessorAndStateStores(String processorName, String... stateStoreNames);

    // newly added method to reveal topology structure
    // describes the current Topology, i.e., the returned TopologyDescription will not be
    // updated if the Topology is modified; #describe() must be called again

    public synchronized TopologyDescription describe();
}
```
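The comment on `describe()` states that a description reflects the topology at the time of the call and is not updated afterwards. The following is a minimal sketch of that snapshot behavior, assuming hypothetical stand-in classes (`MiniTopology`, `Description`) that are not part of Kafka Streams and only mimic the fluent, synchronized style of the listing above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for illustration only; NOT the real Kafka Streams classes.
final class DescribeSnapshotSketch {

    /** Immutable snapshot of the node names known when describe() was called. */
    static final class Description {
        private final List<String> nodeNames;

        Description(List<String> nodeNames) {
            // Defensive copy: later changes to the builder do not leak into the snapshot.
            this.nodeNames = Collections.unmodifiableList(new ArrayList<>(nodeNames));
        }

        List<String> nodeNames() {
            return nodeNames;
        }
    }

    /** Minimal fluent builder that records node names in insertion order. */
    static final class MiniTopology {
        private final List<String> nodeNames = new ArrayList<>();

        synchronized MiniTopology addSource(String name, String... topics) {
            nodeNames.add(name);
            return this;
        }

        synchronized MiniTopology addProcessor(String name, String... parentNames) {
            nodeNames.add(name);
            return this;
        }

        synchronized Description describe() {
            return new Description(nodeNames);
        }
    }

    public static void main(String[] args) {
        MiniTopology topology = new MiniTopology().addSource("source", "input-topic");
        Description before = topology.describe();

        topology.addProcessor("processor", "source");
        Description after = topology.describe();

        System.out.println(before.nodeNames()); // still only the source node
        System.out.println(after.nodeNames());  // source and processor
    }
}
```

In this sketch the snapshot is taken by copying; whether the real implementation copies or builds the description differently is not specified in this KIP.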
...
New org.apache.kafka.streams.StreamsBuilder class:

```java
public final class StreamsBuilder { // make class final compared to old KStreamBuilder

    // existing public API from current KStreamBuilder class that will not be changed
    // small change: adding `synchronized` to all methods to align with TopologyBuilder

    public StreamsBuilder();

    public synchronized <K, V> KStream<K, V> stream(final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Pattern topicPattern);
    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valueSerde, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valueSerde, final Pattern topicPattern);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valueSerde, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valueSerde, final Pattern topicPattern);

    public synchronized <K, V> KTable<K, V> table(final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde, final Serde<V> valueSerde, final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valueSerde, final String topic, final String storeName);

    public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final String storeName);
    public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valueSerde, final String topic, final String storeName);

    public synchronized <K, V> KStream<K, V> merge(final KStream<K, V>... streams);

    // newly added methods

    public synchronized StreamsBuilder addStateStore(StateStoreSupplier supplier, String... processorNames);
    public synchronized StreamsBuilder addGlobalStore(StateStore store,
                                                      String sourceName,
                                                      Deserializer keyDeserializer,
                                                      Deserializer valueDeserializer,
                                                      String topic,
                                                      String processorName,
                                                      ProcessorSupplier stateUpdateSupplier);

    public synchronized Topology build();
}
```

New org.apache.kafka.streams.TopologyDescription interface:

```java
public interface TopologyDescription {
    Set<Subtopology> subtopologies();
    Set<GlobalStore> globalStores();

    interface Subtopology {
        int id();
        Set<Node> nodes();
    }

    interface GlobalStore {
        Source source();
        Processor processor();
    }

    interface Node {
        String name();
        Set<Node> predecessors();
        Set<Node> successors();
    }

    interface Source extends Node {
        String topics(); // can be comma separated list of topic names or pattern (as String)
    }

    interface Processor extends Node {
        Set<String> stores();
    }

    interface Sink extends Node {
        String topic();
    }
}
```
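Because each node in the description exposes its predecessors and successors, a caller can walk the graph without access to any internal builder state. The sketch below illustrates such a walk under stated assumptions: the `Node` interface only mirrors the shape proposed above, and `SimpleNode` and `walk` are hypothetical helpers introduced for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch; the types here are local stand-ins, not Kafka Streams classes.
final class DescriptionWalkSketch {

    /** Mirrors the shape of the proposed TopologyDescription.Node. */
    interface Node {
        String name();
        Set<Node> predecessors();
        Set<Node> successors();
    }

    /** Hypothetical mutable implementation used only to build a test graph. */
    static final class SimpleNode implements Node {
        private final String name;
        private final Set<Node> predecessors = new LinkedHashSet<>();
        private final Set<Node> successors = new LinkedHashSet<>();

        SimpleNode(String name) {
            this.name = name;
        }

        /** Adds an edge from this node to the given child, in both directions. */
        void connectTo(SimpleNode child) {
            successors.add(child);
            child.predecessors.add(this);
        }

        @Override public String name() { return name; }
        @Override public Set<Node> predecessors() { return Collections.unmodifiableSet(predecessors); }
        @Override public Set<Node> successors() { return Collections.unmodifiableSet(successors); }
    }

    /** Returns the names of all nodes reachable from start, in depth-first pre-order. */
    static List<String> walk(Node start) {
        List<String> visited = new ArrayList<>();
        visit(start, visited);
        return visited;
    }

    private static void visit(Node node, List<String> visited) {
        if (visited.contains(node.name())) {
            return; // already seen, e.g. a node with several parents
        }
        visited.add(node.name());
        for (Node next : node.successors()) {
            visit(next, visited);
        }
    }

    public static void main(String[] args) {
        SimpleNode source = new SimpleNode("source");
        SimpleNode processor = new SimpleNode("processor");
        SimpleNode sink = new SimpleNode("sink");
        source.connectTo(processor);
        processor.connectTo(sink);
        System.out.println(walk(source));
    }
}
```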
Proposed Changes
Refactor the code to remove internal methods from public API.
We will add two new internal classes
- org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
- org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder
that offer the methods removed from the current API. Thus, both classes are the actual implementation. The old TopologyBuilder and KStreamBuilder are only proxy classes that delegate to InternalTopologyBuilder and InternalStreamsBuilder, respectively, for backward compatibility.
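The proxy arrangement can be sketched as follows. This is an illustration under stated assumptions, not the actual Kafka code: `InternalBuilder` and `OldBuilder` are hypothetical stand-ins for an internal implementation class and a deprecated public class, reduced to two methods each.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; NOT the real Kafka Streams classes.
final class ProxySketch {

    /** Stand-in for the internal class that holds the actual implementation. */
    static final class InternalBuilder {
        private final List<String> sources = new ArrayList<>();
        private String applicationId;

        void addSource(String name) {
            sources.add(name);
        }

        // A method that would no longer be part of the new public API.
        void setApplicationId(String applicationId) {
            this.applicationId = applicationId;
        }

        List<String> sources() {
            return new ArrayList<>(sources);
        }

        String applicationId() {
            return applicationId;
        }
    }

    /** Stand-in for a deprecated public class: same surface as before, no logic of its own. */
    @Deprecated
    static class OldBuilder {
        private final InternalBuilder internal = new InternalBuilder();

        public synchronized OldBuilder addSource(String name) {
            internal.addSource(name); // forward to the real implementation
            return this;
        }

        public synchronized OldBuilder setApplicationId(String applicationId) {
            internal.setApplicationId(applicationId); // still offered for old callers
            return this;
        }

        InternalBuilder internal() {
            return internal;
        }
    }
}
```

Existing callers keep compiling against the old class, while all state lives in the internal one, so the deprecated class can later be deleted without moving any logic.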
The newly added Topology uses InternalTopologyBuilder as a member.
The newly added StreamsBuilder uses the new Topology as a member (no class hierarchy anymore; using it as a member gives a clear separation between PAPI and DSL).
Because the new StreamsBuilder does not inherit from the new Topology, we need to add StreamsBuilder#build(), which returns the actual Topology to be passed into the KafkaStreams client.
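The composition described above can be sketched as follows, assuming hypothetical stand-in classes (`MiniTopology`, `MiniStreamsBuilder`) and a made-up node naming scheme (`source-0`, `source-1`, ...). The point is only that the DSL builder holds a topology as a member and hands it out through `build()`, rather than being a topology itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; NOT the real Kafka Streams classes.
final class CompositionSketch {

    /** Stand-in for the low-level (PAPI) topology. */
    static final class MiniTopology {
        private final List<String> nodeNames = new ArrayList<>();

        synchronized MiniTopology addSource(String name, String... topics) {
            nodeNames.add(name);
            return this;
        }

        synchronized List<String> nodeNames() {
            return new ArrayList<>(nodeNames);
        }
    }

    /** Stand-in for the DSL builder: it has a topology, it is not a topology. */
    static final class MiniStreamsBuilder {
        private final MiniTopology topology = new MiniTopology();
        private int index = 0;

        /** DSL-level call that is translated into a low-level source node. */
        synchronized MiniStreamsBuilder stream(String topic) {
            topology.addSource("source-" + index++, topic); // naming scheme is made up
            return this;
        }

        /** Hands out the underlying topology, which is what a client would consume. */
        synchronized MiniTopology build() {
            return topology;
        }
    }

    public static void main(String[] args) {
        MiniTopology topology = new MiniStreamsBuilder().stream("orders").stream("payments").build();
        System.out.println(topology.nodeNames());
    }
}
```

Since `MiniStreamsBuilder` does not extend `MiniTopology`, low-level methods such as `addSource` are not visible on the DSL builder, which is the separation between PAPI and DSL that the text refers to.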
Note: for backward compatibility, the DSL-specific methods that are removed from the public API but were offered by the old TopologyBuilder must still be offered by InternalTopologyBuilder for now. Once both deprecated classes have been removed, this cleanup can be done (it does not require a KIP because it is an internal refactoring; we just need to create a JIRA for it). Thus, this KIP falls short of separating PAPI and DSL completely, but it is a necessary first step toward doing the separation in a backward-compatible way (backward compatibility requires a two-step approach).
...
- Because no classes/methods will be removed but only deprecated, this change will be fully backward compatible
- We intend to remove all deprecated classes/methods in 0.11.1, but we can keep them longer on user request
Test Plan
Tests need to be rewritten but no new tests are required. All tests for the public API need to be updated to use the new Topology and StreamsBuilder. All tests using the internal API need to be rewritten to use InternalTopologyBuilder and InternalStreamsBuilder.
Rejected Alternatives
None.
...