New consumer API
/**
 * Create a list of MessageStreams for each topic.
 *
 * @param topicCountMap a map of (topic, #streams) pair
 * @param decoder Decoder to decode each Message to type T
 * @return a map of (topic, list of KafkaMessageAndMetadataStream) pairs.
 *         The number of items in the list is #streams. Each stream supports
 *         an iterator over message/metadata pairs.
 */
def createMessageStreams[T](topicCountMap: Map[String,Int],
                            decoder: Decoder[T] = new DefaultDecoder)
  : Map[String,List[KafkaMessageAndMetadataStream[T]]]

/**
 * Create a list of message streams for all topics that match a given filter.
 *
 * @param filterSpec Either a Whitelist or Blacklist TopicFilterSpec object.
 * @param numStreams Number of streams to return
 * @param decoder Decoder to decode each Message to type T
 * @return a list of KafkaMessageAndMetadataStream each of which provides an
 *         iterator over message/metadata pairs over allowed topics.
 */
def createMessageStreamsByFilter[T](filterSpec: TopicFilterSpec,
                                    numStreams: Int = 1,
                                    decoder: Decoder[T] = new DefaultDecoder)
  : Seq[KafkaMessageAndMetadataStream[T]]
Questions/discussion
What is a TopicFilterSpec?
TopicFilterSpec can be either a whitelist or a blacklist, e.g.:
- Example of a whitelist TopicFilterSpec: new Whitelist("white.*")
- Example of a blacklist TopicFilterSpec: new Blacklist("black.*")
Although Java regex allows you to specify anything with a single regex (i.e., you don't really need a blacklist option per se), negating a whitelist in Java regex is clumsy, so it is convenient to be able to specify either a whitelist or a blacklist directly. Right now TopicFilterSpec supports only one of whitelist/blacklist; in future we may want to support a chain of filters to do more elaborate topic selection.
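To make the whitelist/blacklist distinction concrete, here is a minimal sketch of how the two filter types could behave. The trait shape, the isTopicAllowed method, and the FilterExamples object are assumptions made for illustration; only the names TopicFilterSpec, Whitelist, and Blacklist come from the proposal, and the real classes may differ. The last value shows the negative-lookahead regex a whitelist-only API would need to exclude "black.*".

```scala
import java.util.regex.Pattern

// Hypothetical stand-ins for the proposed filter types; the real
// TopicFilterSpec may be shaped differently.
sealed trait TopicFilterSpec {
  def regex: String
  // True if a consumer using this filter should consume the topic.
  def isTopicAllowed(topic: String): Boolean
}

// Allows only topics whose full name matches the regex.
case class Whitelist(regex: String) extends TopicFilterSpec {
  private val pattern = Pattern.compile(regex)
  def isTopicAllowed(topic: String): Boolean = pattern.matcher(topic).matches()
}

// Allows every topic except those whose full name matches the regex.
case class Blacklist(regex: String) extends TopicFilterSpec {
  private val pattern = Pattern.compile(regex)
  def isTopicAllowed(topic: String): Boolean = !pattern.matcher(topic).matches()
}

object FilterExamples {
  // Expressing Blacklist("black.*") as a whitelist requires a negative
  // lookahead, which is the clumsiness the blacklist option avoids.
  val negatedWhitelist: TopicFilterSpec = Whitelist("(?!black.*$).*")
}
```

Under these assumptions, Blacklist("black.*") and FilterExamples.negatedWhitelist accept the same topics, but the blacklist form is much easier to read and maintain.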
What is a MessageAndMetadata?
case class MessageAndMetadata[T](message: T, topic: String = "", offset: Long = -1L)
The KafkaMessageAndMetadataStream[T]'s iterator is a ConsumerIterator[T], which is an iterator over MessageAndMetadata[T] objects.
KafkaMessageAndMetadataStream[T] is a pretty unwieldy name
Agreed - KafkaStream[T] is fine, and I feel it is better than KafkaMessageStream[T] because the latter suggests it carries only messages (and no metadata). Also, the iterator is over MessageAndMetadata[T], which is still a bit unwieldy. Maybe we should just call it StreamData?
Impact to clients
Client code will need to change, since the current pattern of:
for (message <- stream) {
  // process(message)
}
will change to:
for (msgAndMetadata <- stream) {
  // processMessage(msgAndMetadata.message)
  // can also access msgAndMetadata.offset, topic, etc. if appropriate
}
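One way to limit the change in client code is a small adapter that strips the metadata, so existing message-only processing can keep its shape. The sketch below is illustrative only: InMemoryStream is a hypothetical in-memory stand-in for the proposed stream type (the real stream is fed by the consumer connector), and MigrationSketch, messagesOnly, and describe are made-up helper names. Only the MessageAndMetadata case class is taken from this page.

```scala
// The case class as given in this proposal.
case class MessageAndMetadata[T](message: T, topic: String = "", offset: Long = -1L)

// Hypothetical in-memory stand-in for the proposed stream type; the real
// stream is fed by the consumer connector rather than a fixed Seq.
class InMemoryStream[T](items: Seq[MessageAndMetadata[T]])
    extends Iterable[MessageAndMetadata[T]] {
  def iterator: Iterator[MessageAndMetadata[T]] = items.iterator
}

object MigrationSketch {
  // Adapter for code written against the existing message-only pattern:
  // drop the metadata and expose just the payloads.
  def messagesOnly[T](stream: Iterable[MessageAndMetadata[T]]): Iterator[T] =
    stream.iterator.map(_.message)

  // New-style loop that also uses the topic and offset metadata.
  def describe[T](stream: Iterable[MessageAndMetadata[T]]): List[String] =
    (for (m <- stream) yield s"${m.topic}@${m.offset}: ${m.message}").toList
}
```

With this in place, an old loop such as `for (message <- stream)` could become `for (message <- MigrationSketch.messagesOnly(stream))`, while new code can iterate over the stream directly to reach the metadata.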
Existing API
/**
 * Create a list of MessageStreams for each topic.
 *
 * @param topicCountMap a map of (topic, #streams) pair
 * @return a map of (topic, list of KafkaMessageStream) pair. The number of items in the
 *         list is #streams. Each KafkaMessageStream supports an iterator of messages.
 */
def createMessageStreams[T](topicCountMap: Map[String,Int],
                            decoder: Decoder[T] = new DefaultDecoder)
  : Map[String,List[KafkaMessageStream[T]]]