Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • org.apache.kafka.streams.processor.TopologyBuilder
  • org.apache.kafka.streams.kstream.KStreamBuilder

and add them again (with same new name but and different package)

  • org.apache.kafka.streams.TopologyBuilderTopology
  • org.apache.kafka.streams.KStreamBuilder
  • org.apache.kafka.streams.TopologyDescription
    • TopologyDescription will have public subclasses

    • Subtopology
    • GlobalStores
    • Node (interface)
    • Source
    • Sink
    • Processor

...

  • TopologyBuilder: setApplicationId, connectSourceStoreAndTopicconnectProcessorsaddInternalTopiccopartitionSources, nodeGroups, build, buildGlobalStateTopology, globalStateStores, topicGroups, earliestResetTopicPattern, latestResetTopicPattern, stateStoreNamesToSourceTopics, copartitioneGroups, sourceTopicPattern, updateSubscriptions
  • KStreamBuilder: all of above from TopologyBuilder except addStateStore and addGlobalStore (as KStreamBuilder does not inherit from TopologyBuilder anymore) plus newName
  • methods highlighted in TopologyBuilder list, are methods that actually belong to DSL abstraction

...

New org.apache.kafka.streams.TopologyBuilderTopology class:

Code Block
languagejava
public final class TopologyBuilderTopology { // make class final compared to old TopologyBuilder

    // existing public API from current TopologyBuilder class that will not be changed

    public enum AutoOffsetReset {
        EARLIEST , LATEST
    }

    public TopologyBuilderTopology();


    public synchronized TopologyBuilderTopology addSource(String name, String... topics);
    public synchronized TopologyBuilderTopology addSource(String name, Pattern topicPattern);

    public synchronized TopologyBuilderTopology addSource(AutoOffsetReset offsetReset, String name,  String... topics);
    public synchronized TopologyBuilderTopology addSource(AutoOffsetReset offsetReset, String name,  Pattern topicPattern);

    public synchronized TopologyBuilderTopology addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics);
    public synchronized TopologyBuilderTopology addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern);
   
    public synchronized TopologyBuilderTopology addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics);
    public synchronized TopologyBuilderTopology addSource(AutoOffsetReset offsetReset, String name,  Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern;


    public synchronized TopologyBuilderTopology addSink(String name, String topic, String... parentNames);
    public synchronized TopologyBuilderTopology addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames);
 
    public synchronized TopologyBuilderTopology addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames);
    public synchronized <K, V> TopologyBuilderTopology addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames);


    public synchronized TopologyBuilderTopology addProcessor(String name, ProcessorSupplier supplier, String... parentNames);
 
    public synchronized TopologyBuilderTopology addStateStore(StateStoreSupplier supplier, String... processorNames);
 
    public synchronized TopologyBuilderTopology addGlobalStore(final StateStore store,
                                                       final String sourceName,
                                                       final Deserializer keyDeserializer,
                                                       final Deserializer valueDeserializer,
                                                       final String topic,
                                                       final String processorName,
                                                       final ProcessorSupplier stateUpdateSupplier);

    public synchronized TopologyBuilderTopology connectProcessorAndStateStores(String processorName, String... stateStoreNames);



    // newly added method to reveal topology structure
    
    public synchronized TopologyDescription describe();
}

...

Code Block
languagejava
public final class KStreamBuilder {

    // make class final compared to old KStreamBuilder

    // existing public API from current KStreamBuilder class that will not be changed
    // small change: adding `synchronized` to all methods to align with TopologyBuilder

    public KStreamBuilder();
 

    public synchronized <K, V> KStream<K, V> stream(final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerde, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerde, final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerde, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerde, final Pattern topicPattern);
 

    public synchronized <K, V> KTable<K, V> table(final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName);


    public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final String storeName);
    public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName);

    public synchronized <K, V> KStream<K, V> merge(final KStream<K, V>... streams);



    // newly added method

 to   revealpublic topologysynchronized structure
