...
Code Block

    /**
     * Create a list of MessageStreams for each topic.
     *
     * @param topicCountMap a map of (topic, #streams) pair
     * @param decoder Decoder to decode each Message to type T
     * @return a map of (topic, list of KafkaMessageAndMetadataStream) pairs.
     *         The number of items in the list is #streams. Each stream supports
     *         an iterator over message/metadata pairs.
     */
    def createMessageStreams[T](topicCountMap: Map[String,Int],
                                decoder: Decoder[T] = new DefaultDecoder)
      : Map[String,List[KafkaMessageAndMetadataStream[T]]]

    /**
     * Create a list of message streams for all topics that match a given filter.
     *
     * @param filterSpec Either a Whitelist or Blacklist TopicFilterSpec object.
     * @param numStreams Number of streams to return
     * @param decoder Decoder to decode each Message to type T
     * @return a list of KafkaMessageAndMetadataStream each of which provides an
     *         iterator over message/metadata pairs over allowed topics.
     */
    def createMessageStreamsByFilter[T](filterSpec: TopicFilterSpec,
                                        numStreams: Int = 1,
                                        decoder: Decoder[T] = new DefaultDecoder)
      : Seq[KafkaMessageAndMetadataStream[T]]

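
To make the return shape of `createMessageStreams` concrete, here is a minimal sketch. It is not the real implementation: `Decoder`, `StringDecoder`, the stream class, and `ProposedApiSketch` are stand-ins defined only so the example compiles on its own, the topic names are made up, and the default `decoder` argument from the proposal is left out.

```scala
// Stand-in types so the sketch compiles on its own; none of this is the real Kafka code.
trait Decoder[T] { def toEvent(payload: Array[Byte]): T }

// Hypothetical decoder that turns raw bytes into a UTF-8 string.
class StringDecoder extends Decoder[String] {
  def toEvent(payload: Array[Byte]): String = new String(payload, "UTF-8")
}

// Placeholder for the proposed stream type; it only remembers its topic and decoder.
class KafkaMessageAndMetadataStream[T](val topic: String, val decoder: Decoder[T])

object ProposedApiSketch {
  // In-memory imitation of createMessageStreams: it only builds the
  // (topic -> list of #streams streams) shape described in the scaladoc.
  def createMessageStreams[T](
      topicCountMap: Map[String, Int],
      decoder: Decoder[T]): Map[String, List[KafkaMessageAndMetadataStream[T]]] =
    topicCountMap.map { case (topic, numStreams) =>
      topic -> List.fill(numStreams)(new KafkaMessageAndMetadataStream[T](topic, decoder))
    }

  def main(args: Array[String]): Unit = {
    val streams = createMessageStreams(Map("clicks" -> 2, "logs" -> 1), new StringDecoder)
    streams.foreach { case (topic, list) => println(s"$topic: ${list.size} stream(s)") }
  }
}
```

In a real client each stream in a topic's list would typically be handed to its own consumer thread.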
...
Questions/discussion
What is a TopicFilterSpec?
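
One possible reading, based only on the scaladoc's "Either a Whitelist or Blacklist TopicFilterSpec object", is a small sealed hierarchy with a topic-matching predicate. The sketch below is an assumption, not the proposal's definition: the `isTopicAllowed` method, the regex-based matching, and the sample topic names are all hypothetical.

```scala
import java.util.regex.Pattern

// Hypothetical shape for TopicFilterSpec; method name and regex semantics are assumptions.
sealed trait TopicFilterSpec {
  def isTopicAllowed(topic: String): Boolean
}

// Allows only topics whose full name matches the regex.
final case class Whitelist(regex: String) extends TopicFilterSpec {
  private val pattern = Pattern.compile(regex)
  def isTopicAllowed(topic: String): Boolean = pattern.matcher(topic).matches()
}

// Allows every topic except those whose full name matches the regex.
final case class Blacklist(regex: String) extends TopicFilterSpec {
  private val pattern = Pattern.compile(regex)
  def isTopicAllowed(topic: String): Boolean = !pattern.matcher(topic).matches()
}

object TopicFilterSketch {
  def main(args: Array[String]): Unit = {
    val topics = List("clicks", "clicks-debug", "logs")
    // Keep only topics starting with "clicks".
    println(topics.filter(Whitelist("clicks.*").isTopicAllowed))
    // Drop topics ending in "-debug".
    println(topics.filter(Blacklist(".*-debug").isTopicAllowed))
  }
}
```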
...
The `KafkaMessageAndMetadataStream[T]`'s iterator is a `ConsumerIterator[T]`, which is an iterator over `MessageAndMetadata[T]` objects.

`KafkaMessageAndMetadataStream[T]` is a pretty unwieldy name.

Agreed. `KafkaStream[T]` is fine, and I feel it is better than `KafkaMessageStream[T]` because the latter suggests it carries only messages (and no metadata). Also, the iterator is over `MessageAndMetadata[T]`, which is still a bit unwieldy. Maybe we should just call it `StreamData`?
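
The stream/iterator relationship discussed above could look roughly like the sketch below. The type names come from this page, but their fields, constructors, and the in-memory backing sequence are assumptions made so the example runs standalone; a real `ConsumerIterator` would presumably block on fetched data rather than walk a fixed list.

```scala
// Stand-in for MessageAndMetadata[T]; the two fields are assumptions.
final case class MessageAndMetadata[T](topic: String, message: T)

// Simplified ConsumerIterator: walks a fixed in-memory sequence.
class ConsumerIterator[T](items: Seq[MessageAndMetadata[T]])
    extends Iterator[MessageAndMetadata[T]] {
  private val underlying = items.iterator
  def hasNext: Boolean = underlying.hasNext
  def next(): MessageAndMetadata[T] = underlying.next()
}

// The stream exposes a ConsumerIterator, so clients can use a plain for loop.
class KafkaMessageAndMetadataStream[T](items: Seq[MessageAndMetadata[T]])
    extends Iterable[MessageAndMetadata[T]] {
  def iterator: ConsumerIterator[T] = new ConsumerIterator[T](items)
}

object StreamIterationSketch {
  def main(args: Array[String]): Unit = {
    val stream = new KafkaMessageAndMetadataStream(Seq(
      MessageAndMetadata("clicks", "a"),
      MessageAndMetadata("logs", "b")))
    // Each element carries both the decoded message and its metadata (here, the topic).
    for (m <- stream) println(s"${m.topic}: ${m.message}")
  }
}
```

Whatever the types end up being called, the point is that client code sees the metadata alongside each decoded message instead of a bare message.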
Impact to clients
Client code will need to change, since the current pattern of:
...
Code Block

    /**
     * Create a list of MessageStreams for each topic.
     *
     * @param topicCountMap a map of (topic, #streams) pair
     * @return a map of (topic, list of KafkaMessageStream) pair. The number of items in the
     *         list is #streams. Each KafkaMessageStream supports an iterator of messages.
     */
    def createMessageStreams[T](topicCountMap: Map[String,Int],
                                decoder: Decoder[T] = new DefaultDecoder)
      : Map[String,List[KafkaMessageStream[T]]]

Comments
...