Versions Compared

Comment: Update the topics related tools to work with kafka-topics.sh

...

ListTopicCommand is an excellent tool that provides an overview of all the topic partitions in the cluster. For each topic partition, it displays the leader, the assigned replicas, and the current "in-sync" replica set. If the leader and the first replica in the assigned replica set are the same, then the "Preferred replica leader election" tool succeeded. If not, the tool failed and may have to be run again.
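
The leader check described above can be sketched in shell. This is only an illustration: the function name is hypothetical, and it assumes describe-style lines of the form `Topic: foo Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3`, a layout that differs between Kafka versions.

```shell
#!/bin/sh
# Hypothetical helper (not part of Kafka): reads describe-style lines on stdin
# and prints every partition whose leader is not the first assigned replica.
# Assumed input layout (varies across Kafka versions):
#   Topic: foo Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
non_preferred_leaders() {
  awk '
    {
      topic = ""; part = ""; leader = ""; replicas = ""
      # Pick out the value that follows each known label.
      for (i = 1; i < NF; i++) {
        if ($i == "Topic:")     topic = $(i + 1)
        if ($i == "Partition:") part = $(i + 1)
        if ($i == "Leader:")    leader = $(i + 1)
        if ($i == "Replicas:")  replicas = $(i + 1)
      }
      # Skip lines that do not describe a partition.
      if (leader == "" || replicas == "") next
      split(replicas, r, ",")
      if (r[1] != leader) print topic, part, "leader=" leader, "preferred=" r[1]
    }'
}
```

An empty result means every partition is led by its preferred replica; any printed line names a partition for which the election may have to be run again.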

2. Topics tool

What does the tool do?

The Kafka topics tool handles all management operations related to topics:

...

2.1 List and describe Topics

What does the tool do?

This tool lists the information for a given list of topics. If no topics are provided on the command line, the tool queries ZooKeeper to get all the topics and lists the information for them. The fields that the tool displays are: topic name, partition, leader, replicas, isr. Two optional arguments can be provided to the tool. If "under-replicated-partitions" is specified, the tool only provides information for those topic/partitions that have under-replicated replicas. If "unavailable-partitions" is specified, the tool only provides information for those topic/partitions whose leader is not available.

How to use the tool?

Code Block
# List only a single topic named "topic1" (prints only the topic name)
bin/kafka-topics.sh --list --zookeeper localhost:2181 --topic topic1

# List all topics (prints only topic names)
bin/kafka-topics.sh --list --zookeeper localhost:2181

# Describe only a single topic named "topic1" (prints details about the topic)
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic1

# Describe all topics (prints details about the topics)
bin/kafka-topics.sh --describe --zookeeper localhost:2181

# Describe only partitions that are under-replicated
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --under-replicated-partitions

# Describe only partitions whose leader is not available
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --unavailable-partitions
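
To make the under-replicated condition concrete, the following sketch flags partitions whose in-sync replica set is smaller than the assigned replica list. It is not the tool's own implementation: the function name is hypothetical and the input layout (`Topic: ... Partition: ... Replicas: ... Isr: ...`) is an assumption that depends on the Kafka version.

```shell
#!/bin/sh
# Hypothetical filter (not part of Kafka): reads describe-style lines on stdin
# and prints partitions that have fewer in-sync replicas than assigned replicas.
# Assumed input layout (varies across Kafka versions):
#   Topic: foo Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
under_replicated() {
  awk '
    {
      topic = ""; part = ""; replicas = ""; isr = ""
      for (i = 1; i < NF; i++) {
        if ($i == "Topic:")     topic = $(i + 1)
        if ($i == "Partition:") part = $(i + 1)
        if ($i == "Replicas:")  replicas = $(i + 1)
        if ($i == "Isr:")       isr = $(i + 1)
      }
      # Skip lines that do not describe a partition.
      if (replicas == "") next
      n = split(replicas, r, ",")
      m = (isr == "") ? 0 : split(isr, s, ",")
      if (m < n) print topic, part, "isr=" m "/" n
    }'
}
```

It could be fed from the describe command shown above, for example `bin/kafka-topics.sh --describe --zookeeper localhost:2181 | under_replicated`.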

...

2.2 Create Topics

What does the tool do?

By default, Kafka auto-creates a topic if "auto.create.topics.enable" is set to true on the server. This creates a topic with a default number of partitions and a default replication factor, and uses Kafka's default scheme for replica assignment. Sometimes a topic needs to be customized when it is created. This tool creates a topic and lets you specify the number of partitions, the replication factor, and the replica assignment list for the topic.
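
A replica assignment list has one comma-separated entry per partition, and each entry is a colon-separated list of broker ids (for example 0:1:2,0:1:2,0:1:2). The sketch below builds such a string with a simple rotation over the brokers. The helper name is hypothetical, the rotation is not Kafka's default assignment scheme, and broker ids are assumed to be 0 through brokers-1.

```shell
#!/bin/sh
# Hypothetical helper (not part of Kafka): prints a replica assignment string
# for <partitions> partitions with <rf> replicas each, spread over broker ids
# 0..<brokers>-1. The first replica of partition p is broker p mod <brokers>.
build_replica_assignment() {
  partitions=$1 rf=$2 brokers=$3
  if [ "$rf" -gt "$brokers" ]; then
    echo "replication factor $rf exceeds broker count $brokers" >&2
    return 1
  fi
  out=""
  p=0
  while [ "$p" -lt "$partitions" ]; do
    entry=""
    r=0
    while [ "$r" -lt "$rf" ]; do
      broker=$(( (p + r) % brokers ))
      entry="${entry:+$entry:}$broker"   # join replicas of one partition with ":"
      r=$((r + 1))
    done
    out="${out:+$out,}$entry"            # join partitions with ","
    p=$((p + 1))
  done
  echo "$out"
}
```

The result can be passed to the create command, for example `--replica-assignment "$(build_replica_assignment 3 3 3)"`. Rotating the first replica spreads the preferred leaders across the brokers.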

How to use the tool?

Code Block
# Create topic with default settings
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic topic1

# Create topic with specific number of partitions and/or replicas
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic topic1 --replication-factor 3 --partitions 3

# Create topic with manual replica assignment
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic topic1 --replica-assignment 0:1:2,0:1:2,0:1:2

# Create topic with configuration override
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic topic1 --config min.insync.replicas=1

Code Block
bin/kafka-create-topic.sh

Option                                  Description
------                                  -----------
--partition <Integer: # of partitions>  Number of partitions in the topic
                                          (default: 1)
--replica <Integer: replication factor> Replication factor for each partition
                                          in the topic (default: 1)
--replica-assignment-list               For manually assigning replicas to
  <broker_id_for_part1_replica1 :         brokers (default: )
  broker_id_for_part1_replica2,
  broker_id_for_part2_replica1 :
  broker_id_for_part2_replica2, ...>
--topic <topic>                         REQUIRED: The topic to be created.
--zookeeper <urls>                      REQUIRED: The connection string for
                                          the zookeeper connection in the form
                                          host:port. Multiple URLS can be
                                          given to allow fail-over.

5. Add Partition Tool

What does the tool do?

In Kafka, partitions act as the unit of parallelism: messages of a single topic are distributed across multiple partitions that can be stored and served on different servers. The number of partitions has to be specified when a topic is created. More partitions may be needed later as the volume of the topic increases. This tool adds partitions to a specific topic and also allows manual replica assignment of the added partitions. This tool is only available in the 0.8 branch for now.

How to use the tool?

Code Block
bin/kafka-add-partitions.sh

Option                                  Description
------                                  -----------
--partition <Integer: # of partitions>  REQUIRED: Number of partitions to add
                                          to the topic
--replica-assignment-list               For manually assigning replicas to
  <broker_id_for_part1_replica1 :         brokers for the new partitions
  broker_id_for_part1_replica2,           (default: )
  broker_id_for_part2_replica1 :
  broker_id_for_part2_replica2, ...>
--topic <topic>                         REQUIRED: The topic for which
                                          partitions need to be added.
--zookeeper <urls>                      REQUIRED: The connection string for
                                          the zookeeper connection in the form
                                          host:port. Multiple URLS can be
                                          given to allow fail-over.

...

2.3 Add Partition to Topic

What does the tool do?

In Kafka, partitions act as the unit of parallelism: messages of a single topic are distributed across multiple partitions that can be stored and served on different servers. The number of partitions has to be specified when a topic is created. More partitions may be needed later as the volume of the topic increases. This tool adds partitions to a specific topic and also allows manual replica assignment of the added partitions.

How to use the tool?

Code Block
# Increase number of partitions for topic
bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic topic1 --partitions 4
 
# Increase number of partitions with specific replica assignment
bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic topic1 --replica-assignment 0:1:2,0:1:2,0:1:2,2:1:0 --partitions 4
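
Because the tool only adds partitions, a wrapper script may want to reject a request that does not increase the count before calling it. The sketch below does that; the function name is hypothetical, the current partition count is assumed to be known to the caller, and the ZooKeeper address is the one used in the examples above.

```shell
#!/bin/sh
# Hypothetical wrapper (not part of Kafka): prints the --alter command only if
# the requested partition count is larger than the current one.
alter_partitions_cmd() {
  topic=$1 current=$2 requested=$3
  if [ "$requested" -le "$current" ]; then
    echo "requested partition count $requested must exceed current count $current" >&2
    return 1
  fi
  echo "bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic $topic --partitions $requested"
}
```

Printing the command instead of running it keeps the sketch safe to try; a real script could run the printed command once the check passes.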

2.4 Delete Topic

What does the tool do?

When topic deletion is enabled in the broker (delete.topic.enable), topics can be deleted using the Kafka Topics tool.

How to use the tool?

Code Block
# Delete topic named topic1
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topic1

3. Reassign Partitions Tool

What does the tool do?

The goal of this tool is similar to that of the Preferred Replica Leader Election Tool: to balance load across brokers. But instead of only electing a new leader from the assigned replicas of a partition, this tool can change the assigned replicas of partitions. Remember that followers also need to fetch from leaders in order to keep in sync, so balancing only the leadership load is sometimes not enough.

...

This tool is only available in the 0.8 branch for now.

How to use the tool?

WARNING: The tool was released in beta in 0.8 and has some bugs that can render the topic unusable. The tool is known to be stable in 0.8.1.

...

Code Block
nnarkhed$ ./bin/kafka-reassign-partitions.sh --manual-assignment-json-file partitions-to-move.json --execute

nnarkhed$ cat partitions-to-move.json
{"partitions":
             [{"topic": "foo",
               "partition": 1,
               "replicas": [1,2,4] }],
  "version":1
}
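
A file like partitions-to-move.json can also be generated from a script. The sketch below covers only the single-partition case shown above; the function name is hypothetical, and the field names and "version" value are copied from the example rather than from a schema.

```shell
#!/bin/sh
# Hypothetical generator (not part of Kafka): prints a reassignment document
# for one partition. <replicas> is a comma-separated list of broker ids.
reassignment_json() {
  topic=$1 partition=$2 replicas=$3
  printf '{"partitions":\n  [{"topic": "%s",\n    "partition": %s,\n    "replicas": [%s] }],\n  "version":1\n}\n' \
    "$topic" "$partition" "$replicas"
}
```

For example, `reassignment_json foo 1 1,2,4 > partitions-to-move.json` writes a file equivalent to the one shown above.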

...

4. StateChangeLogMerger Tool

What does the tool do?

The goal of this tool is to collect data from the brokers in a cluster and format it in a central log to help troubleshoot issues with state changes. Every broker in a Kafka cluster emits a state-change.log that logs the lifecycle of every state change received by the broker. Often there is a problem with leader election for a subset of topics/partitions, and the question is what caused it. To answer this question, we need a global view of state changes in the Kafka cluster, possibly filtered on a time range and/or specific topics/partitions. This is exactly what the StateChangeLogMerger tool does. It takes in a list of state-change.log files, merges them in time order, filters on a time range if the user specifies one, filters on topics/partitions if the user specifies them, and outputs a merged and formatted state-change.log that is easy to query when looking for the root cause.

How to use the tool?

Code Block
nnarkhed-mn:kafka-git-idea nnarkhed$ ./bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger
Provide arguments to exactly one of the two options "[logs]" or "[logs-regex]"
Option                                  Description
------                                  -----------
--end-time <end timestamp in the        The latest timestamp of state change
  format java.text.                       log entries to be merged (default:
  SimpleDateFormat@f17a63e7>              9999-12-31 23:59:59,999)
--logs <file1,file2,...>                Comma separated list of state change
                                          logs or a regex for the log file
                                          names
--logs-regex <for example: /tmp/state-  Regex to match the state change log
  change.log*>                            files to be merged
--partitions <0,1,2,...>                Comma separated list of partition ids
                                          whose state change logs should be
                                          merged
--start-time <start timestamp in the    The earliest timestamp of state change
  format java.text.                       log entries to be merged (default:
  SimpleDateFormat@f17a63e7>              0000-00-00 00:00:00,000)
--topic <topic>                         The topic whose state change logs
                                          should be merged
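
The merge-and-filter behaviour described for this tool can be approximated with standard shell utilities. The sketch below is only a stand-in for illustration, not the tool's implementation: the function name is hypothetical, each input file is assumed to be already in time order, and every line is assumed to start with a timestamp of the form `[yyyy-MM-dd HH:mm:ss,SSS]`.

```shell
#!/bin/sh
# Hypothetical stand-in (not the StateChangeLogMerger itself): merges
# time-ordered log files and keeps lines within [start, end] that mention
# the given topic.
# Assumed line layout: "[2013-01-01 00:00:01,000] ... [topic,partition] ..."
merge_state_change_logs() {
  topic=$1 start=$2 end=$3
  shift 3
  # sort -m merges inputs that are already sorted; the timestamp prefix makes
  # byte order equal to time order under the C locale.
  LC_ALL=C sort -m "$@" |
    LC_ALL=C awk -v topic="$topic" -v start="$start" -v end="$end" '
      {
        ts = substr($0, 2, 23)   # timestamp text between "[" and "]"
        if (ts >= start && ts <= end && index($0, topic) > 0) print
      }'
}
```

A call such as `merge_state_change_logs foo '2013-01-01 00:00:00,000' '2013-01-01 23:59:59,999' broker1/state-change.log broker2/state-change.log` (paths hypothetical) prints the matching entries from both brokers in one time-ordered stream.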