Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
// new class: o.a.k.streams.processor.RecordContext


/**
 * Exposes the metadata of a record received from Kafka: its offset, timestamp,
 * topic, partition and headers. Each accessor documents the sentinel value
 * ({@code -1} or {@code null}) it may return when the metadata is not available.
 */
public interface RecordContext {
    /**
     * Returns the offset of the record.
     *
     * @return  The offset of the original record received from Kafka;
     *          could be -1 if it is not available
     */
    long offset();

    /**
     * Returns the timestamp of the record.
     *
     * @return  The timestamp extracted from the record received from Kafka;
     *          could be -1 if it is not available
     */
    long timestamp();

    /**
     * Returns the name of the topic of the record.
     *
     * @return  The topic the record was received on;
     *          could be null if it is not available
     */
    String topic();

    /**
     * Returns the partition of the record.
     *
     * @return  The partition the record was received on;
     *          could be -1 if it is not available
     */
    int partition();

    /**
     * Returns the headers of the record.
     *
     * @return  The headers from the record received from Kafka;
     *          could be null if they are not available
     */
    Headers headers();

}


// new class: o.a.k.streams.processor.TopicNameExtractor
 
// Since this class will be used in both the DSL and the PAPI, I propose putting it in the lower-level processor package; this is similar to ProcessorSupplier.
 
/**
 * Determines the name of the topic a record should be sent to, based on the record's
 * key, value and record context.
 *
 * @param <K> the record key type
 * @param <V> the record value type
 */
@InterfaceStability.Evolving
public interface TopicNameExtractor<K, V> {

    /**
     * Extracts the name of the topic to send to. The topic must already exist: the Kafka Streams library will not
     * try to automatically create a topic with the extracted name, and will fail with a timeout exception if the topic
     * does not exist in the Kafka cluster.
     *
     * @param key           the record key
     * @param value         the record value payload
     * @param recordContext current context metadata of the record
     * @return the topic name to send to
     */
    String extract(K key, V value, RecordContext recordContext);
}
 
// Topology.java
 
// Overloads of addSink that take a TopicNameExtractor (topicChooser) to determine the topic name to send to.
// They differ only in the optional arguments accepted alongside the sink name and parent names.

// Sink name, topic extractor and parent names only.
Topology addSink(final String name, final TopicNameExtractor<K, V> topicChooser, final String... parentNames)

// Additionally accepts a StreamPartitioner.
Topology addSink(final String name, final TopicNameExtractor<K, V> topicChooser, final StreamPartitioner<? super K, ? super V> partitioner, final String... parentNames)

// Additionally accepts key and value serializers.
Topology addSink(final String name, final TopicNameExtractor<K, V> topicChooser, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final String... parentNames)

// Accepts key and value serializers as well as a StreamPartitioner.
Topology addSink(final String name, final TopicNameExtractor<K, V> topicChooser, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner, final String... parentNames)


// KStream.java
 
// Overload of to() that takes a TopicNameExtractor (topicChooser) to determine the topic name to send to.
void to(final TopicNameExtractor<K, V> topicChooser);
 
// Same as the overload above, additionally accepting a Produced<K, V> argument.
void to(final TopicNameExtractor<K, V> topicChooser, final Produced<K, V> produced);


// KStream.scala

// Takes a TopicNameExtractor; the Produced[K, V] is supplied as an implicit parameter.
def to(extractor: TopicNameExtractor[K, V])(implicit produced: Produced[K, V]): Unit

...