...
Attribute | Description |
---|---|
threadId : Int | The thread Id. |
config : ConsumerPerfConfig | The configuration of ComsumerPerformance |
totalMessagesRead : LongAdder | The total number of messages which have been read |
totalBytesRead : LongAdder | The total byte of messages which have been read |
consumer : KafkaConsumer[Array[Byte], Array[Byte]] | The consumer to fetch messages |
metric : mutable.Map[MetricName, _ <: Metric] | The metric for consumer, which used to get rebalanceTime |
Below are the flowcharts of ConsumerPerformance and ConsumerPerfThread. The general process is:
- Get ConsumerPerfConfig, like the topic to test, message number to consume, threads number, broker list and etc.
- Get how many partitions the topic has. If threads number is bigger than partitions number, setting threads number to partitions number and giving a warning.
- New Consumers with the same config. The number of Consumers is the same as the threads number.
- New ConsumerPerfThread for every Consumer. They share a variable totalMessageRead. If the totalMesssageRead is bigger than the message number to consume, all threads will end.
- New a thread pool, and submit ConsumerPerfThread to it.
- Wait for the threads to finish and shutdown thread pool.
- Compute and print stats.
Chart1: ConsumerPerformance
Chart2: ConsumerPerfThread
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.Just as the multi-thread implement for the old Consumer, the option [numMessages] is the number of messages every thread to consume not the total message to consume for all threads. This way can avoid using global variable totalMesssageRead as a condition to end thread.