The source code for Apache Kafka System Tools are located https://github.com/apache/kafka/tree/0.8/core/src/main/scala/kafka/tools
If you are looking for the replication related tools then please check out the wiki page on Replication tools
System tools can be run from the command line using the run class script (i.e. bin/kafka-run-class.sh package.class --options)
Consumer Offset Checker
This tool has been removed in Kafka 1.0.0. Use kafka-consumer-groups.sh to get consumer group details.
Displays the: Consumer Group, Topic, Partitions, Offset, logSize, Lag, Owner for the specified set of Topics and Consumer Group
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker
required argument: [group]
Option Description
------ -----------
--broker-info Print broker info
--group Consumer group.
--help Print this message.
--topic Comma-separated list of consumer
topics (all topics if absent).
--zkconnect ZooKeeper connect string. (default: localhost:2181)
Dump Log Segment
This can print the messages directly from the log files or just verify the indexes correct for the logs
bin/kafka-run-class.sh kafka.tools.DumpLogSegments
required argument "[files]"
Option Description
------ -----------
--deep-iteration if set, uses deep instead of shallow iteration
--files <file1, file2, ...> REQUIRED: The comma separated list of data and index log files to be dumped
--max-message-size <Integer: size> Size of largest message. (default: 5242880)
--print-data-log if set, printing the messages content when dumping data logs
--verify-index-only if set, just verify the index log without printing its content
Export Zookeeper Offsets
A utility that retrieves the offsets of broker partitions in ZK and prints to an output file in the following format:
/consumers/group1/offsets/topic1/1-0:286894308
/consumers/group1/offsets/topic1/2-0:284803985
bin/kafka-run-class.sh kafka.tools.ExportZkOffsets
required argument: [zkconnect]
Option Description
------ -----------
--group Consumer group.
--help Print this message.
--output-file Output file
--zkconnect ZooKeeper connect string. (default: localhost:2181)
Get Offset Shell
get offsets for a topic
bin/kafka-run-class.sh kafka.tools.GetOffsetShell
required argument [broker-list], [topic]
Option Description
------ -----------
--broker-list <hostname:port,..., REQUIRED: The list of hostname and hostname:port> port of the server to connect to.
--max-wait-ms <Integer: ms> The max amount of time each fetch request waits. (default: 1000)
--offsets <Integer: count> number of offsets returned (default: 1)
--partitions <partition ids> comma separated list of partition ids. If not specified, will find offsets for all partitions (default)
--time <Long: timestamp in milliseconds / -1(latest) / -2 (earliest) timestamp; offsets will come before this timestamp, as in getOffsetsBefore >
--topic <topic> REQUIRED: The topic to get offsets from.
Import Zookeeper Offsets
can import offsets for a topic partitions
file format is the same as for the export
/consumers/group1/offsets/topic1/1-0:286894308
/consumers/group1/offsets/topic1/2-0:284803985
bin/kafka-run-class.sh kafka.tools.ImportZkOffsets
required argument: [input-file]
Option Description
------ -----------
--help Print this message.
--input-file Input file
--zkconnect ZooKeeper connect string. (default: localhost:2181)
JMX Tool
prints metrics via JMX
bin/kafka-run-class.sh kafka.tools.JmxTool
Option Description
------ -----------
--attributes <name> The whitelist of attributes to query. This is a comma-separated list. If no attributes are specified all objects will be queried.
--date-format <format> The date format to use for formatting the time field. See java.text. SimpleDateFormat for options.
--help Print usage information.
--jmx-url <service-url> The url to connect to to poll JMX data. See Oracle javadoc for JMXServiceURL for details. (default: service:jmx:rmi:///jndi/rmi://: 9999/jmxrmi)
--object-name <name> A JMX object name to use as a query. This can contain wild cards, and this option can be given multiple times to specify more than one query. If no objects are specified all objects will be queried.
--reporting-interval <Integer: ms> Interval in MS with which to poll jmx stats. (default: 2000)
Kafka Migration Tool
Migrates a 0.7 broker to 0.8
bin/kafka-run-class.sh kafka.tools.KafkaMigrationTool
Missing required argument "[consumer.config]"
Option Description
------ -----------
--blacklist <Java regex (String)> Blacklist of topics to migrate from the 0.7 cluster
--consumer.config <config file> Kafka 0.7 consumer config to consume from the source 0.7 cluster. You man specify multiple of these.
--help Print this message.
--kafka.07.jar <kafka 0.7 jar> Kafka 0.7 jar file
--num.producers <Integer: Number of Number of producer instances (default: producers> 1)
--num.streams <Integer: Number of Number of consumer streams (default: 1)consumer threads>
--producer.config <config file> Producer config.
--queue.size <Integer: Queue size in Number of messages that are buffered terms of number of messages> between the 0.7 consumer and 0.8 producer (default: 10000)
--whitelist <Java regex (String)> Whitelist of topics to migrate from the 0.7 cluster
--zkclient.01.jar <zkClient 0.1 jar zkClient 0.1 jar file file required by Kafka 0.7>
Mirror Maker
Provides mirroring of one Kafka cluster to another, for more info check out the wiki page on Kafka mirroring (MirrorMaker)
bin/kafka-run-class.sh kafka.tools.MirrorMaker
required argument [consumer.config]
Option Description
------ -----------
--blacklist <Java regex (String)> Blacklist of topics to mirror.
--consumer.config <config file> Consumer config to consume from a source cluster. You may specify multiple of these.
--help Print this message.
--num.producers <Integer: Number of Number of producer instances (default: producers> 1)
--num.streams <Integer: Number of Number of consumption streams. threads> (default: 1)
--producer.config <config file> Embedded producer config.
--queue.size <Integer: Queue size in Number of messages that are buffered terms of number of messages> between the consumer and producer (default: 10000)
--whitelist <Java regex (String)> Whitelist of topics to mirror.
Replay Log Producer
Consume from one topic and replay those messages and produce to another topic
bin/kafka-run-class.sh kafka.tools.ReplayLogProducer
required argument [broker-list], [input-topic], [output-topic], [zookeeper]
Option Description
------ -----------
--async If set, messages are sent asynchronously.
--batch-size <Integer: batch size> Number of messages to send in a single batch. (default: 200)
--broker-list <hostname:port> REQUIRED: the broker list must be specified.
--compression-codec <Integer: If set, messages are sent compressed compression codec > (default: 0)
--delay-btw-batch-ms <Long: ms> Delay in ms between 2 batch sends. (default: 0)
--inputtopic <input-topic> REQUIRED: The topic to consume from.
--messages <Integer: count> The number of messages to send. (default: -1)
--outputtopic <output-topic> REQUIRED: The topic to produce to
--reporting-interval <Integer: size> Interval at which to print progress info. (default: 5000)
--threads <Integer: threads> Number of sending threads. (default: 1)
--zookeeper <zookeeper url> REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over. (default: 127.0.0.1:2181)
Simple Consumer Shell
Dumps out consumed messages to the console using the Simple Consumer
bin/kafka-run-class.sh kafka.tools.SimpleConsumerShell
required argument [broker-list], [topic]
Option Description
------ -----------
--broker-list <hostname:port,..., REQUIRED: The list of hostname and hostname:port> port of the server to connect to.
--clientId <clientId> The ID of this client. (default: SimpleConsumerShell)
--fetchsize <Integer: fetchsize> The fetch size of each request. (default: 1048576)
--formatter <class> The name of a class to use for formatting kafka messages for display. (default: kafka.consumer. DefaultMessageFormatter)
--max-messages <Integer: max-messages> The number of messages to consume (default: 2147483647)
--max-wait-ms <Integer: ms> The max amount of time each fetch request waits. (default: 1000)
--no-wait-at-logend If set, when the simple consumer reaches the end of the Log, it will stop, not waiting for new produced messages
--offset <Long: consume offset> The offset id to consume from, default to -2 which means from beginning; while value -1 means from end (default: -2)
--partition <Integer: partition> The partition to consume from. (default: 0)
--print-offsets Print the offsets returned by the iterator
--property <prop>
--replica <Integer: replica id> The replica id to consume from, default -1 means leader broker. (default: -1)
--skip-message-on-error If there is an error when processing a message, skip it instead of halt.
--topic <topic> REQUIRED: The topic to consume from.
State Change Log Merger
A utility that merges the state change logs (possibly obtained from different brokers and over multiple days).
bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger
Provide arguments to exactly one of the two options "[logs]" or "[logs-regex]"
Option Description
------ -----------
--end-time <end timestamp in the The latest timestamp of state change format java.text. log entries to be merged (default: SimpleDateFormat@f17a63e7> 9999-12-31 23:59:59,999)
--logs <file1,file2,...> Comma separated list of state change logs or a regex for the log file names
--logs-regex <for example: /tmp/state- Regex to match the state change log change.log*> files to be merged
--partitions <0,1,2,...> Comma separated list of partition ids whose state change logs should be merged
--start-time <start timestamp in the The earliest timestamp of state change format java.text. log entries to be merged (default: SimpleDateFormat@f17a63e7> 0000-00-00 00:00:00,000)
--topic <topic> The topic whose state change logs should be merged
Update Offsets In Zookeeper
A utility that updates the offset of every broker partition to the offset of earliest or latest log segment file, in ZK.
bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK
USAGE: kafka.tools.UpdateOffsetsInZK$ [earliest | latest] consumer.properties topic
Verify Consumer Rebalance
Make sure there is an owner for every partition. A successful rebalancing operation would select an owner for each available partition.
This means that for each partition registered under /brokers/topics/[topic]/[broker-id], an owner exists under /consumers/[consumer_group]/owners/[topic]/[broker_id-partition_id]
bin/kafka-run-class.sh kafka.tools.VerifyConsumerRebalance
required argument: [group]
Option Description
------ -----------
--group Consumer group.
--help Print this message.
--zookeeper.connect ZooKeeper connect string. (default: localhost:2181)