Status

Current state: Under Discussion

Discussion thread: TODO

JIRA: KAFKA-3856

Released: 0.10.3

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, the Kafka Streams public API leaks a number of internal methods that should not be public. Furthermore, the DSL and PAPI abstractions are not completely separated at the moment, and TopologyBuilder offers methods that belong to the DSL only. Additionally, we want to change how KafkaStreams instances are created and remove the uncommon constructor pattern.

Public Interfaces

In order to get a clean refactoring, we will deprecate the classes

  • org.apache.kafka.streams.processor.TopologyBuilder
  • org.apache.kafka.streams.kstream.KStreamBuilder

and add them again (with the same names but in a different package):

  • org.apache.kafka.streams.TopologyBuilder
  • org.apache.kafka.streams.KStreamBuilder

Furthermore, we deprecate all constructors of class KafkaStreams.

The following methods will not be available in the newly added classes:

  • TopologyBuilder: setApplicationId, connectSourceStoreAndTopic, connectProcessors, addInternalTopic, copartitionSources, nodeGroups, build, buildGlobalStateTopology, globalStateStores, topicGroups, earliestResetTopicPattern, latestResetTopicPattern, stateStoreNamesToSourceTopics, copartitionGroups, sourceTopicPattern, updateSubscriptions
  • KStreamBuilder: all of the above (as KStreamBuilder no longer inherits from TopologyBuilder) plus newName
  • methods highlighted in the TopologyBuilder list are methods that actually belong to the DSL abstraction

 

New org.apache.kafka.streams.TopologyBuilder class:

public final class TopologyBuilder { // make class final compared to old TopologyBuilder

    // existing public API from current TopologyBuilder class that will not be changed

    public enum AutoOffsetReset {
        EARLIEST, LATEST
    }

    public TopologyBuilder();


    public synchronized TopologyBuilder addSource(String name, String... topics);
    public synchronized TopologyBuilder addSource(String name, Pattern topicPattern);

    public synchronized TopologyBuilder addSource(AutoOffsetReset offsetReset, String name,  String... topics);
    public synchronized TopologyBuilder addSource(AutoOffsetReset offsetReset, String name,  Pattern topicPattern);

    public synchronized TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics);
    public synchronized TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern);
   
    public synchronized TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics);
    public synchronized TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern);


    public synchronized TopologyBuilder addSink(String name, String topic, String... parentNames);
    public synchronized TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames);
 
    public synchronized TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames);
    public synchronized <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames);


    public synchronized TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames);
 
    public synchronized TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames);
 
    public synchronized TopologyBuilder addGlobalStore(final StateStore store,
                                                       final String sourceName,
                                                       final Deserializer keyDeserializer,
                                                       final Deserializer valueDeserializer,
                                                       final String topic,
                                                       final String processorName,
                                                       final ProcessorSupplier stateUpdateSupplier);

    public synchronized TopologyBuilder connectProcessorAndStateStores(String processorName, String... stateStoreNames);



    // newly added method to support builder pattern
 
    // should be used instead of KafkaStreams() constructor
    // old:
    //     TopologyBuilder builder = new TopologyBuilder();
    //     KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(...));
    // new:
    //     TopologyBuilder builder = new TopologyBuilder();
    //     KafkaStreams streams = builder.build(new StreamsConfig(...));
 
    public synchronized KafkaStreams build(StreamsConfig config);

}

 

New org.apache.kafka.streams.KStreamBuilder class:

public final class KStreamBuilder {

    // existing public API from current KStreamBuilder class that will not be changed
    // small change: adding `synchronized` to all methods to align with TopologyBuilder

    public KStreamBuilder();
 

    public synchronized <K, V> KStream<K, V> stream(final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerde, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerde, final Pattern topicPattern);

    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerde, final String... topics);
    public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerde, final Pattern topicPattern);
 

    public synchronized <K, V> KTable<K, V> table(final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName);
    public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName);


    public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final String storeName);
    public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName);

    public synchronized <K, V> KStream<K, V> merge(final KStream<K, V>... streams);



    // newly added method to support builder pattern
 
    // should be used instead of KafkaStreams() constructor
    // old:
    //     KStreamBuilder builder = new KStreamBuilder();
    //     KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(...));
    // new:
    //     KStreamBuilder builder = new KStreamBuilder();
    //     KafkaStreams streams = builder.build(new StreamsConfig(...));
 
    public synchronized KafkaStreams build(StreamsConfig config);
}
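
The migration from the KafkaStreams constructor to a builder-side build call can be sketched without the Kafka libraries. Everything in this sketch is a simplified stand-in: SketchConfig, SketchStreams, and SketchTopologyBuilder are hypothetical names, not the real StreamsConfig, KafkaStreams, or TopologyBuilder, and the build(config) signature is an assumption taken from the "new:" usage shown in the code comments above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for StreamsConfig: carries only an application id.
final class SketchConfig {
    final String applicationId;

    SketchConfig(String applicationId) {
        this.applicationId = applicationId;
    }
}

// Hypothetical stand-in for KafkaStreams.
final class SketchStreams {
    final List<String> nodeNames;
    final String applicationId;

    // Old style: callers invoke the constructor themselves (deprecated in this sketch).
    @Deprecated
    SketchStreams(SketchTopologyBuilder builder, SketchConfig config) {
        this(builder.nodeNames(), config);
    }

    // Constructor used by the builder's build method.
    SketchStreams(List<String> nodeNames, SketchConfig config) {
        this.nodeNames = nodeNames;
        this.applicationId = config.applicationId;
    }
}

// Hypothetical stand-in for the new TopologyBuilder.
final class SketchTopologyBuilder {
    private final List<String> nodeNames = new ArrayList<>();

    synchronized SketchTopologyBuilder addSource(String name) {
        nodeNames.add(name);
        return this;
    }

    synchronized SketchTopologyBuilder addProcessor(String name) {
        nodeNames.add(name);
        return this;
    }

    // Returns a read-only snapshot of the node names added so far.
    synchronized List<String> nodeNames() {
        return Collections.unmodifiableList(new ArrayList<>(nodeNames));
    }

    // New style: the builder creates the runtime object itself.
    synchronized SketchStreams build(SketchConfig config) {
        return new SketchStreams(nodeNames(), config);
    }
}

final class BuildPatternDemo {
    public static void main(String[] args) {
        SketchConfig config = new SketchConfig("demo-app");
        SketchTopologyBuilder builder = new SketchTopologyBuilder()
            .addSource("source")
            .addProcessor("processor");

        SketchStreams oldStyle = new SketchStreams(builder, config); // constructor pattern
        SketchStreams newStyle = builder.build(config);              // builder pattern

        System.out.println(oldStyle.nodeNames.equals(newStyle.nodeNames));
    }
}
```

Both paths yield an equivalent object in this sketch; the difference is only that the caller of build no longer needs to know how the runtime object is constructed from the builder.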

 

Proposed Changes

Refactor the code to remove internal methods from the public API.

We will add two new internal classes

  • org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
  • org.apache.kafka.streams.kstream.internals.InternalKStreamBuilder 

that offer the methods removed from the current API. Thus, these two classes contain the actual implementation. The old TopologyBuilder and KStreamBuilder become mere proxy classes that forward to InternalTopologyBuilder and InternalKStreamBuilder, respectively, for backward compatibility.

The newly added TopologyBuilder uses InternalTopologyBuilder as a member.

The newly added KStreamBuilder uses the new TopologyBuilder as a member (no class hierarchy anymore; using it as a member gives a clear separation between PAPI and DSL).
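
The relationships described above (internal implementation class, new classes that hold their delegate as a member, old classes kept as deprecated proxies) can be illustrated with a minimal, self-contained sketch. All class names prefixed with Sketch are hypothetical stand-ins, the node-name format is invented for illustration, and the single nodes() method merely stands in for the internal methods that leave the public API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for InternalTopologyBuilder: the actual implementation,
// including methods that are no longer part of the public API.
final class SketchInternalTopologyBuilder {
    private final List<String> nodes = new ArrayList<>();

    synchronized void addSource(String name) {
        nodes.add("source:" + name);
    }

    // Stands in for internal-only methods such as nodeGroups or topicGroups.
    synchronized List<String> nodes() {
        return new ArrayList<>(nodes);
    }
}

// Stand-in for the new PAPI class: final, delegating to the internal builder.
final class SketchNewTopologyBuilder {
    // Package-private so that classes in the same package can reach the internals.
    final SketchInternalTopologyBuilder internal = new SketchInternalTopologyBuilder();

    public synchronized SketchNewTopologyBuilder addSource(String name) {
        internal.addSource(name);
        return this;
    }
}

// Stand-in for the new DSL class: holds the PAPI builder as a member instead of extending it.
final class SketchNewKStreamBuilder {
    private final SketchNewTopologyBuilder topologyBuilder = new SketchNewTopologyBuilder();
    private int index = 0;

    // Returns the generated source node name; the real method would return a KStream.
    public synchronized String stream(String topic) {
        String nodeName = "stream-source-" + index++ + ":" + topic;
        topologyBuilder.addSource(nodeName);
        return nodeName;
    }

    // Package-private view of the internal state, used only by the demo.
    List<String> nodes() {
        return topologyBuilder.internal.nodes();
    }
}

// Stand-in for an old class that is kept as a deprecated, thin proxy.
@Deprecated
class SketchOldTopologyBuilder {
    private final SketchInternalTopologyBuilder internal = new SketchInternalTopologyBuilder();

    public synchronized SketchOldTopologyBuilder addSource(String name) {
        internal.addSource(name);
        return this;
    }

    // The old class still exposes the internal method and forwards it unchanged.
    public synchronized List<String> nodes() {
        return internal.nodes();
    }
}

final class CompositionDemo {
    public static void main(String[] args) {
        SketchNewKStreamBuilder dsl = new SketchNewKStreamBuilder();
        dsl.stream("orders");
        System.out.println(dsl.nodes());

        SketchOldTopologyBuilder legacy = new SketchOldTopologyBuilder().addSource("in");
        System.out.println(legacy.nodes());
    }
}
```

Because the DSL stand-in only holds a PAPI builder rather than extending it, none of the PAPI methods appear on its public surface, which is the separation the member-based design is meant to provide.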

 

Note: for backward compatibility, the DSL-specific methods offered by the old TopologyBuilder must still be offered by InternalTopologyBuilder for now. Once both deprecated classes have been removed, this cleanup can be done (it will not require a KIP, because it is an internal refactoring -- we just need to create a JIRA for it). Thus, this KIP falls short of separating PAPI and DSL completely, but it is a necessary first step toward doing the separation in a backward-compatible way (backward compatibility requires a two-step approach).

Compatibility, Deprecation, and Migration Plan

  • Because no classes/methods will be removed but only deprecated, this change will be fully backward compatible
  • We intend to remove all deprecated classes/methods in 0.10.4, but we can keep them longer at users' request

Test Plan

Tests need to be rewritten, but no new tests are required. All tests for the public API need to be updated to use the new TopologyBuilder and KStreamBuilder, and thus will no longer call the deprecated KafkaStreams constructors. All tests using the internal API need to be rewritten to use InternalTopologyBuilder and InternalKStreamBuilder.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
