...
JIRA: KAFKA-3856
Released: 1.0.11.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Furthermore, we add a new class interface to get a full description the a topology (to compensate for some internal methods that get removed)
org.apache.kafka.streams.TopologyDescription
TopologyDescription
will have public subclassesinterfaces:Subtopology
GlobalStores
Node
Source
Sink
Processor
The following methods will not be available in the newly added classes:
...
Code Block | ||
---|---|---|
| ||
public class Topology { // existing public API from current TopologyBuilder class that will not be changed public enum AutoOffsetReset { EARLIEST , LATEST } public Topology(); public synchronized Topology addSource(String name, String... topics); public synchronized Topology addSource(String name, Pattern topicPattern); public synchronized Topology addSource(AutoOffsetReset offsetReset, String name, String... topics); public synchronized Topology addSource(AutoOffsetReset offsetReset, String name, Pattern topicPattern); public synchronized Topology addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializervalueDeserializer, String... topics); public synchronized Topology addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializervalueDeserializer, Pattern topicPattern); public synchronized Topology addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializervalueDeserializer, String... topics); public synchronized Topology addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializervalueDeserializer, Pattern topicPattern); public synchronized Topology addSink(String name, String topic, String... parentNames); public synchronized Topology addSink(String name, String topic, Serializer keySerializer, Serializer valSerializervalueSerializer, String... parentNames); public synchronized Topology addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames); public synchronized <K, V> Topology addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializervalueSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames); public synchronized Topology addProcessor(String name, ProcessorSupplier supplier, String... parentNames); public synchronized Topology addStateStore(StateStoreSupplier supplier, String... processorNames); public synchronized Topology addGlobalStore(final StateStore store, final String sourceName, final Deserializer keyDeserializer, final Deserializer valueDeserializer, final String topic, final String processorName, final ProcessorSupplier stateUpdateSupplier); public synchronized Topology connectProcessorAndStateStores(String processorName, String... stateStoreNames); // newly added method to reveal topology structure // describes the current Topology, ie, TopologyDescription will not be updated if Topology is modified but #describe() must be called again public synchronized TopologyDescription describe(); } |
...
Code Block | ||
---|---|---|
| ||
public class StreamsBuilder { // existing public API from current KStreamBuilder class that will not be changed // small change: adding `synchronized` to all methods to align with TopologyBuilder public StreamsBuilder(); public synchronized <K, V> KStream<K, V> stream(final String... topics); public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern); public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final String... topics); public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Pattern topicPattern); public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerdevalueSerde, final String... topics); public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerdevalueSerde, final Pattern topicPattern); public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerdevalueSerde, final String... topics); public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerdevalueSerde, final Pattern topicPattern); public synchronized <K, V> KTable<K, V> table(final String topic, final String storeName); public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final String topic, final String storeName); public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde, final Serde<V> valSerdevalueSerde, final String topic, final String storeName); public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final Serde<K>Serde<K> keySerde, final Serde<V> valSerdevalueSerde, final String topic, final String storeName); public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final String storeName); public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valSerdevalueSerde, final String topic, final String storeName); public synchronized <K, V> KStream<K, V> merge(final KStream<K, V>... streams); // newly added method public synchronized KStreamBuilder addStateStore(StateStoreSupplier supplier, String... processorNames); public synchronized KStreamBuilder addGlobalStore(final StateStore store, final String sourceName, final Deserializer keyDeserializer, final Deserializer valueDeserializer, final String topic, final String processorName, final ProcessorSupplier stateUpdateSupplier); public synchronized Topology build(); } |
...
Code Block | ||
---|---|---|
| ||
public final classinterface TopologyDescription { public final List<Subtopology>Set<Subtopology> subtopologies(); public final List<GlobalStore> globalStoresSet<GlobalStore> globalStores(); publicinferface final class Subtopology { public final int id(); public final List<Node>Set<Node> nodes(); } publicinterface final class GlobalStore { public final String nameSource source(); public final String topicProcessor processor(); } public interface Node { List<Node>String getPredecessorsname(); List<Node>Set<Node> getSuccessorspredecessors(); } public final class Source implements Node {Set<Node> successors(); } interface Source extends public final String name;Node { String topics(); // can be comma separated list of topic names or pattern (as String) } publicinterface final class Processor implementsextends Node { public final String name; public final List<String> storesSet<String> stores(); } public final classinterface Sink implementsextends Node { public final String name; public final String topic(); } } |
Proposed Changes
We will add two new internal classes
...