...

Code Block
  /**
   *  Create a list of MessageStreams for each topic.
   *
   *  @param topicCountMap  a map of (topic, #streams) pairs
   *  @param decoder Decoder to decode each Message to type T
   *  @return a map of (topic, list of KafkaStream) pairs.
   *          The number of items in the list is #streams. Each stream supports
   *          an iterator over message/metadata pairs.
   */
  def createMessageStreams[T](topicCountMap: Map[String,Int],
                              decoder: Decoder[T] = new DefaultDecoder)
    : Map[String,List[KafkaStream[T]]]


  /**
   *  Create a list of message streams for all topics that match a given filter.
   *
   *  @param topicFilter Either a Whitelist or Blacklist TopicFilter object.
   *  @param numStreams Number of streams to return
   *  @param decoder Decoder to decode each Message to type T
   *  @return a list of KafkaStream, each of which provides an
   *          iterator over message/metadata pairs over allowed topics.
   */
  def createMessageStreamsByFilter[T](topicFilter: TopicFilter,
                                      numStreams: Int = 1,
                                      decoder: Decoder[T] = new DefaultDecoder)
    : Seq[KafkaStream[T]]

Questions/discussion

What is a TopicFilter?

A TopicFilter can be either a whitelist or a blacklist, e.g.:

  • Example of a whitelist TopicFilter: new Whitelist("white.*")
  • Example of a blacklist TopicFilter: new Blacklist("black.*")

Although a single Java regex can express any selection (i.e., you don't really need a blacklist option per se), negating a whitelist in Java regex is clumsy, so it is convenient to be able to specify either a whitelist or a blacklist directly. Right now TopicFilter supports only one of whitelist/blacklist; in future we may want to support a chain of filters to do more elaborate topic selection.
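
To make the whitelist/blacklist distinction concrete, here is a minimal, self-contained sketch. The TopicFilter, Whitelist and Blacklist types below are hypothetical stand-ins written for illustration (including the isTopicAllowed method name and the choice of whole-name matching); they are not the actual Kafka classes. The last line shows what the same exclusion looks like when forced into a single "whitelist" regex, which needs a negative lookahead.

```scala
import java.util.regex.Pattern

// Hypothetical stand-ins for illustration only; not the Kafka implementation.
sealed trait TopicFilter {
  def regex: String
  def isTopicAllowed(topic: String): Boolean
}

final case class Whitelist(regex: String) extends TopicFilter {
  private val pattern = Pattern.compile(regex)
  // Assumption: a topic is allowed only if its whole name matches the regex.
  def isTopicAllowed(topic: String): Boolean = pattern.matcher(topic).matches()
}

final case class Blacklist(regex: String) extends TopicFilter {
  private val pattern = Pattern.compile(regex)
  // A blacklist allows everything the regex does NOT match.
  def isTopicAllowed(topic: String): Boolean = !pattern.matcher(topic).matches()
}

object TopicFilterSketch {
  val topics: List[String] = List("white.events", "black.audit", "grey.misc")

  val whitelisted: List[String] = topics.filter(new Whitelist("white.*").isTopicAllowed)
  val blacklisted: List[String] = topics.filter(new Blacklist("black.*").isTopicAllowed)

  // The same exclusion expressed as a whitelist requires a negative lookahead.
  val negated: List[String] = topics.filter(new Whitelist("(?!black.*).*").isTopicAllowed)
}
```

In this sketch, blacklisted and negated select the same topics, but the blacklist form is much easier to read and write.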

...

Code Block
case class MessageAndMetadata[T](message: T, topic: String = "", offset: Long = -1L)

The iterator of KafkaStream[T] is a ConsumerIterator[T], which iterates over MessageAndMetadata[T] objects.
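
As a rough illustration of what consuming such a stream looks like, the sketch below iterates over MessageAndMetadata[T] objects. The case class is copied from the definition above; InMemoryStream is a hypothetical in-memory stand-in for KafkaStream[T] (the real stream's iterator is a ConsumerIterator[T]), and the topic name and messages are made up for the example.

```scala
// Copied from the definition above.
case class MessageAndMetadata[T](message: T, topic: String = "", offset: Long = -1L)

// Hypothetical stand-in for KafkaStream[T], backed by a fixed sequence.
class InMemoryStream[T](items: Seq[MessageAndMetadata[T]])
    extends Iterable[MessageAndMetadata[T]] {
  def iterator: Iterator[MessageAndMetadata[T]] = items.iterator
}

object StreamIterationSketch {
  val stream = new InMemoryStream(Seq(
    MessageAndMetadata("hello", "white.events", 0L),
    MessageAndMetadata("world", "white.events", 1L)))

  // Each element carries the decoded message together with its topic and offset.
  val described: List[String] =
    stream.iterator.map(m => s"${m.topic}@${m.offset}: ${m.message}").toList
}
```

The point of the sketch is that client code receives the metadata alongside each decoded message, rather than a bare message.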

KafkaMessageAndMetadataStream[T] is a pretty unwieldy name.

Agreed - KafkaStream[T] is fine, and I feel it is better than KafkaMessageStream[T] because the latter suggests it's only messages (and no metadata). Also, the iterator is over MessageAndMetadata[T], which is still a bit unwieldy. Maybe we should just call it StreamData?

Impact to clients

Client code will need to change, since the current pattern of:

...