Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Multi-threading is not implemented in ConsumerPerformance. It was implemented for the old consumer and was removed completely in https://github.com/apache/kafka/pull/5230/, but the [threads] option was kept. The option is still accepted but has no effect, so it needs to be made to work again.

Public Interfaces

kafka.tools.ConsumerPerformance is used by kafka-consumer-perf-test.sh. This KIP makes the [threads] option work.

Proposed Changes

We define a new class, ConsumerPerfThread, inside ConsumerPerformance to implement multi-threading. It is not a public interface.

Attribute | Description
threadId : Int | The thread ID.
config : ConsumerPerfConfig | The configuration of ConsumerPerformance.
totalMessagesRead : LongAdder | The total number of messages that have been read.
totalBytesRead : LongAdder | The total number of bytes that have been read.
consumer : KafkaConsumer[Array[Byte], Array[Byte]] | The consumer used to fetch messages.
topics : List[String] | The topics to test.
metric : mutable.Map[MetricName, _ <: Metric] | The consumer metrics, used to obtain rebalanceTime.
testStartTime : Long | The time when the test starts.

Below are the flowcharts of ConsumerPerformance and ConsumerPerfThread. The general process is:

  1. Read the ConsumerPerfConfig: the topic to test, the number of messages to consume, the number of threads, the broker list, and so on.
  2. Look up how many partitions the topic has. If the number of threads is greater than the number of partitions, set the number of threads to the number of partitions and log a warning.
  3. Create the consumers, all with the same config. The number of consumers equals the number of threads.
  4. Create a ConsumerPerfThread for every consumer. They all share the totalMessagesRead counter. Once totalMessagesRead exceeds the number of messages to consume, all threads stop.
  5. Create a thread pool and submit each ConsumerPerfThread to it.
  6. Wait for the threads to finish, then shut down the thread pool.
  7. Compute and print the stats.
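
The steps above can be sketched roughly as follows. This is only an illustration, not the proposed patch: the names MultiThreadPerfSketch, effectiveThreads, runTest and the poll function are hypothetical, and poll stands in for a real KafkaConsumer poll by returning the size of one fetched batch. The exact comparison used to stop the workers is also an assumption.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.LongAdder

object MultiThreadPerfSketch {

  // Step 2: never run more threads than the topic has partitions.
  def effectiveThreads(requested: Int, partitions: Int): Int =
    if (requested > partitions) {
      System.err.println(
        s"WARNING: threads ($requested) > partitions ($partitions); using $partitions threads")
      partitions
    } else requested

  // Steps 3 to 6. `poll` is a hypothetical stand-in for consumer.poll():
  // it returns the number of messages in one fetched batch.
  def runTest(requestedThreads: Int, partitions: Int, numMessages: Long, poll: () => Int): Long = {
    val threads = effectiveThreads(requestedThreads, partitions)
    val totalMessagesRead = new LongAdder // shared by all workers
    val pool = Executors.newFixedThreadPool(threads)

    for (_ <- 0 until threads) {
      pool.submit(new Runnable {
        override def run(): Unit =
          // Assumed stop condition: keep polling while the shared total is below the target.
          while (totalMessagesRead.sum() < numMessages)
            totalMessagesRead.add(poll().toLong)
      })
    }

    // Step 6: stop accepting tasks and wait for the submitted workers to finish.
    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.MINUTES)
    totalMessagesRead.sum()
  }
}
```

Because each worker checks the shared counter before polling, the final total can overshoot the target by up to one batch per thread. That seems acceptable for a throughput test, but it means the reported message count is not necessarily exactly the requested number.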

Chart1: ConsumerPerformance

Chart2: ConsumerPerfThread


Compatibility, Deprecation, and Migration Plan

N/A

Rejected Alternatives

As in the multi-threaded implementation for the old consumer, the [numMessages] option could be the number of messages each thread consumes rather than the total number of messages consumed by all threads. This approach avoids using the shared variable totalMessagesRead as the condition for ending the threads.
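
For comparison, a minimal sketch of this rejected per-thread approach might look like the following. The names PerThreadQuotaSketch, runTest and poll are hypothetical, and poll again stands in for a real consumer poll that returns one batch size.

```scala
import java.util.concurrent.{Callable, Executors}

object PerThreadQuotaSketch {

  // Each worker counts only its own messages, so no shared counter is needed.
  def runTest(threads: Int, numMessagesPerThread: Long, poll: () => Int): Long = {
    val pool = Executors.newFixedThreadPool(threads)

    val futures = (0 until threads).map { _ =>
      pool.submit(new Callable[Long] {
        override def call(): Long = {
          var read = 0L
          // Stop once this thread alone has met its quota.
          while (read < numMessagesPerThread) read += poll()
          read
        }
      })
    }

    // Blocks until every worker has met its own quota.
    val total = futures.map(_.get()).sum
    pool.shutdown()
    total
  }
}
```

In this variant the test as a whole consumes roughly threads × numMessages messages, and it only finishes when every thread has reached its own quota.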

