Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

JIRA: KAFKA-3856

Released: 1.0.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Furthermore, we add a new interface to get a full description of a topology (to compensate for some internal methods that get removed)

  • org.apache.kafka.streams.TopologyDescription

    • TopologyDescription will have public interfaces:

      • Subtopology
      • GlobalStore
      • Node (interface)
      • Source
      • Sink
      • Processor

The following methods will not be available in the newly added classes:

...

Code Block
languagejava
public class StreamsBuilder {

    // existing public API from current KStreamBuilder class that will not be changed
    // small change: adding `synchronized` to all methods to align with TopologyBuilder

    public StreamsBuilder();


    public synchronized <K, V> KStream<K, V> stream(String... topics);
    public synchronized <K, V> KStream<K, V> stream(Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, String... topics);
    public synchronized <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(Serde<K> keySerde, Serde<V> valueSerde, String... topics);
    public synchronized <K, V> KStream<K, V> stream(Serde<K> keySerde, Serde<V> valueSerde, Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valueSerde, String... topics);
    public synchronized <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valueSerde, Pattern topicPattern);


    public synchronized <K, V> KTable<K, V> table(String topic, String storeName);
    public synchronized <K, V> KTable<K, V> table(AutoOffsetReset offsetReset, String topic, String storeName);
    public synchronized <K, V> KTable<K, V> table(Serde<K> keySerde, Serde<V> valueSerde, String topic, String storeName);
    public synchronized <K, V> KTable<K, V> table (AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valueSerde, String topic, String storeName);


    public synchronized <K, V> GlobalKTable<K, V> globalTable String topic, String storeName);
    public synchronized <K, V> GlobalKTable<K, V> globalTable Serde<K> keySerde, Serde<V> valueSerde, String topic, String storeName);

    public synchronized <K, V> KStream<K, V> merge KStream<K, V>... streams);



    // newly added method

    public synchronized KStreamBuilder addStateStore(StateStoreSupplier supplier, String... processorNames);
    public synchronized KStreamBuilder addGlobalStore(StateStore store,
                                                      String sourceName,
                                                      Deserializer keyDeserializer,
                                                      Deserializer valueDeserializer,
                                                      String topic,
                                                      String processorName,
                                                      ProcessorSupplier stateUpdateSupplier);

    public synchronized Topology build();
}

...

Code Block
languagejava
/**
 * Full description of a topology, exposed as a set of nested interfaces.
 */
public interface TopologyDescription {
    /** Returns the sub-topologies contained in this description. */
    Set<Subtopology> subtopologies();

    /** Returns the global stores contained in this description. */
    Set<GlobalStore> globalStores();

    /** A sub-topology: an id plus the set of nodes belonging to it. */
    public interface Subtopology {
        int id();
        Set<Node> nodes();
    }

    /** A global store, described by its source and its processor. */
    public interface GlobalStore {
        Source source();
        Processor processor();
    }

    /** Common base of all node kinds (source, processor, sink). */
    public interface Node {
        String name();
        Set<Node> predecessors();
        Set<Node> successors();
    }

    /** A source node. */
    public interface Source extends Node {
        String topics(); // can be comma separated list of topic names or pattern (as String)
    }

    /** A processor node together with the names of its stores. */
    public interface Processor extends Node {
        Set<String> stores();
    }

    /** A sink node writing to a single topic. */
    public interface Sink extends Node {
        String topic();
    }

}

 

Proposed Changes

...