...
```java
// new class: o.a.k.streams.processor.TopicNameExtractor
// since this class will be used in both DSL and PAPI, I propose to put it in the
// lower-level processor package; this is similar to ProcessorSupplier.
@InterfaceStability.Evolving
public interface TopicNameExtractor<K, V> {

    /**
     * Extracts the topic name to send to. The topic name must already exist, since the Kafka Streams library will not
     * try to automatically create the topic with the extracted name, and will fail with a timeout exception if the topic
     * does not exist in the Kafka cluster.
     *
     * @param key           the record key
     * @param value         the record value payload
     * @param recordContext current context metadata of the record
     * @return the topic name to send to
     */
    String extract(K key, V value, RecordContext recordContext);
}

// Topology.java

Topology addSink(final String name,
                 final TopicNameExtractor<K, V> topicChooser,
                 final String... parentNames)

Topology addSink(final String name,
                 final TopicNameExtractor<K, V> topicChooser,
                 final StreamPartitioner<? super K, ? super V> partitioner,
                 final String... parentNames)

Topology addSink(final String name,
                 final TopicNameExtractor<K, V> topicChooser,
                 final Serializer<K> keySerializer,
                 final Serializer<V> valueSerializer,
                 final String... parentNames)

Topology addSink(final String name,
                 final TopicNameExtractor<K, V> topicChooser,
                 final Serializer<K> keySerializer,
                 final Serializer<V> valueSerializer,
                 final StreamPartitioner<? super K, ? super V> partitioner,
                 final String... parentNames)

// KStream.java

void to(final TopicNameExtractor<K, V> topicChooser);

void to(final TopicNameExtractor<K, V> topicChooser, final Produced<K, V> produced);

// KStream.scala

def to(extractor: TopicNameExtractor[K, V])(implicit produced: Produced[K, V]): Unit
```
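To illustrate how a user would implement the proposed interface, here is a minimal sketch. The stand-in `RecordContext` and `TopicNameExtractor` declarations only exist so the snippet compiles without a Kafka Streams dependency, and the topic names (`orders-eu`, `orders-other`) and the routing rule are hypothetical examples, not part of the KIP:

```java
// Hypothetical stand-ins mirroring the proposed interfaces, so this sketch
// compiles without a Kafka Streams dependency.
interface RecordContext { String topic(); }

interface TopicNameExtractor<K, V> {
    String extract(K key, V value, RecordContext recordContext);
}

// Example extractor: route records to a per-region topic based on the value.
// Both target topics must already exist, per the contract above.
public class OrderTopicExtractor implements TopicNameExtractor<String, String> {
    @Override
    public String extract(String key, String value, RecordContext recordContext) {
        return value.startsWith("EU") ? "orders-eu" : "orders-other";
    }

    public static void main(String[] args) {
        TopicNameExtractor<String, String> extractor = new OrderTopicExtractor();
        System.out.println(extractor.extract("k1", "EU:order-42", null));  // prints orders-eu
        System.out.println(extractor.extract("k2", "US:order-43", null));  // prints orders-other
    }
}
```

In the DSL the same logic could be passed inline as a lambda to the proposed `KStream#to(TopicNameExtractor)` overload, since the interface has a single abstract method.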
I also propose to modify StreamPartitioner#partition to include the topic name, since with dynamic routing the topic name is no longer always known beforehand:
```java
// StreamPartitioner.java

public interface StreamPartitioner<K, V> {

    /**
     * Determine the partition number for a record with the given key and value and the current number of partitions.
     *
     * @param topic         the topic name this record is sent to
     * @param key           the key of the record
     * @param value         the value of the record
     * @param numPartitions the total number of partitions
     * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
     */
    Integer partition(String topic, K key, V value, int numPartitions);
}
```
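A short sketch of how the new topic parameter could be used. The local `StreamPartitioner` declaration is a stand-in for the proposed signature so the snippet compiles on its own, and the topic name `audit-log` and the hash-based fallback are illustrative assumptions, not part of the KIP:

```java
// Stand-in mirroring the proposed StreamPartitioner signature, so this sketch
// compiles without a Kafka Streams dependency.
interface StreamPartitioner<K, V> {
    Integer partition(String topic, K key, V value, int numPartitions);
}

// Hypothetical partitioner that branches on the (now dynamically routed) topic name.
public class TopicAwarePartitioner implements StreamPartitioner<String, String> {
    @Override
    public Integer partition(String topic, String key, String value, int numPartitions) {
        if ("audit-log".equals(topic)) {
            return 0; // pin the audit topic to one partition to preserve global ordering
        }
        if (key == null) {
            return null; // defer to the default partitioning logic
        }
        return (key.hashCode() & 0x7fffffff) % numPartitions; // non-negative hash of the key
    }

    public static void main(String[] args) {
        TopicAwarePartitioner p = new TopicAwarePartitioner();
        System.out.println(p.partition("audit-log", "any-key", "v", 8)); // prints 0
        System.out.println(p.partition("orders-eu", null, "v", 8));      // prints null
    }
}
```

Without the topic parameter, a single partitioner instance shared by one sink node could not distinguish which of the dynamically extracted topics a record is headed to.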
Proposed Changes
With the newly added APIs, at the topology's sink node we will apply the extractor to each received record to determine the topic to send it to; if the extracted topic does not exist, Streams will fall into the normal fail-fast scenario, in which the metadata refresh will exhaust its retries and a fatal RequestTimeout exception will be thrown.
...