Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

JIRA: KAFKA-3856

Released: 1.0.11.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Furthermore, we add a new class interface to get a full description the a topology (to compensate for some internal methods that get removed)

  • org.apache.kafka.streams.TopologyDescription

    • TopologyDescription will have public subclassesinterfaces:

      • Subtopology
      • GlobalStores
      • Node
       (interface)
      • Source
      • Sink
      • Processor

The following methods will not be available in the newly added classes:

...

Code Block
languagejava
public class Topology {

    // existing public API from current TopologyBuilder class that will not be changed

    public enum AutoOffsetReset {
        EARLIEST , LATEST
    }

    public Topology();


    public synchronized Topology addSource(String name, String... topics);
    public synchronized Topology addSource(String name, Pattern topicPattern);

    public synchronized Topology addSource(AutoOffsetReset offsetReset, String name,  String... topics);
    public synchronized Topology addSource(AutoOffsetReset offsetReset, String name,  Pattern topicPattern);

    public synchronized Topology addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializervalueDeserializer, String... topics);
    public synchronized Topology addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializervalueDeserializer, Pattern topicPattern);
   
    public synchronized Topology addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializervalueDeserializer, String... topics);
    public synchronized Topology addSource(AutoOffsetReset offsetReset, String name,  Deserializer keyDeserializer, Deserializer valDeserializervalueDeserializer, Pattern topicPattern);


    public synchronized Topology addSink(String name, String topic, String... parentNames);
    public synchronized Topology addSink(String name, String topic, Serializer keySerializer, Serializer valSerializervalueSerializer, String... parentNames);
 
    public synchronized Topology addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames);
    public synchronized <K, V> Topology addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializervalueSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames);


    public synchronized Topology addProcessor(String name, ProcessorSupplier supplier, String... parentNames);
 
    public synchronized Topology addStateStore(StateStoreSupplier supplier, String... processorNames);
 
    public synchronized Topology addGlobalStore(final StateStore store,
                                                final String sourceName,
                                                final Deserializer keyDeserializer,
                                                final Deserializer valueDeserializer,
                                                final String topic,
                                                final String processorName,
                                                final ProcessorSupplier stateUpdateSupplier);

    public synchronized Topology connectProcessorAndStateStores(String processorName, String... stateStoreNames);



    // newly added method to reveal topology structure
    
    // describes the current Topology, ie, TopologyDescription will not be updated if Topology is modified but #describe() must be called again
    public synchronized TopologyDescription describe();
}

...

Code Block
languagejava
public class StreamsBuilder {

    // existing public API from current KStreamBuilder class that will not be changed
    // small change: adding `synchronized` to all methods to align with TopologyBuilder

    public StreamsBuilder();


    public synchronized <K, V> KStream<K, V> stream(final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerdevalueSerde, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerdevalueSerde, final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerdevalueSerde, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerdevalueSerde, final Pattern topicPattern);


    public synchronized <K, V> KTable<K, V> table(final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde, final Serde<V> valSerdevalueSerde, final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final Serde<K>Serde<K> keySerde, final Serde<V> valSerdevalueSerde, final String topic, final String storeName);


    public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final String storeName);
    public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valSerdevalueSerde, final String topic, final String storeName);

    public synchronized <K, V> KStream<K, V> merge(final KStream<K, V>... streams);



    // newly added method

    public synchronized KStreamBuilder addStateStore(StateStoreSupplier supplier, String... processorNames);
    public synchronized KStreamBuilder addGlobalStore(final StateStore store,
                                                      final String sourceName,
                                                      final Deserializer keyDeserializer,
                                                      final Deserializer valueDeserializer,
                                                      final String topic,
                                                      final String processorName,
                                                      final ProcessorSupplier stateUpdateSupplier);

    public synchronized Topology build();
}

...

Code Block
languagejava
public final classinterface TopologyDescription {
    public final List<Subtopology>Set<Subtopology> subtopologies();
    public final List<GlobalStore> globalStoresSet<GlobalStore> globalStores();

    publicinferface final class Subtopology {
        public final int id();
        public final List<Node>Set<Node> nodes();
    }

    publicinterface final class GlobalStore {
        public final String nameSource source();
        public final String topicProcessor processor();
    }

    public interface Node {
        List<Node>String getPredecessorsname();
        List<Node>Set<Node> getSuccessorspredecessors();
    }

    public final class Source implements Node {Set<Node> successors();
    }

    interface Source extends  public final String name;Node {
        String topics(); // can be comma separated list of topic names or pattern (as String)
    }

    publicinterface final class Processor implementsextends Node {
        public final String name;
        public final List<String> storesSet<String> stores();
    }

    public final classinterface Sink implementsextends Node {
        public final String name;
        public final String topic();
    }

}

 

Proposed Changes

We will add two new internal classes

...