THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
I propose adding the dynamic routing functionality at both the Topology and the DSL layer. More specifically, the following new APIs will be added:
Code Block | ||
---|---|---|
| ||
// new class: o.a.k.streams.processor.TopicNameExtractor // since this class will be used in both DSL and PAPI, I propose to put in lower-level processor package; this is similar to ProcessorSupplier. @InterfaceStability.Evolving public interface TopicNameExtractor<K, V> { /** * Extracts the topic name to send to. The topic name must be pre-existed, since the Kafka Streams library will not * try to automatically create the topic with the extracted name, and will fail with a timeout exception if the topic * does not exist in the Kafka cluster. * * @param key the record key * @param value the record value payload * @param recordContext current context metadata of the record * @return the topic name to send to */ String extract(K key, V value, RecordContext recordContext); } // Topology.java Topology addSink(final String name, final KeyValueMapper<KTopicNameExtractor<K, V,V> String> topicChooser, final String... parentNames) Topology addSink(final String name, final KeyValueMapper<KTopicNameExtractor<K, V,V> String> topicChooser, final StreamPartitioner<? super K, ? super V> partitioner, final String... parentNames) Topology addSink(final String name, final KeyValueMapper<KTopicNameExtractor<K, V, String>V> topicChooser, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final String... parentNames) Topology addSink(final String name, final KeyValueMapper<KTopicNameExtractor<K, V, String>V> topicChooser, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner, final String... parentNames) // KStream.java void to(final KeyValueMapper<? super KTopicNameExtractor<K, ? super V, String> V> topicChooser); void to(final KeyValueMapper<? super KTopicNameExtractor<K, ? super V, String> V> topicChooser, final Produced<K, V> produced); // KStream.scala def to(mapperextractor: (TopicNameExtractor[K, V) => String])(implicit produced: Produced[K, V]): Unit |
...