KStreamBuilder addStateStore(StateStoreSupplier supplier, String... processorNames);
    public synchronized TopologyDescriptionKStreamBuilder describe();
 
}

 

New org.apache.kafka.streams.TopologyDescription class:

Code Block
languagejava
public final class TopologyDescription {
addGlobalStore(final StateStore store,
         public final List<Subtopology> subtopologies;
    public final List<GlobalStore> globalStores;
 
    public final class Subtopology {
        public final List<Node> nodes;
    }
 
    public final class GlobalStore {
    final String sourceName,
  public final String name;
        public final String topic;
    }
 
    public interface Node {
        List<Node> getPredecessors();
        List<Node> getSuccessors();
    }
 
    public final classDeserializer SourcekeyDeserializer,
 implements Node {
                           public final String name;
        public final String topic; // can be topic name or pattern (as String)
  final Deserializer }valueDeserializer,
 
     public final class Processor implements Node {
                                           final String topic,
                                                      final String processorName,
                                                      final ProcessorSupplier stateUpdateSupplier);

    public synchronized Topology build();
}

 

New org.apache.kafka.streams.TopologyDescription class:

Code Block
languagejava
public final class TopologyDescription {
    public final List<Subtopology> subtopologies;
    public final List<GlobalStore> globalStores;

    public final class StringSubtopology name;{
        public final List<String>List<Node> storesnodes;
    }
 
    public final class Sink implements NodeGlobalStore {
        public final String name;
        public final String topic;
    }
    
}

 

Changes to org.apache.kafka.streams.KafkaStreams:

Code Block
languagejava
public class KafkaStreams {
 public final String topic;
    }

    //public deprecatinginterface oldNode constructors
{
    @Deprecated
    publicList<Node> KafkaStreams(final TopologyBuilder builder, // old TopologyBuildergetPredecessors();
        List<Node> getSuccessors();
    }

    public final class Source  implements Node {
        public final PropertiesString props)name;

    @Deprecated
    public KafkaStreams(final TopologyBuilderString builder,topic; // can oldbe TopologyBuilder
topic name or pattern (as String)
    }

    public final class Processor implements Node {
     final StreamsConfig config);

 public final String @Deprecatedname;
     public KafkaStreams(final TopologyBuilder builder,public //final oldList<String> TopologyBuilderstores;
    }

    public final class Sink implements Node {
        public  final StreamsConfigString config,name;
        public final String topic;
    }

}

 

Changes to org.apache.kafka.streams.KafkaStreams:

Code Block
languagejava
public class KafkaStreams {

    // deprecating    final KafkaClientSupplier clientSupplier);old constructors

    // adding new constructors
 @Deprecated
    public KafkaStreams(final TopologyBuilder builder, // new TopologyBuilder
                        final Properties props);

    @Deprecated
    public KafkaStreams(final TopologyBuilder builder, // new TopologyBuilder
                        final StreamsConfig config);

    @Deprecated
    public KafkaStreams(final TopologyBuilder builder, // new TopologyBuilder
                        final StreamsConfig config,
                        final KafkaClientSupplier clientSupplier);

    // adding new constructors

    public KafkaStreams(final KStreamBuilderTopology builder, // new KStreamBuilder
                        final Properties props);

    public KafkaStreams(final KStreamBuilderTopology builder, // new KStreamBuilder
                        final StreamsConfig config);

    public KafkaStreams(final KStreamBuilderTopology builder, // new KStreamBuilder
                        final StreamsConfig config,
                        final KafkaClientSupplier clientSupplier);


}

Proposed Changes

Refactor the code to remove internal methods from public API.

...

that offer the methods remove from current API. Thus, both classes are the actual implementation. Old  TopologyBuilder and KStreamBuilder are only proxy classes to both classes respectively, for backward compatibility.

The newly added TopologyBuilder uses Topology uses InternalTopologyBuilder as member.

...