THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
// new class: o.a.k.streams.streams.processor.RecordContext
public interface RecordContext {
/**
* @return The offset of the original record received from Kafka;
* could be -1 if it is not available
*/
long offset();
/**
* @return The timestamp extracted from the record received from Kafka;
* could be -1 if it is not available
*/
long timestamp();
/**
* @return The topic the record was received on;
* could be null if it is not available
*/
String topic();
/**
* @return The partition the record was received on;
* could be -1 if it is not available
*/
int partition();
/**
* @return The headers from the record received from Kafka;
* could be null if it is not available
*/
Headers headers();
}
// new class: o.a.k.streams.processor.TopicNameExtractor
// since this class will be used in both DSL and PAPI, I propose to put in lower-level processor package; this is similar to ProcessorSupplier.
@InterfaceStability.Evolving
public interface TopicNameExtractor<K, V> {
/**
* Extracts the topic name to send to. The topic name must be pre-existed, since the Kafka Streams library will not
* try to automatically create the topic with the extracted name, and will fail with a timeout exception if the topic
* does not exist in the Kafka cluster.
*
* @param key the record key
* @param value the record value payload
* @param recordContext current context metadata of the record
* @return the topic name to send to
*/
String extract(K key, V value, RecordContext recordContext);
}
// Topology.java
Topology addSink(final String name, final TopicNameExtractor<K, V> topicChooser, final String... parentNames)
Topology addSink(final String name, final TopicNameExtractor<K, V> topicChooser, final StreamPartitioner<? super K, ? super V> partitioner, final String... parentNames)
Topology addSink(final String name, final TopicNameExtractor<K, V> topicChooser, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final String... parentNames)
Topology addSink(final String name, final TopicNameExtractor<K, V> topicChooser, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner, final String... parentNames)
// KStream.java
void to(final TopicNameExtractor<K, V> topicChooser);
void to(final TopicNameExtractor<K, V> topicChooser, final Produced<K, V> produced);
// KStream.scala
def to(extractor: TopicNameExtractor[K, V])(implicit produced: Produced[K, V]): Unit
|
...