Status
Current state: Accepted ([VOTE] KIP-120: Cleanup Kafka Streams builder API)
Discussion thread: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API
JIRA: KAFKA-3856
Released: 1.0.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
org.apache.kafka.streams.Topology
org.apache.kafka.streams.StreamsBuilder
Furthermore, we add a new interface to get a full description of a topology (to compensate for some internal methods that get removed):
org.apache.kafka.streams.TopologyDescription
TopologyDescription will have public interfaces:
Subtopology
GlobalStore
Node
Source
Sink
Processor
The following methods will not be available in the newly added classes:
TopologyBuilder -> Topology: setApplicationId, connectSourceStoreAndTopic, connectProcessors, addInternalTopic, copartitionSources, nodeGroups, build, buildGlobalStateTopology, globalStateStores, topicGroups, earliestResetTopicPattern, latestResetTopicPattern, stateStoreNamesToSourceTopics, copartitionGroups, sourceTopicPattern, updateSubscriptions
KStreamBuilder -> StreamsBuilder: all methods from TopologyBuilder (as KStreamBuilder does not inherit from TopologyBuilder anymore) except addStateStore and addGlobalStore (which are both added explicitly to StreamsBuilder), plus newName
Methods highlighted in the TopologyBuilder list are methods that actually belong to the DSL abstraction.
...
```java
public class Topology {

    // existing public API from current TopologyBuilder class that will not be changed

    public enum AutoOffsetReset {
        EARLIEST, LATEST
    }

    public Topology();

    public synchronized Topology addSource(String name, String... topics);
    public synchronized Topology addSource(String name, Pattern topicPattern);
    public synchronized Topology addSource(AutoOffsetReset offsetReset, String name, String... topics);
    public synchronized Topology addSource(AutoOffsetReset offsetReset, String name, Pattern topicPattern);
    public synchronized Topology addSource(String name, Deserializer keyDeserializer, Deserializer valueDeserializer, String... topics);
    public synchronized Topology addSource(String name, Deserializer keyDeserializer, Deserializer valueDeserializer, Pattern topicPattern);
    public synchronized Topology addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valueDeserializer, String... topics);
    public synchronized Topology addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valueDeserializer, Pattern topicPattern);

    public synchronized Topology addSink(String name, String topic, String... parentNames);
    public synchronized Topology addSink(String name, String topic, Serializer keySerializer, Serializer valueSerializer, String... parentNames);
    public synchronized Topology addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames);
    public synchronized <K, V> Topology addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames);

    public synchronized Topology addProcessor(String name, ProcessorSupplier supplier, String... parentNames);

    public synchronized Topology addStateStore(StateStoreSupplier supplier, String... processorNames);

    public synchronized Topology addGlobalStore(final StateStore store,
                                                final String sourceName,
                                                final Deserializer keyDeserializer,
                                                final Deserializer valueDeserializer,
                                                final String topic,
                                                final String processorName,
                                                final ProcessorSupplier stateUpdateSupplier);

    public synchronized Topology connectProcessorAndStateStores(String processorName, String... stateStoreNames);

    // newly added method to reveal topology structure

    // describes the current Topology, i.e., TopologyDescription will not be updated if Topology is modified but #describe() must be called again
    public synchronized TopologyDescription describe();
}
```
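The comment on `describe()` implies snapshot semantics: a description reflects the topology at the time of the call and is not updated afterwards. The following is a minimal sketch of that behavior only. `SketchTopology` and `DescribeSnapshotDemo` are hypothetical stand-ins invented for illustration, not the Kafka classes, and the description is reduced to a list of node names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for Topology; NOT the Kafka class.
final class SketchTopology {
    private final List<String> nodeNames = new ArrayList<>();

    synchronized SketchTopology addSource(String name) {
        nodeNames.add(name);
        return this;
    }

    // Returns an immutable copy of the node names as of this call.
    // Later modifications of the topology do not show up in the returned list.
    synchronized List<String> describe() {
        return Collections.unmodifiableList(new ArrayList<>(nodeNames));
    }
}

final class DescribeSnapshotDemo {
    public static void main(String[] args) {
        SketchTopology topology = new SketchTopology().addSource("source-a");
        List<String> before = topology.describe();

        topology.addSource("source-b");
        List<String> after = topology.describe();

        // The first description still has one node; only the second call sees both.
        System.out.println(before.size() + " " + after.size());
    }
}
```

A caller that wants an up-to-date view therefore has to call `describe()` again after every modification.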
...
New org.apache.kafka.streams.StreamsBuilder class:
```java
public class StreamsBuilder {

    // existing public API from current KStreamBuilder class that will not be changed
    // small change: adding `synchronized` to all methods to align with TopologyBuilder

    public StreamsBuilder();

    public synchronized <K, V> KStream<K, V> stream(final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Pattern topicPattern);
    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valueSerde, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valueSerde, final Pattern topicPattern);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valueSerde, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valueSerde, final Pattern topicPattern);

    public synchronized <K, V> KTable<K, V> table(final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde, final Serde<V> valueSerde, final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valueSerde, final String topic, final String storeName);

    public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final String storeName);
    public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valueSerde, final String topic, final String storeName);

    public synchronized <K, V> KStream<K, V> merge(final KStream<K, V>... streams);

    // newly added method

    public synchronized StreamsBuilder addStateStore(StateStoreSupplier supplier, String... processorNames);

    public synchronized StreamsBuilder addGlobalStore(final StateStore store,
                                                      final String sourceName,
                                                      final Deserializer keyDeserializer,
                                                      final Deserializer valueDeserializer,
                                                      final String topic,
                                                      final String processorName,
                                                      final ProcessorSupplier stateUpdateSupplier);

    public synchronized Topology build();
}
```
...
```java
public interface TopologyDescription {

    Set<Subtopology> subtopologies();
    Set<GlobalStore> globalStores();

    interface Subtopology {
        int id();
        Set<Node> nodes();
    }

    interface GlobalStore {
        Source source();
        Processor processor();
    }

    interface Node {
        String name();
        Set<Node> predecessors();
        Set<Node> successors();
    }

    interface Source extends Node {
        String topics(); // can be comma separated list of topic names or pattern (as String)
    }

    interface Processor extends Node {
        Set<String> stores();
    }

    interface Sink extends Node {
        String topic();
    }
}
```
Proposed Changes
We will add two new internal classes
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder
that offer the methods removed from the current API. Thus, both classes are the actual implementation. The old TopologyBuilder and KStreamBuilder are only proxy classes that delegate to these two classes, respectively, for backward compatibility.
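The proxy arrangement described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the actual Kafka code: `SketchInternalBuilder` and `SketchLegacyBuilder` are hypothetical stand-ins for InternalTopologyBuilder and the old TopologyBuilder, and only two methods are modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for InternalTopologyBuilder: holds the real state and logic,
// including methods that are no longer part of the new public API.
final class SketchInternalBuilder {
    private final List<String> sources = new ArrayList<>();
    private String applicationId;

    void addSource(String name) { sources.add(name); }
    void setApplicationId(String id) { applicationId = id; }

    List<String> sources() { return new ArrayList<>(sources); }
    String applicationId() { return applicationId; }
}

// Hypothetical stand-in for the old TopologyBuilder: it keeps its full method set
// for backward compatibility but only forwards each call to the internal builder.
final class SketchLegacyBuilder {
    final SketchInternalBuilder internal = new SketchInternalBuilder();

    synchronized SketchLegacyBuilder addSource(String name) {
        internal.addSource(name);
        return this;
    }

    synchronized SketchLegacyBuilder setApplicationId(String id) {
        internal.setApplicationId(id);
        return this;
    }
}

final class ProxyDemo {
    public static void main(String[] args) {
        SketchLegacyBuilder legacy = new SketchLegacyBuilder()
                .setApplicationId("demo-app")
                .addSource("source-a");

        // All state lives in the internal builder, not in the proxy.
        System.out.println(legacy.internal.applicationId() + " " + legacy.internal.sources());
    }
}
```

Keeping the logic in one internal class means existing code written against the old builders continues to work while the new public classes expose a smaller surface.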
The newly added Topology uses InternalTopologyBuilder as a member.
The newly added StreamsBuilder uses the new Topology as a member (there is no class hierarchy anymore; using it as a member gives a clear separation between PAPI and DSL).
Because the new StreamsBuilder does not inherit from the new Topology, we need to add StreamsBuilder#build(), which returns the actual Topology to be passed into the KafkaStreams client.
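The composition-over-inheritance design can be illustrated with a small, self-contained sketch. The classes `PapiTopologySketch` and `DslBuilderSketch` are hypothetical stand-ins for Topology and StreamsBuilder. As simplifications that are not part of the proposal, `stream()` returns the builder rather than a KStream, and the node naming scheme is invented.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the new Topology (PAPI level); NOT the Kafka class.
final class PapiTopologySketch {
    private final List<String> nodes = new ArrayList<>();

    synchronized PapiTopologySketch addSource(String name, String topic) {
        nodes.add(name + "<-" + topic);
        return this;
    }

    synchronized List<String> nodes() {
        return new ArrayList<>(nodes);
    }
}

// Hypothetical stand-in for StreamsBuilder (DSL level): it owns a topology
// instead of extending it, so PAPI methods are not inherited by the DSL.
final class DslBuilderSketch {
    private final PapiTopologySketch topology = new PapiTopologySketch();
    private int nodeIndex = 0;

    // The DSL generates node names itself; the naming scheme here is invented.
    synchronized DslBuilderSketch stream(String topic) {
        topology.addSource("SOURCE-" + nodeIndex++, topic);
        return this;
    }

    // The only bridge from the DSL to the PAPI object that a client would consume.
    synchronized PapiTopologySketch build() {
        return topology;
    }
}

final class CompositionDemo {
    public static void main(String[] args) {
        PapiTopologySketch topology = new DslBuilderSketch()
                .stream("orders")
                .stream("payments")
                .build();
        System.out.println(topology.nodes());
    }
}
```

Because `DslBuilderSketch` is not a subtype of `PapiTopologySketch`, a DSL user cannot call low-level methods such as `addSource` on the builder by accident; the topology is only reachable through `build()`.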
...
Tests need to be rewritten but no new tests are required. All tests for the public API need to be updated to use the new Topology and StreamsBuilder. All tests using internal API need to be rewritten to use InternalTopologyBuilder and InternalStreamsBuilder.
Rejected Alternatives
None.
...