...

I propose adding dynamic routing at both the Topology and DSL layers. Specifically, the following new APIs will be added:

// new class: o.a.k.streams.processor.TopicNameExtractor
 
// Since this class will be used in both the DSL and the PAPI, I propose putting it in the lower-level processor package, similar to ProcessorSupplier.
 
@InterfaceStability.Evolving
public interface TopicNameExtractor<K, V> {

    /**
     * Extracts the topic name to send to. The topic must already exist, since the Kafka Streams library will not
     * try to automatically create the topic with the extracted name, and will fail with a timeout exception if the topic
     * does not exist in the Kafka cluster.
     *
     * @param key           the record key
     * @param value         the record value payload
     * @param recordContext current context metadata of the record
     * @return the topic name to send to
     */
    String extract(K key, V value, RecordContext recordContext);
}
 
// Topology.java
 
Topology addSink(final String name, final TopicNameExtractor<K, V> topicChooser, final String... parentNames)

Topology addSink(final String name, final TopicNameExtractor<K, V> topicChooser, final StreamPartitioner<? super K, ? super V> partitioner, final String... parentNames)

Topology addSink(final String name, final TopicNameExtractor<K, V> topicChooser, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final String... parentNames)

Topology addSink(final String name, final TopicNameExtractor<K, V> topicChooser, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner, final String... parentNames)


// KStream.java
 
void to(final TopicNameExtractor<K, V> topicChooser);
 
void to(final TopicNameExtractor<K, V> topicChooser, final Produced<K, V> produced);


// KStream.scala

def to(extractor: TopicNameExtractor[K, V])(implicit produced: Produced[K, V]): Unit
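
To illustrate how an extractor might be written against the proposed interface, here is a minimal sketch. So that it runs without Kafka on the classpath, it declares local stand-ins for TopicNameExtractor and RecordContext; the stand-in RecordContext exposes only two accessors, which is an assumption made for brevity. The routing rule and the topic names "orders-eu" and "orders-default" are hypothetical and not part of the proposal.

```java
public class DynamicRoutingSketch {

    // Local stand-in for the record metadata (assumption: only the
    // accessors this sketch needs are declared here).
    interface RecordContext {
        String topic();
        long timestamp();
    }

    // Local stand-in mirroring the shape of the proposed interface.
    interface TopicNameExtractor<K, V> {
        String extract(K key, V value, RecordContext recordContext);
    }

    // Hypothetical rule: keys prefixed with "eu-" go to one topic,
    // everything else (including null keys) goes to a default topic.
    static final TopicNameExtractor<String, String> BY_REGION =
        (key, value, recordContext) ->
            key != null && key.startsWith("eu-") ? "orders-eu" : "orders-default";

    // Helper that builds a fixed context for the demonstration.
    static RecordContext context(final String topic, final long timestamp) {
        return new RecordContext() {
            @Override
            public String topic() {
                return topic;
            }

            @Override
            public long timestamp() {
                return timestamp;
            }
        };
    }

    public static void main(final String[] args) {
        final RecordContext ctx = context("orders", 0L);
        System.out.println(BY_REGION.extract("eu-42", "payload", ctx));
        System.out.println(BY_REGION.extract("us-7", "payload", ctx));
    }
}
```

Under the proposal, an extractor of this shape would be handed to the new `to(...)` or `addSink(...)` overloads in place of a fixed topic name. Both target topics would need to exist beforehand, since the library does not create them.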

...