New consumer API

  /**
   *  Create a list of MessageStreams for each topic.
   *
   *  @param topicCountMap  a map of (topic, #streams) pairs
   *  @param decoder Decoder to decode each Message to type T
   *  @return a map of (topic, list of KafkaStream) pairs.
   *          The number of items in the list is #streams. Each stream supports
   *          an iterator over message/metadata pairs.
   */
  def createMessageStreams[T](topicCountMap: Map[String,Int],
                              decoder: Decoder[T] = new DefaultDecoder)
    : Map[String,List[KafkaStream[T]]]
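
The sketch below makes the shape of createMessageStreams's return value concrete. Everything in it is a simplified stand-in written for illustration: Decoder, StringDecoder, MessageAndMetadata and KafkaStream are minimal versions of the types named above (not Kafka's classes), the DefaultDecoder default argument is dropped, and InMemoryConnector is a hypothetical connector over an in-memory log that deals each topic's messages round-robin across the requested streams.

```scala
// Simplified stand-ins for the types in the signature above
// (illustrative assumptions, not Kafka's actual classes).
trait Decoder[T] { def decode(bytes: Array[Byte]): T }

object StringDecoder extends Decoder[String] {
  def decode(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
}

case class MessageAndMetadata[T](message: T, topic: String = "", offset: Long = -1L)

class KafkaStream[T](items: Seq[MessageAndMetadata[T]]) extends Iterable[MessageAndMetadata[T]] {
  def iterator: Iterator[MessageAndMetadata[T]] = items.iterator
}

// Hypothetical connector over an in-memory log: topic -> raw message bytes.
class InMemoryConnector(log: Map[String, Seq[Array[Byte]]]) {
  def createMessageStreams[T](topicCountMap: Map[String, Int],
                              decoder: Decoder[T]): Map[String, List[KafkaStream[T]]] =
    topicCountMap.map { case (topic, numStreams) =>
      // Decode each message and record its topic and position as the offset.
      val decoded = log.getOrElse(topic, Seq.empty).zipWithIndex.map { case (bytes, i) =>
        MessageAndMetadata(decoder.decode(bytes), topic, i.toLong)
      }
      // Deal messages round-robin over numStreams streams (an arbitrary choice
      // for this sketch; a real connector assigns partitions to streams).
      val topicStreams = (0 until numStreams).toList.map { s =>
        new KafkaStream(decoded.zipWithIndex.collect { case (m, j) if j % numStreams == s => m })
      }
      topic -> topicStreams
    }
}

val connector = new InMemoryConnector(Map(
  "clicks" -> Seq("a", "b", "c").map(_.getBytes("UTF-8")),
  "views"  -> Seq("x").map(_.getBytes("UTF-8"))))

val streams = connector.createMessageStreams(Map("clicks" -> 2, "views" -> 1), StringDecoder)
```

Each entry in the returned map holds exactly as many streams as were requested for that topic in topicCountMap.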


  /**
   *  Create a list of message streams for all topics that match a given filter.
   *
   *  @param topicFilter Either a Whitelist or Blacklist TopicFilter object.
   *  @param numStreams Number of streams to return
   *  @param decoder Decoder to decode each Message to type T
   *  @return a list of KafkaStreams, each of which provides an
   *          iterator over message/metadata pairs from the allowed topics.
   */
  def createMessageStreamsByFilter[T](topicFilter: TopicFilter,
                                      numStreams: Int = 1,
                                      decoder: Decoder[T] = new DefaultDecoder)
    : Seq[KafkaStream[T]]
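
A similar in-memory sketch of createMessageStreamsByFilter, under the same kind of assumptions: TopicFilter and Whitelist are hypothetical stand-ins, messages are already decoded strings (so the decoder parameter is omitted), each stream is modelled as a plain Seq, and messages from all allowed topics are dealt round-robin across numStreams. The caller names a pattern rather than a list of topics; in this sketch a single stream can mix messages from several topics, and the topic field of MessageAndMetadata tells them apart.

```scala
// Hypothetical stand-ins (not Kafka's actual classes).
case class MessageAndMetadata[T](message: T, topic: String = "", offset: Long = -1L)

sealed trait TopicFilter { def isTopicAllowed(topic: String): Boolean }

case class Whitelist(regex: String) extends TopicFilter {
  // String.matches requires the regex to match the entire topic name.
  def isTopicAllowed(topic: String): Boolean = topic.matches(regex)
}

// Hypothetical: gather already-decoded messages from every allowed topic and
// deal them round-robin over numStreams streams (each stream is a plain Seq here).
def createMessageStreamsByFilter(log: Map[String, Seq[String]],
                                 topicFilter: TopicFilter,
                                 numStreams: Int = 1): Seq[Seq[MessageAndMetadata[String]]] = {
  val allowed: Seq[MessageAndMetadata[String]] =
    log.toSeq.sortBy(_._1).collect {
      case (topic, msgs) if topicFilter.isTopicAllowed(topic) =>
        msgs.zipWithIndex.map { case (m, i) => MessageAndMetadata(m, topic, i.toLong) }
    }.flatten
  (0 until numStreams).map { s =>
    allowed.zipWithIndex.collect { case (m, j) if j % numStreams == s => m }
  }
}

val log = Map(
  "orders"       -> Seq("o1", "o2"),
  "orders-audit" -> Seq("a1"),
  "metrics"      -> Seq("m1"))

val streams = createMessageStreamsByFilter(log, Whitelist("orders.*"), numStreams = 2)
```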

Questions/discussion

What is a TopicFilter?

A TopicFilter can be either a whitelist or a blacklist.

Although Java regex allows you to express any topic selection with a single regex (i.e., you don't strictly need a blacklist option), negating a whitelist in Java regex requires a negative lookahead and is clumsy. It is convenient to be able to specify either a whitelist or a blacklist directly. Right now TopicFilter supports only one of whitelist/blacklist, but in the future we may want to support a chain of filters for more elaborate topic selection.
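
To illustrate the point about negation, here is a sketch with hypothetical Whitelist, Blacklist and FilterChain classes (stand-ins, not Kafka's own TopicFilter implementations). Both filters use Java's String.matches, so the regex must match the whole topic name. Blacklist("black.*") and the negative-lookahead whitelist Whitelist("(?!black).*") select the same topics, but the blacklist states the intent far more directly. FilterChain is only one possible shape for the chained filters mentioned above.

```scala
// Hypothetical stand-ins for the TopicFilter variants (not Kafka's classes).
sealed trait TopicFilter { def isTopicAllowed(topic: String): Boolean }

// Allows a topic only if the whole name matches the regex.
case class Whitelist(regex: String) extends TopicFilter {
  def isTopicAllowed(topic: String): Boolean = topic.matches(regex)
}

// Allows a topic only if the whole name does not match the regex.
case class Blacklist(regex: String) extends TopicFilter {
  def isTopicAllowed(topic: String): Boolean = !topic.matches(regex)
}

// One possible shape for a future filter chain: a topic must pass every filter.
case class FilterChain(filters: Seq[TopicFilter]) extends TopicFilter {
  def isTopicAllowed(topic: String): Boolean = filters.forall(_.isTopicAllowed(topic))
}

val topics = Seq("black.logs", "blackbox", "orders", "white.events")

val blacklist        = Blacklist("black.*")
val negatedWhitelist = Whitelist("(?!black).*") // negative lookahead: "does not start with black"
val chain            = FilterChain(Seq(Blacklist("black.*"), Whitelist("o.*")))

val viaBlacklist = topics.filter(blacklist.isTopicAllowed(_))
val viaWhitelist = topics.filter(negatedWhitelist.isTopicAllowed(_))
val viaChain     = topics.filter(chain.isTopicAllowed(_))
```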

What is a MessageAndMetadata?

  case class MessageAndMetadata[T](message: T, topic: String = "", offset: Long = -1L)

A KafkaStream[T]'s iterator is a ConsumerIterator[T], which iterates over MessageAndMetadata[T] objects.
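
A minimal sketch of how these pieces fit together, assuming simplified stand-ins: unlike the real ConsumerIterator, the one below just walks an in-memory sequence, and KafkaStream is reduced to an Iterable whose iterator is a ConsumerIterator. It also shows the case class defaults: a MessageAndMetadata built from a message alone has an empty topic and an offset of -1.

```scala
// Case class as given above.
case class MessageAndMetadata[T](message: T, topic: String = "", offset: Long = -1L)

// Simplified stand-in: walks an in-memory sequence instead of data fetched by a consumer.
class ConsumerIterator[T](items: Seq[MessageAndMetadata[T]]) extends Iterator[MessageAndMetadata[T]] {
  private val underlying = items.iterator
  def hasNext: Boolean = underlying.hasNext
  def next(): MessageAndMetadata[T] = underlying.next()
}

// Simplified stand-in for KafkaStream[T]: its iterator is a ConsumerIterator[T].
class KafkaStream[T](items: Seq[MessageAndMetadata[T]]) extends Iterable[MessageAndMetadata[T]] {
  def iterator: ConsumerIterator[T] = new ConsumerIterator(items)
}

val stream = new KafkaStream(Seq(
  MessageAndMetadata("hello", "greetings", 0L),
  MessageAndMetadata("world", "greetings", 1L)))

val it: ConsumerIterator[String] = stream.iterator
val first = it.next()                          // message "hello", topic "greetings", offset 0
val defaults = MessageAndMetadata("payload")   // topic "", offset -1
```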

Can we eliminate the need for two methods in the API? Also, providing a topic-count-map in the createMessageStreams API is burdensome. Can we get rid of that?

If we don't support the one-shot approach of creating multiple streams for multiple topics, then the most obvious alternative is:

  def createMessageStreams[T](topic: String, numStreams: Int,
                              decoder: Decoder[T] = new DefaultDecoder)
    : Seq[KafkaStream[T]]
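
With the single-topic variant, a client that previously passed a topic-count map would make one call per topic. The sketch below shows that translation against a hypothetical in-memory connector; KafkaStream, Decoder and StringDecoder are simplified stand-ins, and the message-to-stream assignment is an arbitrary round-robin chosen for illustration.

```scala
// Simplified stand-ins (illustrative assumptions, not Kafka's classes).
case class MessageAndMetadata[T](message: T, topic: String = "", offset: Long = -1L)
case class KafkaStream[T](items: Seq[MessageAndMetadata[T]])

trait Decoder[T] { def decode(bytes: Array[Byte]): T }
object StringDecoder extends Decoder[String] {
  def decode(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
}

// Hypothetical connector exposing only the proposed single-topic method.
class InMemoryConnector(log: Map[String, Seq[Array[Byte]]]) {
  def createMessageStreams[T](topic: String, numStreams: Int,
                              decoder: Decoder[T]): Seq[KafkaStream[T]] = {
    val decoded = log.getOrElse(topic, Seq.empty).zipWithIndex.map { case (bytes, i) =>
      MessageAndMetadata(decoder.decode(bytes), topic, i.toLong)
    }
    // Round-robin assignment of messages to streams, chosen only for this sketch.
    (0 until numStreams).map { s =>
      KafkaStream(decoded.zipWithIndex.collect { case (m, j) if j % numStreams == s => m })
    }
  }
}

val connector = new InMemoryConnector(Map(
  "clicks" -> Seq("a", "b", "c").map(_.getBytes("UTF-8")),
  "views"  -> Seq("x").map(_.getBytes("UTF-8"))))

// What used to be one call with a topic-count map becomes one call per topic.
val wanted = Map("clicks" -> 2, "views" -> 1)
val byTopic: Map[String, Seq[KafkaStream[String]]] =
  wanted.map { case (topic, n) => topic -> connector.createMessageStreams(topic, n, StringDecoder) }
```

A caller that still wants the map-shaped result can rebuild it with a single map over the topics, as in the last line.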

Advantages

Disadvantages

Impact on clients

Client code will need to change, because iterating over a stream now yields MessageAndMetadata objects rather than bare messages. The current pattern of:

for (message <- stream) {
  // process(message)
}

will change to:

for (msgAndMetadata <- stream) {
  // processMessage(msgAndMetadata.message)
  // can also access msgAndMetadata.offset, topic, etc. if appropriate
}
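
A runnable version of both loops, using a finite List as a stand-in for a KafkaStream[String] (an assumption for illustration; a real stream is fed by the consumer). Client code that only cares about payloads can map the iterator to _.message and keep its old loop body unchanged.

```scala
import scala.collection.mutable.ListBuffer

case class MessageAndMetadata[T](message: T, topic: String = "", offset: Long = -1L)

// Stand-in for a KafkaStream[String]: iterating it yields MessageAndMetadata[String].
val stream: Iterable[MessageAndMetadata[String]] = List(
  MessageAndMetadata("m0", "orders", 10L),
  MessageAndMetadata("m1", "orders", 11L))

// New pattern: the message plus its topic and offset are available.
val withMetadata = ListBuffer.empty[String]
for (msgAndMetadata <- stream)
  withMetadata += s"${msgAndMetadata.topic}@${msgAndMetadata.offset}: ${msgAndMetadata.message}"

// Payload-only code can map the iterator lazily and keep the old loop body.
val payloadsOnly = ListBuffer.empty[String]
for (message <- stream.iterator.map(_.message))
  payloadsOnly += message
```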

Existing API

  /**
   *  Create a list of MessageStreams for each topic.
   *
   *  @param topicCountMap  a map of (topic, #streams) pairs
   *  @return a map of (topic, list of KafkaMessageStream) pairs. The number of items in the
   *          list is #streams. Each KafkaMessageStream supports an iterator of messages.
   */
  def createMessageStreams[T](topicCountMap: Map[String,Int],
                              decoder: Decoder[T] = new DefaultDecoder)
    : Map[String,List[KafkaMessageStream[T]]]

References

  1. https://issues.apache.org/jira/browse/KAFKA-249
  2. Mailing list discussion