Status

Current state: Accepted ([VOTE] KIP-120: Cleanup Kafka Streams builder API)

Discussion thread: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

JIRA: KAFKA-3856

Released: 1.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  • org.apache.kafka.streams.processor.TopologyBuilder
  • org.apache.kafka.streams.kstream.KStreamBuilder

and add them again (with new names and in a different package):

  • org.apache.kafka.streams.Topology
  • org.apache.kafka.streams.StreamsBuilder

Furthermore, we add a new interface to get a full description of a topology (to compensate for some internal methods that get removed):

  • org.apache.kafka.streams.TopologyDescription

    • TopologyDescription will have public nested interfaces:

      • Subtopology
      • GlobalStore
      • Node
      • Source (extends Node)
      • Sink (extends Node)
      • Processor (extends Node)

The following methods will not be available in the newly added classes:

  • TopologyBuilder -> Topology: setApplicationId, connectSourceStoreAndTopic, connectProcessors, addInternalTopic, copartitionSources, nodeGroups, build, buildGlobalStateTopology, globalStateStores, topicGroups, earliestResetTopicPattern, latestResetTopicPattern, stateStoreNamesToSourceTopics, copartitionGroups, sourceTopicPattern, updateSubscriptions
  • KStreamBuilder -> StreamsBuilder: all methods inherited from TopologyBuilder (as StreamsBuilder does not inherit from TopologyBuilder anymore), except addStateStore and addGlobalStore (which are both added explicitly to StreamsBuilder), plus newName
  • methods highlighted in the TopologyBuilder list are methods that actually belong to the DSL abstraction

...

Code Block
languagejava
public class Topology {

    // existing public API from current TopologyBuilder class that will not be changed

    public enum AutoOffsetReset {
        EARLIEST, LATEST
    }

    public Topology();


    public synchronized Topology addSource(String name, String... topics);
    public synchronized Topology addSource(String name, Pattern topicPattern);

    public synchronized Topology addSource(AutoOffsetReset offsetReset, String name, String... topics);
    public synchronized Topology addSource(AutoOffsetReset offsetReset, String name, Pattern topicPattern);

    public synchronized Topology addSource(String name, Deserializer keyDeserializer, Deserializer valueDeserializer, String... topics);
    public synchronized Topology addSource(String name, Deserializer keyDeserializer, Deserializer valueDeserializer, Pattern topicPattern);

    public synchronized Topology addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valueDeserializer, String... topics);
    public synchronized Topology addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valueDeserializer, Pattern topicPattern);


    public synchronized Topology addSink(String name, String topic, String... parentNames);
    public synchronized Topology addSink(String name, String topic, Serializer keySerializer, Serializer valueSerializer, String... parentNames);

    public synchronized Topology addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames);
    public synchronized <K, V> Topology addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames);


    public synchronized Topology addProcessor(String name, ProcessorSupplier supplier, String... parentNames);
 
    public synchronized Topology addStateStore(StateStoreSupplier supplier, String... processorNames);
 
    public synchronized Topology addGlobalStore(final StateStore store,
                                                final String sourceName,
                                                final Deserializer keyDeserializer,
                                                final Deserializer valueDeserializer,
                                                final String topic,
                                                final String processorName,
                                                final ProcessorSupplier stateUpdateSupplier);

    public synchronized Topology connectProcessorAndStateStores(String processorName, String... stateStoreNames);



    // newly added method to reveal topology structure
    
    // describes the current Topology; the returned TopologyDescription is a snapshot that is not updated
    // if the Topology is modified afterwards, so #describe() must be called again
    public synchronized TopologyDescription describe();
}
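The comment on #describe() implies snapshot semantics. The following is a minimal, self-contained sketch of that contract, assuming a copy-on-describe strategy: MiniTopology and MiniDescription are hypothetical stand-ins (not Kafka classes) whose describe() copies the current node list, so a description obtained earlier does not change when nodes are added later.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, heavily simplified stand-in for Topology; NOT the Kafka class.
final class MiniTopology {
    private final List<String> nodeNames = new ArrayList<>();

    // Like the Topology#add* methods, returns this so calls can be chained.
    // Topics are ignored in this sketch; only the node name is recorded.
    synchronized MiniTopology addSource(String name, String... topics) {
        nodeNames.add(name);
        return this;
    }

    // Parent names are ignored in this sketch; only the node name is recorded.
    synchronized MiniTopology addProcessor(String name, String... parentNames) {
        nodeNames.add(name);
        return this;
    }

    // Copies the current state: later modifications are not reflected in the result.
    synchronized MiniDescription describe() {
        return new MiniDescription(new ArrayList<>(nodeNames));
    }
}

// Hypothetical immutable description holding a private copy of the node names.
final class MiniDescription {
    private final List<String> nodes;

    MiniDescription(List<String> nodes) {
        this.nodes = Collections.unmodifiableList(nodes);
    }

    List<String> nodes() {
        return nodes;
    }
}

class DescribeSnapshotDemo {
    public static void main(String[] args) {
        MiniTopology topology = new MiniTopology().addSource("source", "input-topic");
        MiniDescription before = topology.describe();
        topology.addProcessor("processor", "source");
        MiniDescription after = topology.describe();
        System.out.println(before.nodes()); // [source]
        System.out.println(after.nodes());  // [source, processor]
    }
}
```

Copying on each call keeps the description safe to hand around without locking, at the cost of requiring callers to call describe() again after every change.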

...

New org.apache.kafka.streams.StreamsBuilder class:

Code Block
languagejava
public class StreamsBuilder {

    // existing public API from current KStreamBuilder class that will not be changed
    // small change: adding `synchronized` to all methods to align with TopologyBuilder

    public StreamsBuilder();


    public synchronized <K, V> KStream<K, V> stream(final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valueSerde, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valueSerde, final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valueSerde, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valueSerde, final Pattern topicPattern);


    public synchronized <K, V> KTable<K, V> table(final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde, final Serde<V> valueSerde, final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valueSerde, final String topic, final String storeName);


    public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final String storeName);
    public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valueSerde, final String topic, final String storeName);

    public synchronized <K, V> KStream<K, V> merge(final KStream<K, V>... streams);



    // newly added methods

    public synchronized StreamsBuilder addStateStore(StateStoreSupplier supplier, String... processorNames);
    public synchronized StreamsBuilder addGlobalStore(final StateStore store,
                                                      final String sourceName,
                                                      final Deserializer keyDeserializer,
                                                      final Deserializer valueDeserializer,
                                                      final String topic,
                                                      final String processorName,
                                                      final ProcessorSupplier stateUpdateSupplier);

    public synchronized Topology build();
}

...

Code Block
languagejava
import java.util.Set;

public interface TopologyDescription {
    Set<Subtopology> subtopologies();
    Set<GlobalStore> globalStores();

    interface Subtopology {
        int id();
        Set<Node> nodes();
    }

    interface GlobalStore {
        Source source();
        Processor processor();
    }

    interface Node {
        String name();
        Set<Node> predecessors();
        Set<Node> successors();
    }

    interface Source extends Node {
        String topics(); // can be a comma-separated list of topic names or a pattern (as String)
    }

    interface Processor extends Node {
        Set<String> stores();
    }

    interface Sink extends Node {
        String topic();
    }

}
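To illustrate how a caller might consume these interfaces, the sketch below re-declares a subset of them locally in a hypothetical holder interface Desc (so it compiles without Kafka on the classpath), provides toy implementations, and follows successors() from each source to collect the topics of all reachable sinks. The node and topic names are invented; the real implementations returned by Topology#describe() are internal to Kafka Streams and are not shown here.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical local mirror of part of the proposed TopologyDescription (not the Kafka types).
interface Desc {
    interface Node { String name(); Set<Node> predecessors(); Set<Node> successors(); }
    interface Source extends Node { String topics(); }
    interface Processor extends Node { Set<String> stores(); }
    interface Sink extends Node { String topic(); }
    interface Subtopology { int id(); Set<Node> nodes(); }
}

// Toy node implementation; every edge is stored in both directions.
abstract class BaseNode implements Desc.Node {
    private final String name;
    private final Set<Desc.Node> predecessors = new LinkedHashSet<>();
    private final Set<Desc.Node> successors = new LinkedHashSet<>();

    BaseNode(String name) { this.name = name; }

    public String name() { return name; }
    public Set<Desc.Node> predecessors() { return Collections.unmodifiableSet(predecessors); }
    public Set<Desc.Node> successors() { return Collections.unmodifiableSet(successors); }

    // Adds the edge this -> child and records this as a predecessor of child.
    void connect(BaseNode child) {
        successors.add(child);
        child.predecessors.add(this);
    }
}

final class SourceNode extends BaseNode implements Desc.Source {
    private final String topics;
    SourceNode(String name, String topics) { super(name); this.topics = topics; }
    public String topics() { return topics; }
}

final class ProcessorNode extends BaseNode implements Desc.Processor {
    private final Set<String> stores;
    ProcessorNode(String name, Set<String> stores) { super(name); this.stores = stores; }
    public Set<String> stores() { return stores; }
}

final class SinkNode extends BaseNode implements Desc.Sink {
    private final String topic;
    SinkNode(String name, String topic) { super(name); this.topic = topic; }
    public String topic() { return topic; }
}

class DescriptionWalker {
    // Depth-first walk over successors(), collecting the topic of every reachable Sink.
    static Set<String> reachableSinkTopics(Desc.Node start) {
        Set<String> topics = new TreeSet<>();
        Set<Desc.Node> visited = new HashSet<>();
        Deque<Desc.Node> stack = new ArrayDeque<>();
        stack.push(start);
        while (!stack.isEmpty()) {
            Desc.Node node = stack.pop();
            if (!visited.add(node)) {
                continue; // already reached via another path
            }
            if (node instanceof Desc.Sink) {
                topics.add(((Desc.Sink) node).topic());
            }
            for (Desc.Node next : node.successors()) {
                stack.push(next);
            }
        }
        return topics;
    }

    public static void main(String[] args) {
        SourceNode source = new SourceNode("source", "input-topic");
        ProcessorNode counter = new ProcessorNode("counter", Collections.singleton("counts-store"));
        SinkNode sinkA = new SinkNode("sink-a", "output-a");
        SinkNode sinkB = new SinkNode("sink-b", "output-b");
        source.connect(counter);
        counter.connect(sinkA);
        counter.connect(sinkB);

        Set<Desc.Node> nodes = new LinkedHashSet<>();
        Collections.addAll(nodes, source, counter, sinkA, sinkB);
        Desc.Subtopology subtopology = new Desc.Subtopology() {
            public int id() { return 0; }
            public Set<Desc.Node> nodes() { return nodes; }
        };

        for (Desc.Node node : subtopology.nodes()) {
            if (node instanceof Desc.Source) {
                // prints: 0: source -> [output-a, output-b]
                System.out.println(subtopology.id() + ": " + node.name() + " -> " + reachableSinkTopics(node));
            }
        }
    }
}
```

Exposing predecessors() and successors() as sets lets callers traverse the graph in either direction without needing access to the internal builder.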

 

Changes to org.apache.kafka.streams.KafkaStreams:

Code Block
languagejava
public class KafkaStreams {

    // deprecating old constructors

    @Deprecated
    public KafkaStreams(final TopologyBuilder builder,
                        final Properties props);

    @Deprecated
    public KafkaStreams(final TopologyBuilder builder,
                        final StreamsConfig config);

    @Deprecated
    public KafkaStreams(final TopologyBuilder builder,
                        final StreamsConfig config,
                        final KafkaClientSupplier clientSupplier);

    // adding new constructors

    public KafkaStreams(final Topology topology,
                        final Properties props);

    public KafkaStreams(final Topology topology,
                        final StreamsConfig config);

    public KafkaStreams(final Topology topology,
                        final StreamsConfig config,
                        final KafkaClientSupplier clientSupplier);
}

 

Proposed Changes

Refactor the code to remove internal methods from the public API.

We will add two new internal classes:

  • org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
  • org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder

that offer the methods removed from the current API. Thus, these two classes contain the actual implementation. The old TopologyBuilder and KStreamBuilder become thin proxies to InternalTopologyBuilder and InternalStreamsBuilder, respectively, and are kept only for backward compatibility.
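The proxy arrangement can be sketched with plain delegation. All class and method names below (InternalBuilderSketch, LegacyBuilderSketch, NewTopologySketch, nodeNames) are hypothetical and greatly simplified; the sketch only shows that the deprecated class and the new class share one internal implementation, while the new class simply does not forward the internal-only method.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for InternalTopologyBuilder: the single real implementation.
final class InternalBuilderSketch {
    private final List<String> nodeNames = new ArrayList<>();

    synchronized void addSource(String name) {
        nodeNames.add(name);
    }

    // Hypothetical internal-only query that should not be part of the new public API.
    synchronized List<String> nodeNames() {
        return Collections.unmodifiableList(new ArrayList<>(nodeNames));
    }
}

// Stand-in for the old public TopologyBuilder: kept, deprecated, and forwarding every call.
@Deprecated
class LegacyBuilderSketch {
    private final InternalBuilderSketch internal = new InternalBuilderSketch();

    public synchronized LegacyBuilderSketch addSource(String name) {
        internal.addSource(name);
        return this;
    }

    // Still exposed here so existing user code keeps compiling.
    public synchronized List<String> nodeNames() {
        return internal.nodeNames();
    }
}

// Stand-in for the new Topology: same implementation underneath, smaller public surface.
final class NewTopologySketch {
    private final InternalBuilderSketch internal = new InternalBuilderSketch();

    public synchronized NewTopologySketch addSource(String name) {
        internal.addSource(name);
        return this;
    }
    // No nodeNames(): the internal query is simply not forwarded.
}

class ProxyDemo {
    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        LegacyBuilderSketch legacy = new LegacyBuilderSketch().addSource("source");
        System.out.println(legacy.nodeNames()); // [source]
        new NewTopologySketch().addSource("source"); // compiles; calling nodeNames() here would not
    }
}
```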

The newly added Topology uses InternalTopologyBuilder as a member.

The newly added StreamsBuilder uses the new Topology as a member (no class hierarchy anymore; using composition instead of inheritance gives a clear separation between PAPI and DSL).

Because the new StreamsBuilder does not inherit from the new Topology, we need to add StreamsBuilder#build(), which returns the actual Topology to be passed into the KafkaStreams client.
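The composition relationship and the role of build() can be sketched as follows. TopologySketch, StreamsBuilderSketch, and StreamsClientSketch are hypothetical stand-ins for Topology, StreamsBuilder, and KafkaStreams, and the generated node names are invented for this example; they do not follow Kafka's naming scheme.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for Topology (low-level, PAPI-style node graph).
final class TopologySketch {
    private final List<String> nodes = new ArrayList<>();

    synchronized TopologySketch addNode(String name) {
        nodes.add(name);
        return this;
    }

    synchronized List<String> nodes() {
        return Collections.unmodifiableList(new ArrayList<>(nodes));
    }
}

// Hypothetical stand-in for StreamsBuilder: holds a TopologySketch as a member (composition)
// instead of extending it, so addNode() is not part of the DSL class's API.
final class StreamsBuilderSketch {
    private final TopologySketch topology = new TopologySketch();
    private int nextIndex = 0;

    // DSL-level call that is translated into a low-level node on the member topology.
    synchronized void stream(String topic) {
        topology.addNode("SOURCE-" + (nextIndex++) + "(" + topic + ")");
    }

    // Returns the assembled topology, which is what the client is constructed from.
    synchronized TopologySketch build() {
        return topology;
    }
}

// Hypothetical stand-in for the KafkaStreams client: it accepts a topology, not a builder.
final class StreamsClientSketch {
    private final TopologySketch topology;

    StreamsClientSketch(TopologySketch topology) {
        this.topology = topology;
    }

    List<String> plannedNodes() {
        return topology.nodes();
    }
}

class BuildDemo {
    public static void main(String[] args) {
        StreamsBuilderSketch builder = new StreamsBuilderSketch();
        builder.stream("input-a");
        builder.stream("input-b");
        StreamsClientSketch client = new StreamsClientSketch(builder.build());
        System.out.println(client.plannedNodes()); // [SOURCE-0(input-a), SOURCE-1(input-b)]
    }
}
```

Because StreamsBuilderSketch keeps the topology private, low-level methods such as addNode are not reachable through the DSL class, and the client constructor only needs to accept one type.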

Note: for backward compatibility, the DSL-specific methods that the old TopologyBuilder offers (and that are removed from the new public API) must still be offered by InternalTopologyBuilder for now. After both deprecated classes are removed, this cleanup can be done without a KIP, because it is internal refactoring (we just need to create a JIRA for it). Thus, this KIP falls short of separating PAPI and DSL completely. However, it is a necessary first step toward doing the separation in a backward-compatible way (backward compatibility requires a two-step approach).

...

  • Because no classes or methods will be removed (only deprecated), this change is fully backward compatible.
  • We intend to remove all deprecated classes and methods in 0.11.1, but we can keep them longer on user request.

Test Plan

Tests need to be rewritten, but no new tests are required. All tests for the public API need to be updated to use the new Topology and StreamsBuilder classes, and thus will no longer call the deprecated KafkaStreams constructors. All tests using the internal API need to be rewritten to use InternalTopologyBuilder and InternalStreamsBuilder.

Rejected Alternatives

None.

...