Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

AttributeDescription
threadId : Int The thread Id.
config : ConsumerPerfConfig The configuration  of ComsumerPerformance
totalMessagesRead : LongAdder The total number of messages which have been read
totalBytesRead : LongAdder The total byte of messages which have been read
consumer : KafkaConsumer[Array[Byte], Array[Byte]] The consumer to fetch messages
metric : mutable.Map[MetricName, _ <: Metric] The metric for consumer, which used to get rebalanceTime

Image Removed

Below are the flowcharts of ConsumerPerformance and ConsumerPerfThread. The general process is:

  1. Get ConsumerPerfConfig, like the topic to test,  message number to consume, threads number, broker list and etc.
  2. Get how many partitions the topic has. If threads number is bigger than partitions number, setting threads number to partitions number and giving a warning.
  3. New Consumers with the same config. The number of Consumers is the same as the threads number.
  4. New ConsumerPerfThread for every Consumer. They share a variable totalMessageRead. If the totalMesssageRead is bigger than the message number to consume,  all threads will end.
  5. New a thread pool, and submit ConsumerPerfThread to it.
  6. Wait for the threads to finish and shutdown thread pool.
  7. Compute and print stats.

Chart1: ConsumerPerformance

Image Added

Chart2: ConsumerPerfThread

Image AddedImage Removed


Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.Just as the multi-thread implement for the old Consumer,  the option [numMessages] is the number of messages every thread to consume not the total message to consume for all threads. This way can avoid using global variable totalMesssageRead as a condition to end thread.