The source code for the Apache Kafka system tools is located at https://github.com/apache/kafka/tree/0.8/core/src/main/scala/kafka/tools

If you are looking for replication-related tools, please check out the wiki page on Replication tools.

System tools can be run from the command line using the run class script (e.g. bin/kafka-run-class.sh package.class --options).

Consumer Offset Checker

Displays the Consumer Group, Topic, Partitions, Offset, logSize, Lag, and Owner for the specified set of topics and consumer group.

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker

required argument: [group]
Option Description
------ -----------
--broker-info Print broker info
--group Consumer group.
--help Print this message.
--topic Comma-separated list of consumer topics (all topics if absent).
--zkconnect ZooKeeper connect string. (default: localhost:2181)
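
For example, to display the offsets and lag of a hypothetical consumer group named my-group across all of its topics (the group name and ZooKeeper address here are illustrative):

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group my-group --broker-info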

Dump Log Segment

This tool can print the messages directly from the log files, or simply verify that the indexes are correct for the logs.

bin/kafka-run-class.sh kafka.tools.DumpLogSegments

required argument: [files]

Option Description
------ -----------
--deep-iteration if set, uses deep instead of shallow iteration
--files <file1, file2, ...> REQUIRED: The comma separated list of data and index log files to be dumped
--max-message-size <Integer: size> Size of largest message. (default: 5242880)
--print-data-log if set, prints the message content when dumping data logs
--verify-index-only if set, just verify the index log without printing its content
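
For example, to dump the message contents of a hypothetical log segment (the segment path here is illustrative):

bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/topic1-0/00000000000000000000.log --print-data-log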

Export Zookeeper Offsets

A utility that retrieves the offsets of broker partitions in ZK and prints them to an output file in the following format:

/consumers/group1/offsets/topic1/1-0:286894308
/consumers/group1/offsets/topic1/2-0:284803985

bin/kafka-run-class.sh kafka.tools.ExportZkOffsets
required argument: [zkconnect]
Option Description
------ -----------
--group Consumer group.
--help Print this message.
--output-file Output file
--zkconnect ZooKeeper connect string. (default: localhost:2181)
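
For example, to export the offsets of a hypothetical group my-group to a file (the group name, ZooKeeper address, and output path are illustrative):

bin/kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect localhost:2181 --group my-group --output-file /tmp/zk-offsets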

Get Offset Shell

Gets the offsets for a topic.

bin/kafka-run-class.sh kafka.tools.GetOffsetShell
required arguments: [broker-list], [topic]
Option Description
------ -----------
--broker-list <hostname:port,...,hostname:port> REQUIRED: The list of hostname and port of the server to connect to.
--max-wait-ms <Integer: ms> The max amount of time each fetch request waits. (default: 1000)
--offsets <Integer: count> number of offsets returned (default: 1)
--partitions <partition ids> comma separated list of partition ids. If not specified, it will find offsets for all partitions (default)
--time <Long: timestamp/-1(latest)/-2(earliest)> timestamp of the offsets before that
--topic <topic> REQUIRED: The topic to get offset from.
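
For example, to fetch the latest offset of every partition of a hypothetical topic topic1 (the broker address and topic name are illustrative):

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topic1 --time -1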

Import Zookeeper Offsets

Imports offsets for topic partitions.

The input file format is the same as that produced by the export tool:

/consumers/group1/offsets/topic1/1-0:286894308
/consumers/group1/offsets/topic1/2-0:284803985

bin/kafka-run-class.sh kafka.tools.ImportZkOffsets
required argument: [input-file]
Option Description
------ -----------
--help Print this message.
--input-file Input file
--zkconnect ZooKeeper connect string. (default: localhost:2181)
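
For example, to import offsets from a file previously produced by the export tool (the ZooKeeper address and file path are illustrative):

bin/kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect localhost:2181 --input-file /tmp/zk-offsets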

JMX Tool

Prints metrics via JMX.

bin/kafka-run-class.sh kafka.tools.JmxTool

Option Description
------ -----------
--attributes <name> The whitelist of attributes to query. This is a comma-separated list. If no attributes are specified all objects will be queried.
--date-format <format> The date format to use for formatting the time field. See java.text.SimpleDateFormat for options.
--help Print usage information.
--jmx-url <service-url> The URL to connect to for polling JMX data. See the Oracle javadoc for JMXServiceURL for details. (default: service:jmx:rmi:///jndi/rmi://:9999/jmxrmi)
--object-name <name> A JMX object name to use as a query. This can contain wildcards, and this option can be given multiple times to specify more than one query. If no objects are specified all objects will be queried.
--reporting-interval <Integer: ms> Interval in MS with which to poll jmx stats. (default: 2000)
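
For example, to poll all MBean attributes of a broker's JMX endpoint every 5 seconds (the hostname and JMX port here are illustrative and must match the broker's JMX_PORT setting):

bin/kafka-run-class.sh kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi --reporting-interval 5000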

Kafka Migration Tool

Migrates data from a Kafka 0.7 cluster to a 0.8 cluster.

bin/kafka-run-class.sh kafka.tools.KafkaMigrationTool
required argument: [consumer.config]
Option Description
------ -----------
--blacklist <Java regex (String)> Blacklist of topics to migrate from the 0.7 cluster
--consumer.config <config file> Kafka 0.7 consumer config to consume from the source 0.7 cluster. You may specify multiple of these.
--help Print this message.
--kafka.07.jar <kafka 0.7 jar> Kafka 0.7 jar file
--num.producers <Integer: Number of producers> Number of producer instances (default: 1)
--num.streams <Integer: Number of consumer threads> Number of consumer streams (default: 1)
--producer.config <config file> Producer config.
--queue.size <Integer: Queue size in terms of number of messages> Number of messages that are buffered between the 0.7 consumer and 0.8 producer (default: 10000)
--whitelist <Java regex (String)> Whitelist of topics to migrate from the 0.7 cluster
--zkclient.01.jar <zkClient 0.1 jar file required by Kafka 0.7> zkClient 0.1 jar file
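
For example, to migrate all topics from a 0.7 cluster to a 0.8 cluster (the jar file names, config file names, and whitelist regex are illustrative):

bin/kafka-run-class.sh kafka.tools.KafkaMigrationTool --kafka.07.jar kafka-0.7.jar --zkclient.01.jar zkclient-0.1.jar --consumer.config 07consumer.properties --producer.config 08producer.properties --whitelist=".*"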

Mirror Maker

Provides mirroring of one Kafka cluster to another. For more info, check out the wiki page on Kafka mirroring (MirrorMaker).

bin/kafka-run-class.sh kafka.tools.MirrorMaker
required argument: [consumer.config]
Option Description
------ -----------
--blacklist <Java regex (String)> Blacklist of topics to mirror.
--consumer.config <config file> Consumer config to consume from a source cluster. You may specify multiple of these.
--help Print this message.
--num.producers <Integer: Number of producers> Number of producer instances (default: 1)
--num.streams <Integer: Number of threads> Number of consumption streams (default: 1)
--producer.config <config file> Embedded producer config.
--queue.size <Integer: Queue size in terms of number of messages> Number of messages that are buffered between the consumer and producer (default: 10000)
--whitelist <Java regex (String)> Whitelist of topics to mirror.
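
For example, to mirror every topic from a source cluster to a target cluster (the config file names and whitelist regex are illustrative):

bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config sourceCluster.consumer.properties --producer.config targetCluster.producer.properties --whitelist=".*"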

Replay Log Producer

Consumes messages from one topic and replays (re-produces) them to another topic.

bin/kafka-run-class.sh kafka.tools.ReplayLogProducer
required arguments: [broker-list], [input-topic], [output-topic], [zookeeper]
Option Description
------ -----------
--async If set, messages are sent asynchronously.
--batch-size <Integer: batch size> Number of messages to send in a single batch. (default: 200)
--broker-list <hostname:port> REQUIRED: the broker list must be specified.
--compression-codec <Integer: compression codec> If set, messages are sent compressed (default: 0)
--delay-btw-batch-ms <Long: ms> Delay in ms between 2 batch sends. (default: 0)
--inputtopic <input-topic> REQUIRED: The topic to consume from.
--messages <Integer: count> The number of messages to send. (default: -1)
--outputtopic <output-topic> REQUIRED: The topic to produce to
--reporting-interval <Integer: size> Interval at which to print progress info. (default: 5000)
--threads <Integer: threads> Number of sending threads. (default: 1)
--zookeeper <zookeeper url> REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over. (default: 127.0.0.1:2181)
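
For example, to replay all messages from one hypothetical topic into another (the topic names and addresses are illustrative):

bin/kafka-run-class.sh kafka.tools.ReplayLogProducer --broker-list localhost:9092 --zookeeper localhost:2181 --inputtopic topic1 --outputtopic topic2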

Simple Consumer Shell

Dumps out consumed messages to the console using the Simple Consumer

bin/kafka-run-class.sh kafka.tools.SimpleConsumerShell
required arguments: [broker-list], [topic]
Option Description
------ -----------
--broker-list <hostname:port,...,hostname:port> REQUIRED: The list of hostname and port of the server to connect to.
--clientId <clientId> The ID of this client. (default: SimpleConsumerShell)
--fetchsize <Integer: fetchsize> The fetch size of each request. (default: 1048576)
--formatter <class> The name of a class to use for formatting kafka messages for display. (default: kafka.consumer.DefaultMessageFormatter)
--max-messages <Integer: max-messages> The number of messages to consume (default: 2147483647)
--max-wait-ms <Integer: ms> The max amount of time each fetch request waits. (default: 1000)
--no-wait-at-logend If set, when the simple consumer reaches the end of the Log, it will stop, not waiting for new produced messages
--offset <Long: consume offset> The offset id to consume from, default to -2 which means from beginning; while value -1 means from end (default: -2)
--partition <Integer: partition> The partition to consume from. (default: 0)
--print-offsets Print the offsets returned by the iterator
--property <prop>
--replica <Integer: replica id> The replica id to consume from, default -1 means leader broker. (default: -1)
--skip-message-on-error If there is an error when processing a message, skip it instead of halt.
--topic <topic> REQUIRED: The topic to consume from.
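
For example, to print the first ten messages of partition 0 of a hypothetical topic from the beginning of the log (the broker address and topic name are illustrative):

bin/kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9092 --topic topic1 --partition 0 --offset -2 --max-messages 10 --print-offsets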

State Change Log Merger

A utility that merges the state change logs (possibly obtained from different brokers and over multiple days).

bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger
Provide arguments to exactly one of the two options "[logs]" or "[logs-regex]"
Option Description
------ -----------
--end-time <end timestamp in the format java.text.SimpleDateFormat@f17a63e7> The latest timestamp of state change log entries to be merged (default: 9999-12-31 23:59:59,999)
--logs <file1,file2,...> Comma separated list of state change logs or a regex for the log file names
--logs-regex <for example: /tmp/state-change.log*> Regex to match the state change log files to be merged
--partitions <0,1,2,...> Comma separated list of partition ids whose state change logs should be merged
--start-time <start timestamp in the format java.text.SimpleDateFormat@f17a63e7> The earliest timestamp of state change log entries to be merged (default: 0000-00-00 00:00:00,000)
--topic <topic> The topic whose state change logs should be merged
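
For example, to merge the state change logs collected from two brokers for a single topic (the file paths and topic name are illustrative):

bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger --logs /tmp/broker1-state-change.log,/tmp/broker2-state-change.log --topic topic1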

Update Offsets In Zookeeper

A utility that updates the offset of every broker partition in ZK to the offset of the earliest or latest log segment file.

bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK
USAGE: kafka.tools.UpdateOffsetsInZK$ [earliest | latest] consumer.properties topic
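
For example, to reset a consumer group's offsets for a hypothetical topic to the earliest available offsets (the properties file path and topic name are illustrative; the group is taken from the group.id in the consumer properties file):

bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties topic1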

Verify Consumer Rebalance

Make sure there is an owner for every partition. A successful rebalancing operation would select an owner for each available partition. 
This means that for each partition registered under /brokers/topics/[topic]/[broker-id], an owner exists under /consumers/[consumer_group]/owners/[topic]/[broker_id-partition_id]

bin/kafka-run-class.sh kafka.tools.VerifyConsumerRebalance
required argument: [group]
Option Description
------ -----------
--group Consumer group.
--help Print this message.
--zookeeper.connect ZooKeeper connect string. (default: localhost:2181)
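
For example, to verify that a hypothetical consumer group owns every partition it is subscribed to (the group name and ZooKeeper address are illustrative):

bin/kafka-run-class.sh kafka.tools.VerifyConsumerRebalance --group my-group --zookeeper.connect localhost:2181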
