

Status

Current state: Under Discussion

...

  • org.apache.kafka.streams.TopologyBuilder
  • org.apache.kafka.streams.KStreamBuilder

...

  • org.apache.kafka.streams.TopologyDescription
    • TopologyDescription will have the following public nested classes:
      • Subtopology
      • GlobalStore
      • Node (interface)
      • Source
      • Sink
      • Processor

The following methods will not be available in the newly added classes:

...

Code Block
languagejava
public final class TopologyBuilder { // make class final compared to old TopologyBuilder

    // existing public API from current TopologyBuilder class that will not be changed

    public enum AutoOffsetReset {
        EARLIEST, LATEST
    }

    public TopologyBuilder();


    public synchronized TopologyBuilder addSource(String name, String... topics);
    public synchronized TopologyBuilder addSource(String name, Pattern topicPattern);

    public synchronized TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, String... topics);
    public synchronized TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Pattern topicPattern);

    public synchronized TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics);
    public synchronized TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern);

    public synchronized TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics);
    public synchronized TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern);


    public synchronized TopologyBuilder addSink(String name, String topic, String... parentNames);
    public synchronized TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames);
 
    public synchronized TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames);
    public synchronized <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames);


    public synchronized TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames);
 
    public synchronized TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames);
 
    public synchronized TopologyBuilder addGlobalStore(final StateStore store,
                                                       final String sourceName,
                                                       final Deserializer keyDeserializer,
                                                       final Deserializer valueDeserializer,
                                                       final String topic,
                                                       final String processorName,
                                                       final ProcessorSupplier stateUpdateSupplier);

    public synchronized TopologyBuilder connectProcessorAndStateStores(String processorName, String... stateStoreNames);



    // newly added method to reveal topology structure
    public synchronized TopologyDescription describe();
}
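The proposal lists only signatures. The sketch below is a rough, self-contained illustration of the builder style those signatures imply. Each add* call returns the builder, so calls can be chained, and `parentNames` wire a node to nodes added earlier.

Several parts are assumptions made for illustration and are not the Kafka Streams implementation:
- the names `ToyTopologyBuilder` and `ToyTopologyBuilderDemo`;
- the text format produced by its `describe()`;
- the checks that reject duplicate names and unknown parents.

In the proposal, `describe()` returns a TopologyDescription. The toy returns plain text instead and leaves out serializers, ProcessorSupplier, partitioners and state stores.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Demo entry point: chains the hypothetical builder calls and prints the result.
class ToyTopologyBuilderDemo {
    public static void main(String[] args) {
        String description = new ToyTopologyBuilder()
            .addSource("source", "input")
            .addProcessor("process", "source")
            .addSink("sink", "output", "process")
            .describe();
        // prints:
        // Source source [input] <- []
        // Processor process [] <- [source]
        // Sink sink [output] <- [process]
        System.out.print(description);
    }
}

// Hypothetical toy model, NOT the Kafka Streams implementation. It mimics only
// the chaining style of the proposed TopologyBuilder (every add* method returns
// `this`) plus an assumed rule that parent names must refer to existing nodes.
// Serializers, ProcessorSuppliers, partitioners, and state stores are omitted.
final class ToyTopologyBuilder {

    // Per-node bookkeeping: kind, topics (sources and sinks only), parent names.
    private static final class ToyNode {
        final String kind;
        final List<String> topics;
        final List<String> parents;

        ToyNode(String kind, List<String> topics, List<String> parents) {
            this.kind = kind;
            this.topics = topics;
            this.parents = parents;
        }
    }

    // LinkedHashMap preserves insertion order, so describe() is deterministic.
    private final Map<String, ToyNode> nodes = new LinkedHashMap<>();

    public synchronized ToyTopologyBuilder addSource(String name, String... topics) {
        return addNode(name, new ToyNode("Source", Arrays.asList(topics),
                                         Collections.<String>emptyList()));
    }

    public synchronized ToyTopologyBuilder addProcessor(String name, String... parentNames) {
        return addNode(name, new ToyNode("Processor", Collections.<String>emptyList(),
                                         Arrays.asList(parentNames)));
    }

    public synchronized ToyTopologyBuilder addSink(String name, String topic, String... parentNames) {
        return addNode(name, new ToyNode("Sink", Collections.singletonList(topic),
                                         Arrays.asList(parentNames)));
    }

    // Only called from the synchronized add* methods above.
    private ToyTopologyBuilder addNode(String name, ToyNode node) {
        if (nodes.containsKey(name)) {
            throw new IllegalArgumentException("Node " + name + " is already added");
        }
        for (String parent : node.parents) {
            if (!nodes.containsKey(parent)) {
                throw new IllegalArgumentException("Parent " + parent + " is not added yet");
            }
        }
        nodes.put(name, node);
        return this; // returning the builder is what makes call chaining possible
    }

    // One line per node in insertion order: "<kind> <name> <topics> <- <parents>".
    public synchronized String describe() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, ToyNode> e : nodes.entrySet()) {
            ToyNode n = e.getValue();
            sb.append(n.kind).append(' ').append(e.getKey()).append(' ')
              .append(n.topics).append(" <- ").append(n.parents).append('\n');
        }
        return sb.toString();
    }
}
```

Because every call returns the same builder instance, a whole topology can be declared in one expression. The `synchronized` modifiers in the toy only mirror the proposed signatures.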

 

New org.apache.kafka.streams.KStreamBuilder class:

Code Block
languagejava
public final class KStreamBuilder { // make class final compared to old KStreamBuilder

    // existing public API from current KStreamBuilder class that will not be changed
    // small change: adding `synchronized` to all methods to align with TopologyBuilder

    public KStreamBuilder();
 

    public synchronized <K, V> KStream<K, V> stream(final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerde, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerde, final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerde, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerde, final Pattern topicPattern);
 

    public synchronized <K, V> KTable<K, V> table(final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName);


    public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final String storeName);
    public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName);

    public synchronized <K, V> KStream<K, V> merge(final KStream<K, V>... streams);



    // newly added method to reveal topology structure
    public synchronized TopologyDescription describe();

}

 

New org.apache.kafka.streams.TopologyDescription class:

Code Block
languagejava
public final class TopologyDescription {
    public final List<Subtopology> subtopologies;
    public final List<GlobalStore> globalStores;
 
    public final class Subtopology {
        public final List<Node> nodes;
    }
 
    public final class GlobalStore {
        public final String name;
        public final String topic;
    }
 
    public interface Node {
        List<Node> getPredecessors();
        List<Node> getSuccessors();
    }
 
    public final class Source implements Node {
        public final String name;
        public final String topic; // can be a topic name or a pattern (as String)
    }
 
    public final class Processor implements Node {
        public final String name;
        public final List<String> stores;
    }
 
    public final class Sink implements Node {
        public final String name;
        public final String topic;
    }
}
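The proposed Node interface exposes only getPredecessors() and getSuccessors(). A tool that prints or checks a Subtopology would therefore recover its shape by following those links.

The sketch below does this with a breadth-first walk that starts from the nodes that have no predecessors (the sources). It uses hypothetical stand-ins rather than the proposed classes:
- `DescNode`, `ToyDescNode`, `SubtopologyWalker` and `SubtopologyWalkerDemo` are illustrative names;
- the `name()` accessor is an assumption added so the walk has something to report.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Demo entry point: builds a small diamond-shaped subtopology and walks it.
class SubtopologyWalkerDemo {
    public static void main(String[] args) {
        ToyDescNode source = new ToyDescNode("source");
        ToyDescNode filter = new ToyDescNode("filter");
        ToyDescNode mapper = new ToyDescNode("mapper");
        ToyDescNode sink = new ToyDescNode("sink");
        source.to(filter).to(sink); // source -> filter -> sink
        source.to(mapper).to(sink); // source -> mapper -> sink

        // Input order does not matter; the walk starts from nodes without predecessors.
        List<String> order = SubtopologyWalker.walk(Arrays.asList(sink, mapper, filter, source));
        System.out.println(order); // prints [source, filter, mapper, sink]
    }
}

// Hypothetical stand-in for the proposed TopologyDescription.Node interface.
// getPredecessors()/getSuccessors() follow the proposal; name() is an assumption
// added here so that the walk has something to report.
interface DescNode {
    String name();
    List<DescNode> getPredecessors();
    List<DescNode> getSuccessors();
}

// Hypothetical mutable node used only to assemble an example graph.
final class ToyDescNode implements DescNode {
    private final String name;
    private final List<DescNode> predecessors = new ArrayList<>();
    private final List<DescNode> successors = new ArrayList<>();

    ToyDescNode(String name) {
        this.name = name;
    }

    // Adds the edge this -> child on both ends and returns the child for chaining.
    ToyDescNode to(ToyDescNode child) {
        successors.add(child);
        child.predecessors.add(this);
        return child;
    }

    @Override public String name() { return name; }
    @Override public List<DescNode> getPredecessors() { return predecessors; }
    @Override public List<DescNode> getSuccessors() { return successors; }
}

final class SubtopologyWalker {
    // Breadth-first walk from the roots (nodes with no predecessors, i.e. sources).
    // A node reachable through several parents is reported only once.
    static List<String> walk(List<? extends DescNode> subtopologyNodes) {
        Deque<DescNode> queue = new ArrayDeque<>();
        for (DescNode n : subtopologyNodes) {
            if (n.getPredecessors().isEmpty()) {
                queue.add(n);
            }
        }
        Set<DescNode> seen = new LinkedHashSet<>(queue);
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            DescNode current = queue.poll();
            order.add(current.name());
            for (DescNode next : current.getSuccessors()) {
                if (seen.add(next)) {
                    queue.add(next);
                }
            }
        }
        return order;
    }
}
```

A breadth-first order is convenient for printing. In general it is not a topological order, so a tool that needs every parent visited before its children would use a topological sort instead.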

 

Proposed Changes

Refactor the code to remove internal methods from the public API.

...

Tests need to be rewritten, but no new tests are required. All tests for the public API need to be updated to use the new TopologyBuilder and KStreamBuilder classes, and thus will no longer call the deprecated KafkaStreams constructor. All tests that use the internal API need to be rewritten to use InternalTopologyBuilder and InternalKStreamBuilder.

Rejected Alternatives

None.