Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
public final class KStreamBuilder { // make class final compared to old KStreamBuilder

    // existing public API from current TopologyBuilderKStreamBuilder class that will not be changed
    // small change: adding `synchronized` to all methods to align with TopologyBuilder

    public KStreamBuilder();
 

    public synchronized <K, V> KStream<K, V> stream(final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerde, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerde, final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerde, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerde, final Pattern topicPattern);
 

    public synchronized <K, V> KTable<K, V> table(final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName);


    public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final String storeName);
    public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName);

    public synchronized <K, V> KStream<K, V> merge(final KStream<K, V>... streams);



    // newly added method to support builder pattern
 
    // should be used instead of KafkaStreams() constructor
    // old:
    //     KStreamBuilder builder = new KStreamBuilder();
    //     KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(...));
    // new:
    //     KStreamBuilder builder = new KStreamBuilder();
    //     KafkaStreams streams = builder.build(new StreamsConfig(...));
 
    public synchronized KafkaStreams build(final StreamsConfig config);
	public synchronized KafkaStreams build(final StreamsConfig config, final KafkaClientSupplier clientSupplier);

    // return the internally used TopologyBuilder
    public synchronized TopologyBuilder topologyBuilder();
 
}

...