...

Code Block
  /**
   *  Create a list of MessageStreams for each topic.
   *
   *  @param topicCountMap a map of (topic, #streams) pairs
   *  @param decoder Decoder to decode each Message to type T
   *  @return a map of (topic, list of KafkaMessageAndMetadataStream) pairs.
   *          The number of items in the list is #streams. Each stream supports
   *          an iterator over message/metadata pairs.
   */
  def createMessageStreams[T](topicCountMap: Map[String,Int],
                              decoder: Decoder[T] = new DefaultDecoder)
    : Map[String,List[KafkaMessageAndMetadataStream[T]]]


  /**
   *  Create a list of message streams for all topics that match a given filter.
   *
   *  @param filterSpec Either a Whitelist or Blacklist TopicFilterSpec object.
   *  @param numStreams Number of streams to return
   *  @param decoder Decoder to decode each Message to type T
   *  @return a list of KafkaMessageAndMetadataStream each of which provides an
   *          iterator over message/metadata pairs over allowed topics.
   */
  def createMessageStreamsByFilter[T](filterSpec: TopicFilterSpec,
                                      numStreams: Int = 1,
                                      decoder: Decoder[T] = new DefaultDecoder)
    : Seq[KafkaMessageAndMetadataStream[T]]
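
To make the whitelist/blacklist idea behind createMessageStreamsByFilter concrete, here is a minimal sketch of how such a filter could decide which topics are allowed. It is an illustration only, not the proposed implementation: the `regex` field, the `isTopicAllowed` method, and the choice of full-match regular expressions are all assumptions, and only the names TopicFilterSpec, Whitelist and Blacklist come from the doc comment above.

```scala
// Hypothetical stand-in for the proposed TopicFilterSpec (not the real Kafka class).
// Assumption: a filter is a regular expression matched against the full topic name.
sealed trait TopicFilterSpec {
  def regex: String
  def isTopicAllowed(topic: String): Boolean
}

// Allows only topics whose whole name matches the regex.
final case class Whitelist(regex: String) extends TopicFilterSpec {
  def isTopicAllowed(topic: String): Boolean = topic.matches(regex)
}

// Allows every topic except those whose whole name matches the regex.
final case class Blacklist(regex: String) extends TopicFilterSpec {
  def isTopicAllowed(topic: String): Boolean = !topic.matches(regex)
}

object TopicFilterDemo {
  def main(args: Array[String]): Unit = {
    val topics = List("orders", "orders-retry", "audit")
    // Keep the topics a whitelist would subscribe to.
    println(topics.filter(Whitelist("orders.*").isTopicAllowed))
    // Keep the topics a blacklist would leave in place.
    println(topics.filter(Blacklist("audit").isTopicAllowed))
  }
}
```

Under these assumptions a consumer would subscribe to every existing topic for which the filter returns true, and the numStreams streams would be shared across all of those topics.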

...

Questions/discussion

What is a TopicFilterSpec?

...

The KafkaMessageAndMetadataStream[T]'s iterator is a ConsumerIterator[T], which is an iterator over MessageAndMetadata[T] objects.
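
The relationship between the three types can be sketched with self-contained stand-ins. Only the type names come from this page; the fields of MessageAndMetadata, the constructors, and the in-memory backing sequence are assumptions made so the example runs on its own (a real consumer iterator would read from fetched data and would typically block while waiting for messages).

```scala
// Stand-in types: names are from the discussion, everything else is assumed.
final case class MessageAndMetadata[T](topic: String, message: T)

// A ConsumerIterator[T] is an Iterator over MessageAndMetadata[T] objects.
class ConsumerIterator[T](underlying: Iterator[MessageAndMetadata[T]])
    extends Iterator[MessageAndMetadata[T]] {
  def hasNext: Boolean = underlying.hasNext
  def next(): MessageAndMetadata[T] = underlying.next()
}

// The stream hands out a ConsumerIterator[T]; here it is backed by a fixed Seq.
class KafkaMessageAndMetadataStream[T](items: Seq[MessageAndMetadata[T]])
    extends Iterable[MessageAndMetadata[T]] {
  def iterator: ConsumerIterator[T] = new ConsumerIterator[T](items.iterator)
}

object StreamDemo {
  def main(args: Array[String]): Unit = {
    val stream = new KafkaMessageAndMetadataStream[String](Seq(
      MessageAndMetadata("orders", "first"),
      MessageAndMetadata("audit", "second")))
    // Each element carries the decoded message together with its metadata.
    for (m <- stream) println(s"${m.topic}: ${m.message}")
  }
}
```

The point of the sketch is that client code iterating a stream receives the topic alongside each decoded message, which is what makes a single stream over several filtered topics usable.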

KafkaMessageAndMetadataStream[T] is a pretty unwieldy name.

Agreed. KafkaStream[T] is fine, and I feel it is better than KafkaMessageStream[T] because the latter suggests it carries only messages (and no metadata). Also, the iterator is over MessageAndMetadata[T], which is still a bit unwieldy. Maybe we should just call it StreamData?

Impact to clients

Client code will need to change, since the current pattern of:

...

Code Block
  /**
   *  Create a list of MessageStreams for each topic.
   *
   *  @param topicCountMap a map of (topic, #streams) pairs
   *  @return a map of (topic, list of KafkaMessageStream) pairs. The number of items in the
   *          list is #streams. Each KafkaMessageStream supports an iterator of messages.
   */
  def createMessageStreams[T](topicCountMap: Map[String,Int],
                              decoder: Decoder[T] = new DefaultDecoder)
    : Map[String,List[KafkaMessageStream[T]]]

Comments

...

References

  1. https://issues.apache.org/jira/browse/KAFKA-249
  2. Mailing list discussion