Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
public class Topology {

    // existing public API from current TopologyBuilder class that will not be changed

    public enum AutoOffsetReset {
        EARLIEST , LATEST
    }

    public Topology();


    public synchronized Topology addSource(String name, String... topics);
    public synchronized Topology addSource(String name, Pattern topicPattern);

    public synchronized Topology addSource(AutoOffsetReset offsetReset, String name,  String... topics);
    public synchronized Topology addSource(AutoOffsetReset offsetReset, String name,  Pattern topicPattern);

    public synchronized Topology addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics);
    public synchronized Topology addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern);
   
    public synchronized Topology addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics);
    public synchronized Topology addSource(AutoOffsetReset offsetReset, String name,  Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern;


    public synchronized Topology addSink(String name, String topic, String... parentNames);
    public synchronized Topology addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames);
 
    public synchronized Topology addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames);
    public synchronized <K, V> Topology addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames);


    public synchronized Topology addProcessor(String name, ProcessorSupplier supplier, String... parentNames);
 
    public synchronized Topology addStateStore(StateStoreSupplier supplier, String... processorNames);
 
    public synchronized Topology addGlobalStore(final StateStore store,
                                                final String sourceName,
                                                final Deserializer keyDeserializer,
                                                final Deserializer valueDeserializer,
                                                final String topic,
                                                final String processorName,
                                                final ProcessorSupplier stateUpdateSupplier);

    public synchronized Topology connectProcessorAndStateStores(String processorName, String... stateStoreNames);



    // newly added method to reveal topology structure
    
    // describes the current Topology, ie, TopologyDescription will not be updated if Topology is modified but #describe() must be called again
    public synchronized TopologyDescription describe();
}

...

Code Block
languagejava
public final class TopologyDescription {
    public final List<Subtopology> subtopologies;
    public final List<GlobalStore> globalStores;

    public final class Subtopology {
        public final List<Node> nodes;
    }

    public final class GlobalStore {
        public final String name;
        public final String topic;
    }

    public interface Node {
        List<Node> getPredecessors();
        List<Node> getSuccessors();
    }

    public final class Source implements Node {
        public final String name;
        public// finaltopicNames String topicand topicPattern are mutually exclusive, i.e., only one will be not-null
        public final List<String> topicNames; // can be topic name or pattern (as String) null if #addSource(..., Pattern) was used
        public final Pattern topicPattern; // null if #addSource(..., String...) was used
    }

    public final class Processor implements Node {
        public final String name;
        public final List<String> stores;
    }

    public final class Sink implements Node {
        public final String name;
        public final String topic;
    }

}

...