THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
JIRA: KAFKA-3856
Released: 1.0.11.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Furthermore, we add a new class interface to get a full description the a topology (to compensate for some internal methods that get removed)
org.apache.kafka.streams.TopologyDescription
TopologyDescription
will have public subclassesinterfaces:Subtopology
GlobalStores
Node
Source
Sink
Processor
The following methods will not be available in the newly added classes:
...
Code Block | ||
---|---|---|
| ||
public class StreamsBuilder { // existing public API from current KStreamBuilder class that will not be changed // small change: adding `synchronized` to all methods to align with TopologyBuilder public StreamsBuilder(); public synchronized <K, V> KStream<K, V> stream(String... topics); public synchronized <K, V> KStream<K, V> stream(Pattern topicPattern); public synchronized <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, String... topics); public synchronized <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, Pattern topicPattern); public synchronized <K, V> KStream<K, V> stream(Serde<K> keySerde, Serde<V> valueSerde, String... topics); public synchronized <K, V> KStream<K, V> stream(Serde<K> keySerde, Serde<V> valueSerde, Pattern topicPattern); public synchronized <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valueSerde, String... topics); public synchronized <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valueSerde, Pattern topicPattern); public synchronized <K, V> KTable<K, V> table(String topic, String storeName); public synchronized <K, V> KTable<K, V> table(AutoOffsetReset offsetReset, String topic, String storeName); public synchronized <K, V> KTable<K, V> table(Serde<K> keySerde, Serde<V> valueSerde, String topic, String storeName); public synchronized <K, V> KTable<K, V> table (AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valueSerde, String topic, String storeName); public synchronized <K, V> GlobalKTable<K, V> globalTable String topic, String storeName); public synchronized <K, V> GlobalKTable<K, V> globalTable Serde<K> keySerde, Serde<V> valueSerde, String topic, String storeName); public synchronized <K, V> KStream<K, V> merge KStream<K, V>... streams); // newly added method public synchronized KStreamBuilder addStateStore(StateStoreSupplier supplier, String... processorNames); public synchronized KStreamBuilder addGlobalStore(StateStore store, String sourceName, Deserializer keyDeserializer, Deserializer valueDeserializer, String topic, String processorName, ProcessorSupplier stateUpdateSupplier); public synchronized Topology build(); } |
...
Code Block | ||
---|---|---|
| ||
public final classinterface TopologyDescription { public final Set<Subtopology> subtopologies(); public final Set<GlobalStore> globalStores(); public final classinferface Subtopology { public final int id(); public final Set<Node> nodes(); } public final classinterface GlobalStore { public final Source source(); public final Processor processor(); } public interface Node { String name() Set<Node> getPredecessorspredecessors(); Set<Node> getSuccessorssuccessors(); } public final classinterface Source implementsextends Node { public final String topics(); // can be comma separated list of topic names or pattern (as String) } public final classinterface Processor implementsextends Node { public final Set<String> stores(); } public final classinterface Sink implementsextends Node { public final String topic(); } } |
Proposed Changes
...