Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Multi-threaded consumption is not implemented in ConsumerPerformance. It was implemented for the old consumer and removed completely in https://github.com/apache/kafka/pull/5230/, but the [threads] option was kept, so setting it currently has no effect. This KIP proposes to make the [threads] option work again.
Public Interfaces
kafka.tools.ConsumerPerformance is used by kafka-consumer-perf-test.sh. This KIP makes its [threads] option work.
Proposed Changes
We define a new class, ConsumerPerfThread, inside ConsumerPerformance to implement multi-threaded consumption. ConsumerPerfThread is internal and is not a public interface. Its attributes are:
Attribute | Type | Description |
---|---|---|
threadId | Int | The ID of the thread |
config | ConsumerPerfConfig | The configuration of ConsumerPerformance |
totalMessagesRead | LongAdder | The total number of messages that have been read; shared by all threads |
totalBytesRead | LongAdder | The total number of bytes of the messages that have been read |
consumer | KafkaConsumer[Array[Byte], Array[Byte]] | The consumer that fetches messages |
topics | List[String] | The topics to test |
metric | mutable.Map[MetricName, _ <: Metric] | The consumer metrics, used to get rebalanceTime |
testStartTime | Long | The time when the test starts |
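A minimal sketch of the ConsumerPerfThread run loop follows, showing how the shared LongAdder counters could be used. It is not the proposed implementation: to stay self-contained it assumes a hypothetical RecordSource trait in place of KafkaConsumer.poll, passes numMessages directly instead of reading it from ConsumerPerfConfig, and uses a hypothetical maxEmptyPolls limit in place of the tool's timeout handling. The config, topics, metric and testStartTime attributes are omitted.

```scala
import java.util.concurrent.atomic.LongAdder

// Hypothetical stand-in for KafkaConsumer[Array[Byte], Array[Byte]]:
// poll() returns the values of one batch of records (possibly empty).
trait RecordSource {
  def poll(): Seq[Array[Byte]]
}

// Sketch of ConsumerPerfThread. totalMessagesRead and totalBytesRead are
// shared by all threads; maxEmptyPolls is a hypothetical stand-in for the
// tool's timeout handling.
class ConsumerPerfThread(
    val threadId: Int,
    numMessages: Long,
    totalMessagesRead: LongAdder,
    totalBytesRead: LongAdder,
    source: RecordSource,
    maxEmptyPolls: Int
) extends Runnable {

  override def run(): Unit = {
    var emptyPolls = 0
    // Keep polling until the shared total reaches the target, or until the
    // source has returned nothing maxEmptyPolls times in a row.
    while (totalMessagesRead.sum() < numMessages && emptyPolls < maxEmptyPolls) {
      val batch = source.poll()
      if (batch.isEmpty) {
        emptyPolls += 1
      } else {
        emptyPolls = 0
        // The whole batch is counted, so the final total can exceed
        // numMessages by up to one batch per thread.
        batch.foreach { value =>
          totalMessagesRead.increment()
          totalBytesRead.add(value.length.toLong)
        }
      }
    }
  }
}
```

LongAdder fits this pattern because the counters are incremented far more often than they are summed. LongAdder.sum() is not an atomic snapshot, which is acceptable for an approximate stop condition like this one.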
The flowcharts of ConsumerPerformance and ConsumerPerfThread are shown below. The overall process is:
- Read the ConsumerPerfConfig: the topic to test, the number of messages to consume, the number of threads, the broker list, etc.
- Look up the number of partitions of the topic. If the number of threads is greater than the number of partitions, set the number of threads to the number of partitions and print a warning, because each partition is assigned to at most one consumer in a group and the extra consumers would stay idle.
- Create one consumer per thread, all with the same configuration.
- Create a ConsumerPerfThread for each consumer. All threads share the counter totalMessagesRead; once totalMessagesRead reaches the number of messages to consume, all threads stop.
- Create a thread pool and submit every ConsumerPerfThread to it.
- Wait for all threads to finish, then shut down the thread pool.
- Compute and print the statistics.
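The driver side of the steps above might be sketched as follows. This is a sketch under assumptions, not the proposed code: it hides each consumer and its ConsumerPerfThread behind a hypothetical `makeWorker` factory, uses `Executors.newFixedThreadPool` as the thread pool, and treats 1 MB as 1024 × 1024 bytes when computing throughput. The object and method names are hypothetical.

```scala
import java.util.concurrent.{Executors, TimeUnit}

object ConsumerPerfDriverSketch {

  // Cap the thread count at the partition count and warn. Within one
  // consumer group each partition is assigned to at most one consumer,
  // so consumers beyond the partition count would get no partitions.
  def effectiveThreads(requested: Int, numPartitions: Int): Int =
    if (requested > numPartitions) {
      Console.err.println(
        s"WARN: threads ($requested) > partitions ($numPartitions), using $numPartitions threads")
      numPartitions
    } else requested

  // Run one worker per thread in a fixed-size pool, wait for all of them,
  // then shut the pool down. makeWorker is a hypothetical factory standing
  // in for "create a consumer and its ConsumerPerfThread".
  def runAll(threads: Int, makeWorker: Int => Runnable): Unit = {
    require(threads > 0, "at least one thread is required")
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val futures = (0 until threads).map(id => pool.submit(makeWorker(id)))
      // get() blocks until the worker finishes and rethrows its failure
      // wrapped in an ExecutionException.
      futures.foreach(_.get())
    } finally {
      pool.shutdown()
      pool.awaitTermination(1, TimeUnit.MINUTES)
    }
  }

  // Throughput in MB/sec and messages/sec over the elapsed wall-clock time
  // (1 MB assumed to be 1024 * 1024 bytes).
  def throughput(totalBytes: Long, totalMessages: Long, elapsedMs: Long): (Double, Double) = {
    val elapsedSecs = elapsedMs / 1000.0
    val totalMb = totalBytes / (1024.0 * 1024.0)
    (totalMb / elapsedSecs, totalMessages / elapsedSecs)
  }
}
```

Waiting on each Future, rather than relying only on awaitTermination, means an exception in any thread reaches the caller instead of being silently dropped.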
Chart 1: ConsumerPerformance
Chart 2: ConsumerPerfThread
Compatibility, Deprecation, and Migration Plan
N/A
Rejected Alternatives
As in the multi-threaded implementation for the old consumer, the [numMessages] option could mean the number of messages each thread consumes, rather than the total number of messages consumed by all threads. This would avoid using the shared variable totalMessagesRead as the condition for ending the threads.
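The two stop conditions can be contrasted in a small sketch. All names here are hypothetical; the sketch only illustrates that the rejected alternative needs no shared counter to stop a thread, while the total number of messages consumed then grows with the number of threads.

```scala
import java.util.concurrent.atomic.LongAdder

object StopConditionSketch {
  // Proposed design: every thread compares the shared total with numMessages.
  def sharedShouldStop(totalMessagesRead: LongAdder, numMessages: Long): Boolean =
    totalMessagesRead.sum() >= numMessages

  // Rejected alternative: each thread compares only its own local count
  // with numMessages, so stopping needs no shared state.
  def perThreadShouldStop(localMessagesRead: Long, numMessages: Long): Boolean =
    localMessagesRead >= numMessages

  // Total messages consumed by all threads under each interpretation.
  def totalTarget(threads: Int, numMessages: Long, perThread: Boolean): Long =
    if (perThread) threads * numMessages else numMessages
}
```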