Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • org.apache.kafka.streams.Topology
  • org.apache.kafka.streams.StreamsTopologyBuilderStreamsBuilder

Furthermore, we add a new class to get a full description the a topology (to compensate for some internal methods that get removed)

...

  • TopologyBuilder -> Topology: setApplicationId, connectSourceStoreAndTopicconnectProcessorsaddInternalTopiccopartitionSources, nodeGroups, build, buildGlobalStateTopology, globalStateStores, topicGroups, earliestResetTopicPattern, latestResetTopicPattern, stateStoreNamesToSourceTopics, copartitioneGroups, sourceTopicPattern, updateSubscriptions
  • KStreamBuilder -> StreamsTopologyBuilderStreamsBuilder: all methods from TopologyBuilder (as KStreamBuilder does not inherit from TopologyBuilder anymore) except addStateStore and addGlobalStore (which are both added explicitly to StreamsTopologyBuilder StreamsBuilder) plus newName
  • methods highlighted in TopologyBuilder list, are methods that actually belong to DSL abstraction

...

New org.apache.kafka.streams.StreamTopologyBuilderStreamsBuilder class:

Code Block
languagejava
public class StreamsTopologyBuilderStreamsBuilder {

    // existing public API from current KStreamBuilder class that will not be changed
    // small change: adding `synchronized` to all methods to align with TopologyBuilder

    public StreamsTopologyBuilderStreamsBuilder();


    public synchronized <K, V> KStream<K, V> stream(final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerde, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerde, final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerde, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerde, final Pattern topicPattern);


    public synchronized <K, V> KTable<K, V> table(final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName);


    public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final String storeName);
    public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName);

    public synchronized <K, V> KStream<K, V> merge(final KStream<K, V>... streams);



    // newly added method

    public synchronized KStreamBuilder addStateStore(StateStoreSupplier supplier, String... processorNames);
    public synchronized KStreamBuilder addGlobalStore(final StateStore store,
                                                      final String sourceName,
                                                      final Deserializer keyDeserializer,
                                                      final Deserializer valueDeserializer,
                                                      final String topic,
                                                      final String processorName,
                                                      final ProcessorSupplier stateUpdateSupplier);

    public synchronized Topology build();
}

...

  • org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
  • org.apache.kafka.streams.kstream.internals.InternalStreamsTopologyBuilder InternalStreamsBuilder 

that offer the methods remove from current API. Thus, both classes are the actual implementation. Old  TopologyBuilder and KStreamBuilder are only proxy classes to both classes respectively, for backward compatibility.

The newly added Topology uses InternalTopologyBuilder as member.

The newly added StreamsTopologyBuilder StreamsBuilder uses the new Topology as a member (no class hierarchy anymore – using it as member gives a clear separation between PAPI and DSL).

Because the new StreamsTopologyBuilder StreamsBuilder does not inherit from new Topology we need to add StreamsTopologyBuilder#buildStreamsBuilder#build() that returns the actual Topology to be passed into KafkaStreams client.

...

Tests need to be rewritten but no new tests are required. All tests for public API need to be updated to use new Topology and StreamsTopologyBuilderStreamsBuilder. All tests using internal API, need to be rewritten to use InternalTopologyBuilder and InternalStreamsTopologyBuilderInternalStreamsBuilder.

Rejected Alternatives

None.

...