

Status

Current state: "Draft"

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Multi-threaded consumption is not implemented in ConsumerPerformance. It was implemented for the old consumer and removed completely in https://github.com/apache/kafka/pull/5230/, but the [threads] option was kept, so the option currently has no effect. This KIP proposes to make the [threads] option work again.

Public Interfaces




kafka.tools.ConsumerPerformance is the class invoked by kafka-consumer-perf-test.sh. This KIP only makes the existing [threads] option work.

Proposed Changes



We define a new class, ConsumerPerfThread, inside ConsumerPerformance to implement multi-threaded consumption. The class is internal and is not a public interface. Its attributes are:

Attribute | Type | Description
threadId | Int | The ID of the thread
config | ConsumerPerfConfig | The configuration of ConsumerPerformance
totalMessagesRead | LongAdder | The total number of messages read so far, shared by all threads
totalBytesRead | LongAdder | The total number of bytes read so far
consumer | KafkaConsumer[Array[Byte], Array[Byte]] | The consumer used to fetch messages
metric | mutable.Map[MetricName, _ <: Metric] | The consumer's metrics, used to obtain the rebalance time
testStartTime | Long | The time at which the test started
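A minimal sketch of the per-thread loop, under stated assumptions: `PerfWorkerSketch`, `fetchBatch` and `timeoutMs` are hypothetical names, not part of ConsumerPerformance. `fetchBatch` stands in for `consumer.poll(...)` so the example runs without a broker, and `timeoutMs` is an assumed guard against polling forever when no records arrive. Each worker adds to the shared `LongAdder` counters and stops once the shared message count reaches the target.

```scala
import java.util.concurrent.atomic.LongAdder

// Hypothetical stand-in for ConsumerPerfThread. `fetchBatch` plays the role of
// consumer.poll(...) and returns the record values of one poll; `timeoutMs` is an
// assumed guard that stops the worker when no records arrive for that long.
class PerfWorkerSketch(
    val threadId: Int,
    numMessages: Long,            // target number of messages across ALL threads
    timeoutMs: Long,
    totalMessagesRead: LongAdder, // shared by all workers
    totalBytesRead: LongAdder,    // shared by all workers
    fetchBatch: () => Seq[Array[Byte]]
) extends Runnable {

  override def run(): Unit = {
    var lastRecordTime = System.currentTimeMillis()
    var timedOut = false
    // Stop once the shared counter reaches the target. Workers check the counter
    // only between polls, so the final total can slightly exceed numMessages.
    while (!timedOut && totalMessagesRead.sum() < numMessages) {
      val batch = fetchBatch()
      if (batch.isEmpty) {
        timedOut = System.currentTimeMillis() - lastRecordTime > timeoutMs
      } else {
        lastRecordTime = System.currentTimeMillis()
        batch.foreach { value =>
          totalMessagesRead.increment()
          totalBytesRead.add(value.length.toLong)
        }
      }
    }
  }
}
```

LongAdder is cheap to update from many threads; its `sum()` is not an atomic snapshot under concurrent updates, which is acceptable for a stop condition. For example, a single worker whose `fetchBatch` always returns three 4-byte records, with a target of 10 messages, stops after four polls having counted 12 messages and 48 bytes.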

The flowcharts of ConsumerPerformance and ConsumerPerfThread are shown below. The overall process is:

  1. Parse ConsumerPerfConfig: the topic to test, the number of messages to consume, the number of threads, the broker list, etc.
  2. Look up the number of partitions of the topic. If the number of threads exceeds the number of partitions, log a warning and reduce the number of threads to the number of partitions, since consumers beyond the partition count would be assigned no partitions.
  3. Create one consumer per thread, all with the same configuration.
  4. Create a ConsumerPerfThread for each consumer. All threads share the counter totalMessagesRead; once it reaches the number of messages to consume, all threads stop.
  5. Create a thread pool and submit every ConsumerPerfThread to it.
  6. Wait for all threads to finish, then shut down the thread pool.
  7. Compute and print the statistics.
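The steps above can be sketched as a small driver. This is a minimal sketch under stated assumptions: `runSketch`, `SketchStats` and the `makeWorker` factory are hypothetical names; `makeWorker` stands in for steps 3 and 4 (creating a consumer and a ConsumerPerfThread for each thread), and the warning is printed to stderr rather than logged.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.LongAdder

// Hypothetical result type for step 7.
final case class SketchStats(threads: Int, messages: Long, bytes: Long, elapsedMs: Long) {
  def mbPerSec: Double =
    if (elapsedMs == 0) 0.0 else (bytes / (1024.0 * 1024.0)) / (elapsedMs / 1000.0)
  def msgsPerSec: Double =
    if (elapsedMs == 0) 0.0 else messages / (elapsedMs / 1000.0)
}

def runSketch(
    requestedThreads: Int,
    numPartitions: Int,
    makeWorker: (Int, LongAdder, LongAdder) => Runnable // hypothetical: steps 3-4
): SketchStats = {
  require(requestedThreads > 0 && numPartitions > 0)

  // Step 2: never run more threads than the topic has partitions.
  val threads =
    if (requestedThreads > numPartitions) {
      System.err.println(s"WARN: $requestedThreads threads requested but the topic has only " +
        s"$numPartitions partitions; using $numPartitions threads")
      numPartitions
    } else requestedThreads

  val totalMessagesRead = new LongAdder
  val totalBytesRead = new LongAdder
  val start = System.currentTimeMillis()

  // Step 5: submit one worker per thread to a fixed-size pool.
  val pool = Executors.newFixedThreadPool(threads)
  val futures = (0 until threads).map { id =>
    pool.submit(makeWorker(id, totalMessagesRead, totalBytesRead))
  }

  // Step 6: stop accepting work, then wait for every worker; get() rethrows
  // any exception a worker raised instead of silently dropping it.
  pool.shutdown()
  futures.foreach(_.get())

  // Step 7: compute stats from the shared counters.
  SketchStats(threads, totalMessagesRead.sum(), totalBytesRead.sum(),
    System.currentTimeMillis() - start)
}
```

Waiting on each Future rather than only calling `awaitTermination` is a design choice of this sketch: a failing worker surfaces as an exception instead of producing silently low numbers.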

Chart1: ConsumerPerformance

Chart2: ConsumerPerfThread


Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

As in the multi-threaded implementation for the old consumer, interpret [numMessages] as the number of messages each thread consumes, rather than the total number of messages consumed by all threads. This avoids using the shared variable totalMessagesRead as the condition for ending the threads.
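For comparison, a minimal sketch of this rejected alternative, in which [numMessages] is a per-thread limit: each worker keeps a thread-local count, so no shared counter is consulted to decide when to stop, and the total consumed becomes at least threads × numMessages. `PerThreadLimitSketch` and `fetchBatch` are hypothetical names; `fetchBatch` stands in for `consumer.poll(...)`, and timeout handling is omitted.

```scala
// Hypothetical worker for the rejected alternative: the limit applies per thread,
// so the stop condition reads only a thread-local counter.
// Timeout handling is omitted: an always-empty fetchBatch would loop forever.
class PerThreadLimitSketch(numMessages: Long, fetchBatch: () => Seq[Array[Byte]]) extends Runnable {
  @volatile var messagesRead: Long = 0L // published for the caller once run() returns

  override def run(): Unit = {
    var count = 0L
    while (count < numMessages) {
      count += fetchBatch().size
    }
    messagesRead = count
  }
}
```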

