
New consumer API

  /**
   *  Create a list of MessageStreams for each topic.
   *
   *  @param topicCountMap  a map of (topic, #streams) pairs
   *  @param decoder Decoder to decode each Message to type T
   *  @return a map of (topic, list of KafkaMessageAndMetadataStream) pairs.
   *          The number of items in the list is #streams. Each stream supports
   *          an iterator over message/metadata pairs.
   */
  def createMessageStreams[T](topicCountMap: Map[String,Int],
                              decoder: Decoder[T] = new DefaultDecoder)
    : Map[String,List[KafkaMessageAndMetadataStream[T]]]
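To illustrate the shape of the result, here is a minimal in-memory sketch of createMessageStreams in Scala. Everything in it is hypothetical: Decoder is a simplified stand-in with a single decode(bytes) method, and Utf8Decoder, InMemoryStream and InMemoryConnector are invented for this example. Messages are dealt round-robin across a topic's streams purely for illustration; this is not a description of how the real consumer assigns data to streams.

```scala
// Hypothetical stand-ins, NOT Kafka's actual classes.
trait Decoder[T] { def decode(bytes: Array[Byte]): T }

class Utf8Decoder extends Decoder[String] {
  def decode(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
}

case class MessageAndMetadata[T](message: T, topic: String = "", offset: Long = -1L)

// A stream is modelled as a plain Iterable over decoded messages plus metadata.
class InMemoryStream[T](items: Seq[MessageAndMetadata[T]])
    extends Iterable[MessageAndMetadata[T]] {
  def iterator: Iterator[MessageAndMetadata[T]] = items.iterator
}

// Toy connector: `log` maps each topic to its raw messages (list index = offset).
class InMemoryConnector(log: Map[String, Seq[Array[Byte]]]) {
  def createMessageStreams[T](topicCountMap: Map[String, Int],
                              decoder: Decoder[T]): Map[String, List[InMemoryStream[T]]] =
    topicCountMap.map { case (topic, numStreams) =>
      // Decode every message of the topic and attach its topic and offset.
      val decoded = log.getOrElse(topic, Seq.empty).zipWithIndex.map { case (bytes, i) =>
        MessageAndMetadata(decoder.decode(bytes), topic, i.toLong)
      }
      // Illustration only: message i goes to stream i % numStreams.
      val streams = (0 until numStreams).toList.map { s =>
        new InMemoryStream(decoded.zipWithIndex.collect { case (m, i) if i % numStreams == s => m })
      }
      topic -> streams
    }
}
```

With Map("clicks" -> 2), the returned map has one entry for "clicks" holding a list of two streams, and every element those streams yield carries its topic and offset alongside the decoded message.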


  /**
   *  Create a list of message streams for all topics that match a given filter.
   *
   *  @param filterSpec Either a Whitelist or Blacklist TopicFilterSpec object.
   *  @param numStreams Number of streams to return
   *  @param decoder Decoder to decode each Message to type T
   *  @return a list of KafkaMessageAndMetadataStream each of which provides an
   *          iterator over message/metadata pairs from the allowed topics.
   */
  def createMessageStreamsByFilter[T](filterSpec: TopicFilterSpec,
                                      numStreams: Int = 1,
                                      decoder: Decoder[T] = new DefaultDecoder)
    : Seq[KafkaMessageAndMetadataStream[T]]
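A similarly hypothetical sketch of createMessageStreamsByFilter: instead of a map keyed by topic, it returns a flat sequence of numStreams streams, and each element records which matching topic it came from. Only a whitelist-style filter is modelled (a hypothetical Whitelist case class with an allows method), messages are already-decoded strings, and streams are plain Lists, all assumptions made for this example.

```scala
case class MessageAndMetadata[T](message: T, topic: String = "", offset: Long = -1L)

// Hypothetical minimal filter: a regex that must match the whole topic name.
case class Whitelist(pattern: String) {
  def allows(topic: String): Boolean = topic.matches(pattern)
}

// Toy version over an in-memory log of already-decoded String messages.
def createMessageStreamsByFilter(log: Map[String, Seq[String]],
                                 filter: Whitelist,
                                 numStreams: Int = 1): Seq[List[MessageAndMetadata[String]]] = {
  // Collect messages from every allowed topic, tagging each with topic and offset.
  val all = log.toSeq.sortBy(_._1).collect { case (topic, msgs) if filter.allows(topic) =>
    msgs.zipWithIndex.map { case (m, i) => MessageAndMetadata(m, topic, i.toLong) }
  }.flatten
  // Illustration only: spread the combined messages round-robin over the streams.
  (0 until numStreams).map { s =>
    all.zipWithIndex.collect { case (m, i) if i % numStreams == s => m }.toList
  }
}
```

Because a single stream may now mix several topics, the topic field of each MessageAndMetadata is what lets a client tell them apart.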

FAQ

What is a TopicFilterSpec?

A TopicFilterSpec can be either a whitelist or a blacklist. For example:

  • Example of a whitelist TopicFilterSpec: new Whitelist("white.*")
  • Example of a blacklist TopicFilterSpec: new Blacklist("black.*")

Although a single Java regex can express any topic selection (so a blacklist option is not strictly necessary), negating a pattern in Java regex is clumsy because it requires a negative lookahead. It is therefore convenient to be able to specify either a whitelist or a blacklist directly. TopicFilterSpec currently supports only one of the two, but in the future we may want to support a chain of filters for more elaborate topic selection.
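One way to model this in Scala is a sealed trait with a Whitelist case and a Blacklist case. The method name isTopicAllowed is a hypothetical choice for this sketch, and it assumes the regex must match the whole topic name (as String.matches does). The final check shows the clumsy alternative: writing the same blacklist as a whitelist needs a negative lookahead.

```scala
// Hypothetical sketch of TopicFilterSpec; not Kafka's actual class.
sealed trait TopicFilterSpec {
  def isTopicAllowed(topic: String): Boolean
}

// Allow only topics whose full name matches the regex.
final case class Whitelist(regex: String) extends TopicFilterSpec {
  def isTopicAllowed(topic: String): Boolean = topic.matches(regex)
}

// Allow every topic except those whose full name matches the regex.
final case class Blacklist(regex: String) extends TopicFilterSpec {
  def isTopicAllowed(topic: String): Boolean = !topic.matches(regex)
}

// Blacklist("black.*") expressed as a whitelist: the negative lookahead
// (?!black) rejects any name that starts with "black".
val blacklist = Blacklist("black.*")
val equivalentWhitelist = Whitelist("(?!black).*")
```

The two filters agree on topic names without line breaks, but the Blacklist form states the intent far more directly.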

What is a MessageAndMetadata?
case class MessageAndMetadata[T](message: T, topic: String = "", offset: Long = -1L)

The iterator of a KafkaMessageAndMetadataStream[T] is a ConsumerIterator[T], which iterates over MessageAndMetadata[T] objects.
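The relationship between these three types can be sketched with in-memory stand-ins that reuse the proposal's names. They are simplified and hypothetical: records are supplied up front as (topic, offset, payload) tuples, whereas a real consumer stream is fed by the consumer itself and would typically wait for new messages rather than simply end.

```scala
case class MessageAndMetadata[T](message: T, topic: String = "", offset: Long = -1L)

// Simplified stand-in for ConsumerIterator[T]: turns raw
// (topic, offset, payload) records into MessageAndMetadata[T] objects.
class ConsumerIterator[T](records: Iterator[(String, Long, T)])
    extends Iterator[MessageAndMetadata[T]] {
  def hasNext: Boolean = records.hasNext
  def next(): MessageAndMetadata[T] = {
    val (topic, offset, payload) = records.next()
    MessageAndMetadata(payload, topic, offset)
  }
}

// Simplified stand-in for KafkaMessageAndMetadataStream[T]: an Iterable whose
// iterator is a ConsumerIterator[T], so it can be used directly in a for loop.
class KafkaMessageAndMetadataStream[T](records: Seq[(String, Long, T)])
    extends Iterable[MessageAndMetadata[T]] {
  def iterator: ConsumerIterator[T] = new ConsumerIterator(records.iterator)
}
```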

Impact to clients

Client code will need to change, since the current pattern

for (message <- stream) {
  // process(message)
}

will change to:

for (msgAndMetadata <- stream) {
  // processMessage(msgAndMetadata.message)
  // can also access msgAndMetadata.offset, topic, etc. if appropriate
}
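A runnable version of the new loop, assuming only that the stream can be iterated like an Iterable[MessageAndMetadata[String]]. processMessage and consume are hypothetical placeholders, and tracking the last offset seen per topic is just one example of using metadata that the old loop could not see.

```scala
import scala.collection.mutable

case class MessageAndMetadata[T](message: T, topic: String = "", offset: Long = -1L)

// Hypothetical placeholder for application-specific processing.
def processMessage(msg: String): Unit = println(s"processing $msg")

// Consumes a stream and returns the last offset seen for each topic.
def consume(stream: Iterable[MessageAndMetadata[String]]): Map[String, Long] = {
  val lastOffsetByTopic = mutable.Map.empty[String, Long]
  for (msgAndMetadata <- stream) {
    processMessage(msgAndMetadata.message)
    // Metadata is now available alongside the payload.
    lastOffsetByTopic(msgAndMetadata.topic) = msgAndMetadata.offset
  }
  lastOffsetByTopic.toMap
}
```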

Existing API

  /**
   *  Create a list of MessageStreams for each topic.
   *
   *  @param topicCountMap  a map of (topic, #streams) pairs
   *  @return a map of (topic, list of KafkaMessageStream) pairs. The number of items in the
   *          list is #streams. Each KafkaMessageStream supports an iterator of messages.
   */
  def createMessageStreams[T](topicCountMap: Map[String,Int],
                              decoder: Decoder[T] = new DefaultDecoder)
    : Map[String,List[KafkaMessageStream[T]]]

Comments

  • KafkaMessageAndMetadataStream is a rather unwieldy name; perhaps KafkaStream instead?

References

  1. https://issues.apache.org/jira/browse/KAFKA-249
  2. Mailing list discussion