Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
public final class TopologyBuilder { // make class final compared to old TopologyBuilder

    // existing public API from current TopologyBuilder class that will not be changed

    public enum AutoOffsetReset {
        EARLIEST , LATEST
    }

    public TopologyBuilder();


    public synchronized TopologyBuilder addSource(String name, String... topics);
    public synchronized TopologyBuilder addSource(String name, Pattern topicPattern);

    public synchronized TopologyBuilder addSource(AutoOffsetReset offsetReset, String name,  String... topics);
    public synchronized TopologyBuilder addSource(AutoOffsetReset offsetReset, String name,  Pattern topicPattern);

    public synchronized TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics);
    public synchronized TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern);
   
    public synchronized TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics);
    public synchronized TopologyBuilder addSource(AutoOffsetReset offsetReset, String name,  Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern;


    public synchronized TopologyBuilder addSink(String name, String topic, String... parentNames);
    public synchronized TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames);
 
    public synchronized TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames);
    public synchronized <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames);


    public synchronized TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames);
 
    public synchronized TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames);
 
    public synchronized TopologyBuilder addGlobalStore(final StateStore store,
                                                       final String sourceName,
                                                       final Deserializer keyDeserializer,
                                                       final Deserializer valueDeserializer,
                                                       final String topic,
                                                       final String processorName,
                                                       final ProcessorSupplier stateUpdateSupplier);

    public synchronized TopologyBuilder connectProcessorAndStateStores(String processorName, String... stateStoreNames);



    // newly added method to support builder pattern
 
    // should be used instead of KafkaStreams() constructor
    // old:
    //     TopologyBuilder builder = new TopologyBuilder();
    //     KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(...));
    // new:
    //     TopologyBuilder builder = new TopologyBuilder();
    //     KafkaStreams streams = builder.build(new StreamsConfig(...));
 
    public synchronized KafkaStreams build(final StreamsConfig config);
    public synchronized KafkaStreams build(final StreamsConfig config, final KafkaClientSupplier clientSupplier);
}

 

New org.apache.kafka.streams.KStreamBuilder class:

Code Block
languagejava
public final class KStreamBuilder {

    // existing public API from current TopologyBuilder class that will not be changed
    // small change: adding `synchronized` to all methods to align with TopologyBuilder

    public KStreamBuilder();
 

    public synchronized <K, V> KStream<K, V> stream(final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerde, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerde, final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerde, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerde, final Pattern topicPattern);
 

    public synchronized <K, V> KTable<K, V> table(final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName);


    public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final String storeName);
    public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName);

    public synchronized <K, V> KStream<K, V> merge(final KStream<K, V>... streams);



    // newly added method to support builder pattern
 
    // should be used instead of KafkaStreams() constructor
    // old:
    //     KStreamBuilder builder = new KStreamBuilder();
    //     KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(...));
    // new:
    //     KStreamBuilder builder = new KStreamBuilder();
    //     KafkaStreams streams = builder.build(new StreamsConfig(...));
 
    public synchronized KafkaStreams build(final StreamsConfig config);
	public synchronized KafkaStreams build(final StreamsConfig config, final KafkaClientSupplier clientSupplier);

    // return the internally used TopologyBuilder
    public synchronized TopologyBuilder topologyBuilder();
 
}

 

Proposed Changes

Refactor the code to remove internal methods from the public API.

...