THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
public class Topology { // existing public API from current TopologyBuilder class that will not be changed public enum AutoOffsetReset { EARLIEST , LATEST } public Topology(); public synchronized Topology addSource(String name, String... topics); public synchronized Topology addSource(String name, Pattern topicPattern); public synchronized Topology addSource(AutoOffsetReset offsetReset, String name, String... topics); public synchronized Topology addSource(AutoOffsetReset offsetReset, String name, Pattern topicPattern); public synchronized Topology addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializervalueDeserializer, String... topics); public synchronized Topology addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializervalueDeserializer, Pattern topicPattern); public synchronized Topology addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializervalueDeserializer, String... topics); public synchronized Topology addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializervalueDeserializer, Pattern topicPattern); public synchronized Topology addSink(String name, String topic, String... parentNames); public synchronized Topology addSink(String name, String topic, Serializer keySerializer, Serializer valSerializervalueSerializer, String... parentNames); public synchronized Topology addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames); public synchronized <K, V> Topology addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializervalueSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames); public synchronized Topology addProcessor(String name, ProcessorSupplier supplier, String... parentNames); public synchronized Topology addStateStore(StateStoreSupplier supplier, String... 
processorNames); public synchronized Topology addGlobalStore(final StateStore store, final String sourceName, final Deserializer keyDeserializer, final Deserializer valueDeserializer, final String topic, final String processorName, final ProcessorSupplier stateUpdateSupplier); public synchronized Topology connectProcessorAndStateStores(String processorName, String... stateStoreNames); // newly added method to reveal topology structure // describes the current Topology, ie, TopologyDescription will not be updated if Topology is modified but #describe() must be called again public synchronized TopologyDescription describe(); } |
...
Code Block | ||
---|---|---|
| ||
public class StreamsBuilder { // existing public API from current KStreamBuilder class that will not be changed // small change: adding `synchronized` to all methods to align with TopologyBuilder public StreamsBuilder(); public synchronized <K, V> KStream<K, V> stream(final String... topics); public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern); public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final String... topics); public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Pattern topicPattern); public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerdevalueSerde, final String... topics); public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerdevalueSerde, final PatternPattern topicPattern); public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerdevalueSerde, final String... 
topics); public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerdevalueSerde, final Pattern topicPattern); public synchronized <K, V> KTable<K, V> table(final String topic, final String storeName); public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final String topic, final String storeName); public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde, final Serde<V> valSerdevalueSerde, final String topic, final String storeName); public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerdevalueSerde, final String topic, final String storeName); public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final String storeName); public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valSerdevalueSerde, final String topic, final String storeName); public synchronized <K, V> KStream<K, V> merge(final KStream<K, V>... streams); // newly added method public synchronized KStreamBuilder addStateStore(StateStoreSupplier supplier, String... processorNames); public synchronized KStreamBuilder addGlobalStore(final StateStore store, final String sourceName, final Deserializer keyDeserializer, final Deserializer valueDeserializer, final String topic, final String processorName, final ProcessorSupplier stateUpdateSupplier); public synchronized Topology build(); } |
...
Code Block | ||
---|---|---|
| ||
/**
 * Description of a topology, exposed as a list of sub-topologies plus a list of global stores
 * (signature sketch only; constructors are not shown).
 */
public final class TopologyDescription {

    public final List<Subtopology> subtopologies;
    public final List<GlobalStore> globalStores;

    /** A sub-topology: a numeric id and the nodes it contains. */
    public final class Subtopology {
        public final int id;
        public final List<Node> nodes;
    }

    /** A global store, identified by its name and its topic. */
    public final class GlobalStore {
        public final String name;
        public final String topic;
    }

    /** A node of the topology graph, linked to its predecessor and successor nodes. */
    public interface Node {
        List<Node> getPredecessors();
        List<Node> getSuccessors();
    }

    /** Source node. */
    public final class Source implements Node {
        public final String name;
        public final String topics; // can be comma separated list of topic names or pattern (as String)
    }

    /** Processor node, listing its stores by name. */
    public final class Processor implements Node {
        public final String name;
        public final List<String> stores;
    }

    /** Sink node with its single topic. */
    public final class Sink implements Node {
        public final String name;
        public final String topic;
    }
} |
...