...

Code Block
languagejava
// new class: o.a.k.streams.processor.TopicNameExtractor
 
// since this class will be used in both the DSL and the PAPI, I propose to put it in the lower-level processor package; this is similar to ProcessorSupplier.
 
@InterfaceStability.Evolving
public interface TopicNameExtractor<K, V> {

    /**
     * Extracts the topic name to send to. The topic name must already exist, since the Kafka Streams library will not
     * try to automatically create the topic with the extracted name, and will fail with a timeout exception if the
     * topic does not exist in the Kafka cluster.
     *
     * @param key           the record key
     * @param value         the record value payload
     * @param recordContext current context metadata of the record
     * @return the topic name to send to
     */
    String extract(K key, V value, RecordContext recordContext);
}
 
// Topology.java
 
<K, V> Topology addSink(final String name, final TopicNameExtractor<K, V> topicChooser, final String... parentNames)

<K, V> Topology addSink(final String name, final TopicNameExtractor<K, V> topicChooser, final StreamPartitioner<? super K, ? super V> partitioner, final String... parentNames)

<K, V> Topology addSink(final String name, final TopicNameExtractor<K, V> topicChooser, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final String... parentNames)

<K, V> Topology addSink(final String name, final TopicNameExtractor<K, V> topicChooser, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner, final String... parentNames)


// KStream.java
 
void to(final TopicNameExtractor<K, V> topicChooser);
 
void to(final TopicNameExtractor<K, V> topicChooser, final Produced<K, V> produced);


// KStream.scala

def to(extractor: TopicNameExtractor[K, V])(implicit produced: Produced[K, V]): Unit
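
For illustration only, here is a minimal sketch of how the proposed APIs could be used; the KeyBasedTopicExtractor class and the topic names below are hypothetical and not part of the proposal:

Code Block
languagejava
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.RecordContext;

// Hypothetical extractor that routes each record based on its key; the class
// name and topic names are made up for this example.
public class KeyBasedTopicExtractor implements TopicNameExtractor<String, String> {

    @Override
    public String extract(final String key, final String value, final RecordContext recordContext) {
        // the extracted topic must already exist in the Kafka cluster
        return "orders".equals(key) ? "orders-topic" : "default-topic";
    }
}

// Usage with the proposed KStream#to overload:
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> stream = builder.stream("input-topic");
stream.to(new KeyBasedTopicExtractor(), Produced.with(Serdes.String(), Serdes.String()));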

 

I also propose to modify StreamPartitioner#partition to include the topic name, since with dynamic routing the topic name is no longer always known in advance:

Code Block
languagejava
// StreamPartitioner.java
 
public interface StreamPartitioner<K, V> {

    /**
     * Determine the partition number for a record with the given topic, key and value and the current number of partitions.
     *
     * @param topic the topic name this record is sent to
     * @param key the key of the record
     * @param value the value of the record
     * @param numPartitions the total number of partitions
     * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
     */
    Integer partition(String topic, K key, V value, int numPartitions);
}
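
For illustration, a hypothetical partitioner written against the updated interface; the TopicAwarePartitioner class name and the topic-suffix check are made up for this example:

Code Block
languagejava
// Hypothetical partitioner that uses the newly added topic parameter to vary
// its partitioning logic per destination topic.
public class TopicAwarePartitioner implements StreamPartitioner<String, String> {

    @Override
    public Integer partition(final String topic, final String key, final String value, final int numPartitions) {
        if (topic.endsWith("-audit")) {
            // e.g. pin all records routed to "-audit" topics onto partition 0
            return 0;
        }
        // returning null falls back to the default partitioning logic
        return null;
    }
}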


Proposed Changes

With the newly added APIs, the topology's sink node will apply the extractor to each received record to determine the topic name to send it to; if that topic does not exist in the cluster, Streams will fall into the normal fail-fast scenario, in which the metadata refresh will exhaust its retries and a fatal request-timeout exception will be thrown.
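
Conceptually, the per-record routing at the sink node would look roughly like the following simplified sketch; this is not the actual SinkNode code, and the field names and send(...) parameters are schematic:

Code Block
languagejava
// Simplified, schematic sketch of the proposed sink-node behavior; field names
// and the send(...) parameters are illustrative only.
void process(final K key, final V value, final RecordContext recordContext) {
    // extract the destination topic dynamically for every record
    final String topic = topicExtractor.extract(key, value, recordContext);

    // send to the extracted topic; if it does not exist, the producer's metadata
    // refresh eventually exhausts its retries and a fatal timeout exception is thrown
    recordCollector.send(topic, key, value, keySerializer, valueSerializer, partitioner);
}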

...