...
Code Block |
---|
{
  "partitions": [
    {"topic": "topic1", "partition": "0"},
    {"topic": "topic1", "partition": "1"},
    {"topic": "topic1", "partition": "2"},
    {"topic": "topic2", "partition": "0"},
    {"topic": "topic2", "partition": "1"}
  ]
} |
FAQ
What happens if the preferred replica is not in the ISR?
...
ListTopicCommand is an excellent tool that provides an overview of all the topic partitions in the cluster. For each topic partition, it displays the leader, the assigned replicas and the current "in-sync" replica set. If the leader and the first replica in the assigned replica set are the same, then the "Preferred replica leader election" tool succeeded. If not, the tool failed and may have to be run again.
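The check described above can be sketched in a few lines. This is only an illustration, assuming the tool output has already been parsed into dictionaries; the field names and the helper `preferred_leader_elected` are hypothetical and not part of any Kafka API.

```python
from typing import Dict, List


def preferred_leader_elected(partition: Dict) -> bool:
    """Return True if the leader is the first replica in the assigned replica list."""
    replicas: List[int] = partition["replicas"]
    return bool(replicas) and partition["leader"] == replicas[0]


# Hypothetical, hand-written partition states (not real tool output).
partitions = [
    {"topic": "topic1", "partition": 0, "leader": 1, "replicas": [1, 2, 3], "isr": [1, 2, 3]},
    {"topic": "topic1", "partition": 1, "leader": 3, "replicas": [2, 3, 1], "isr": [3, 1]},
]

# Partitions for which the election may have to be run again.
needs_rerun = [(p["topic"], p["partition"]) for p in partitions
               if not preferred_leader_elected(p)]
print(needs_rerun)
```

In this made-up state, only the second partition is reported, because its leader (3) is not the first assigned replica (2).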
...
2. Topics tool
The Kafka topics tool handles all management operations related to topics:
- List and describe topics
- Create topics
- Change topics
- Delete topics
2.1 List and describe Topics
What does the tool do?
This tool lists the information for a given list of topics. If no topics are provided on the command line, the tool queries ZooKeeper to get all the topics and lists the information for them. The fields that the tool displays are: topic name, partition, leader, replicas, isr. Two optional arguments can be provided to the tool. If "under-replicated-partitions" is specified, the tool only provides information for those topic/partitions which have under-replicated replicas. If "unavailable-partitions" is specified, the tool only provides information for those topic/partitions whose leader is not available.
How to use the tool?
Code Block |
---|
# List only single topic named "topic1" (prints only topic name)
bin/kafka-topics.sh --list --zookeeper localhost:2121 --topic topic1
# List all topics (prints only topic names)
bin/kafka-topics.sh --list --zookeeper localhost:2121
# Describe only single topic named "topic1" (prints details about the topic)
bin/kafka-topics.sh --describe --zookeeper localhost:2121 --topic topic1
# Describe all topics (prints details about the topics)
bin/kafka-topics.sh --describe --zookeeper localhost:2121
# List info for topics which have under replicated count
bin/kafka-topics.sh --describe --zookeeper localhost:2121 --under-replicated-partitions
# List info for topics whose leader for a partition is not available
bin/kafka-topics.sh --describe --zookeeper localhost:2121 --unavailable-partitions |
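The two filters can be pictured as simple predicates over the displayed fields. The sketch below assumes that a partition counts as under-replicated when its ISR is smaller than its assigned replica list, and as unavailable when it has no leader. The `-1` leader marker, the dictionary layout and the function names are illustrative assumptions, not the tool's implementation.

```python
from typing import Dict, Iterable, List

NO_LEADER = -1  # assumed marker for "no leader available"


def under_replicated(partitions: Iterable[Dict]) -> List[Dict]:
    """Partitions whose in-sync replica set is smaller than the assigned replica list."""
    return [p for p in partitions if len(p["isr"]) < len(p["replicas"])]


def unavailable(partitions: Iterable[Dict]) -> List[Dict]:
    """Partitions that currently have no leader."""
    return [p for p in partitions if p["leader"] == NO_LEADER]


# Hypothetical cluster state, for illustration only.
state = [
    {"topic": "topic1", "partition": 0, "leader": 0, "replicas": [0, 1, 2], "isr": [0, 1, 2]},
    {"topic": "topic1", "partition": 1, "leader": 1, "replicas": [1, 2, 0], "isr": [1]},
    {"topic": "topic2", "partition": 0, "leader": NO_LEADER, "replicas": [2, 0], "isr": []},
]

print([(p["topic"], p["partition"]) for p in under_replicated(state)])
print([(p["topic"], p["partition"]) for p in unavailable(state)])
```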
2.2 Create Topics
What does the tool do?
By default, Kafka auto-creates a topic if "auto.create.topics.enable" is set to true on the server. The topic is created with a default number of partitions and a default replication factor, and Kafka's default scheme is used for replica assignment. Sometimes a topic needs to be customized when it is created. This tool creates a topic and lets you specify the number of partitions, the replication factor and the replica assignment list for the topic.
How to use the tool?
Code Block |
---|
# Create topic with default settings
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic topic1
# Create topic with specific number of partitions and/or replicas
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic topic1 --replication-factor 3 --partitions 3
# Create topic with manual replica assignment
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic topic1 --replica-assignment 0:1:2,0:1:2,0:1:2
# Create topic with configuration override
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic topic1 --config min.insync.replicas=1 |
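As the manual assignment example suggests, the `--replica-assignment` value lists one group per partition, separated by commas, with the broker ids inside a group separated by colons. The sketch below parses such a string so the resulting layout can be inspected before running the command. The function name `parse_replica_assignment` is hypothetical, and the duplicate-broker check is a sanity check added for this sketch.

```python
from typing import List


def parse_replica_assignment(value: str) -> List[List[int]]:
    """Parse "0:1:2,0:1:2" into one list of broker ids per partition."""
    assignment = []
    for index, part in enumerate(value.split(",")):
        brokers = [int(b) for b in part.split(":")]
        if len(set(brokers)) != len(brokers):
            raise ValueError(f"partition {index} lists a broker more than once: {part}")
        assignment.append(brokers)
    return assignment


assignment = parse_replica_assignment("0:1:2,0:1:2,0:1:2")
print(len(assignment), "partitions, replication factor", len(assignment[0]))
```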
2.3 Add Partition to Topic
What does the tool do?
In Kafka, partitions act as the unit of parallelism: messages of a single topic are distributed to multiple partitions that can be stored and served on different servers. The number of partitions has to be specified when a topic is created. Later on, more partitions may be needed when the volume of the topic increases. This tool adds more partitions to a specific topic and also allows manual replica assignment of the added partitions.
How to use the tool?
Code Block |
---|
# Increase number of partitions for topic
bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic topic1 --partitions 4
# Increase number of partitions with specific replica assignment
bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic topic1 --replica-assignment 0:1:2,0:1:2,0:1:2,2:1:0 --partitions 4 |
2.4 Delete Topic
What does the tool do?
When topic deletion is enabled in the broker (delete.topic.enable), topics can be deleted using the Kafka topics tool.
How to use the tool?
Code Block |
---|
# Delete topic named topic1
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topic1 |
3. Change topic configuration
What does the tool do?
The Kafka configs tool can be used to modify topic configuration:
- Add new config options
- Change existing config options
- Remove config options
How to use the tool?
Code Block |
---|
# Add new option or change existing option
bin/kafka-configs.sh --alter --zookeeper localhost:2181 --entity-name topic1 --entity-type topics --add-config cleanup.policy=compact
# Remove existing option
bin/kafka-configs.sh --alter --zookeeper localhost:2181 --entity-name topic1 --entity-type topics --delete-config cleanup.policy |
4. Reassign Partitions Tool
What does the tool do?
The goal of this tool is similar to that of the Preferred Replica Leader Election Tool: to achieve load balance across brokers. But instead of only electing a new leader from the assigned replicas of a partition, this tool allows you to change the assigned replicas of partitions. Remember that followers also need to fetch from leaders in order to keep in sync, so balancing only the leadership load is sometimes not enough.
A summary of the steps that the tool does is shown below -
1. The tool updates the zookeeper path "/admin/reassign_partitions" with the list of topic partitions and (if specified in the Json file) the list of their new assigned replicas.
2. The controller listens to the path above. When a data change update is triggered, the controller reads the list of topic partitions and their assigned replicas from zookeeper.
3. For each topic partition, the controller does the following:
3.1. Start new replicas in RAR - AR (RAR = Reassigned Replicas, AR = original list of Assigned Replicas)
3.2. Wait until new replicas are in sync with the leader
3.3. If the leader is not in RAR, elect a new leader from RAR
3.4. Stop old replicas AR - RAR
3.5. Write new AR
3.6. Remove partition from the /admin/reassign_partitions path
Note that the tool only updates the zookeeper path and exits. The controller reassigns the replicas for the partitions asynchronously.
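The per-partition steps can be illustrated with a small sketch that derives which replicas are started and stopped from AR and RAR. It only models the bookkeeping, not the controller itself; the function name `plan_reassignment` is hypothetical, and picking the first entry of RAR as the new leader is an assumption made for the example.

```python
from typing import Dict, List


def plan_reassignment(ar: List[int], rar: List[int], leader: int) -> Dict:
    """Derive the per-partition actions from the original (AR) and reassigned (RAR) replicas."""
    to_start = [b for b in rar if b not in ar]  # step 3.1: RAR - AR
    to_stop = [b for b in ar if b not in rar]   # step 3.4: AR - RAR
    # Step 3.3: a new leader is only needed when the current one is not in RAR.
    # Choosing rar[0] is an assumption of this sketch.
    new_leader = leader if leader in rar else rar[0]
    return {"start": to_start, "stop": to_stop, "leader": new_leader, "new_ar": list(rar)}


plan = plan_reassignment(ar=[1, 2, 3], rar=[1, 2, 4], leader=3)
print(plan)
```

Here broker 4 is started as a new replica, broker 3 is stopped once 4 is in sync, and leadership moves because the old leader (3) is not part of RAR.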
How to use the tool?
WARNING: The tool was released in beta in 0.8 and has some bugs that can render the topic unusable. The tool is known to be stable in 0.8.1.
Code Block |
---|
bin/kafka-reassign-partitions.sh
Option                                  Description
------                                  -----------
--bootstrap-server <String: Server(s)   the server(s) to use for
  to use for bootstrapping>               bootstrapping. REQUIRED if an
                                          absolution path of the log directory
                                          is specified for any replica in the
                                          reassignment json file
--broker-list <String: brokerlist>      The list of brokers to which the
                                          partitions need to be reassigned in
                                          the form "0,1,2". This is required
                                          if --topics-to-move-json-file is
                                          used to generate reassignment
                                          configuration
--disable-rack-aware                    Disable rack aware replica assignment
--execute                               Kick off the reassignment as specified
                                          by the --reassignment-json-file
                                          option.
--generate                              Generate a candidate partition
                                          reassignment configuration. Note
                                          that this only generates a candidate
                                          assignment, it does not execute it.
--reassignment-json-file <String:       The JSON file with the partition
  manual assignment json file path>       reassignment configuration
                                        The format to use is -
                                        {"partitions":
                                         [{"topic": "foo",
                                           "partition": 1,
                                           "replicas": [1,2,3],
                                           "log_dirs": ["dir1","dir2","dir3"]
                                          }],
                                        "version":1
                                        }
                                        Note that "log_dirs" is optional. When
                                          it is specified, its length must
                                          equal the length of the replicas
                                          list. The value in this list can be
                                          either "any" or the absolution path
                                          of the log directory on the broker.
                                          If absolute log directory path is
                                          specified, it is currently required
                                          that the replica has not already
                                          been created on that broker. The
                                          replica will then be created in the
                                          specified log directory on the
                                          broker later.
--throttle <Long: throttle>             The movement of partitions will be
                                          throttled to this value (bytes/sec).
                                          Rerunning with this option, whilst a
                                          rebalance is in progress, will alter
                                          the throttle value. The throttle
                                          rate should be at least 1 KB/s.
                                          (default: -1)
--timeout <Long: timeout>               The maximum time in ms allowed to wait
                                          for partition reassignment execution
                                          to be successfully initiated
                                          (default: 10000)
--topics-to-move-json-file <String:     Generate a reassignment configuration
  topics to reassign json file path>      to move the partitions of the
                                          specified topics to the list of
                                          brokers specified by the --broker-
                                          list option. The format to use is -
                                        {"topics":
                                         [{"topic": "foo"},{"topic": "foo1"}],
                                        "version":1
                                        }
--verify                                Verify if the reassignment completed
                                          as specified by the --reassignment-
                                          json-file option. If there is a
                                          throttle engaged for the replicas
                                          specified, and the rebalance has
                                          completed, the throttle will be
                                          removed
--zookeeper <String: urls>              REQUIRED: The connection string for
                                          the zookeeper connection in the form
                                          host:port. Multiple URLS can be
                                          given to allow fail-over. |
...
The partition reassignment tool can be used to expand an existing Kafka 0.8 cluster. Cluster expansion involves adding brokers with new broker ids to the cluster. Typically, when you add new brokers to a cluster, they will not receive any data from existing topics until this tool is run to assign existing topics/partitions to the new brokers. The tool provides two options that make it easier to move some topics in bulk to the new brokers: a) the topics to move and b) the list of newly added brokers. Using these two options, the tool automatically figures out the placement of partitions for the topics on the new brokers and generates new JSON data which can be used in the next step (with the --reassignment-json-file option) to execute the move.
The following example moves 2 topics (foo1, foo2) to newly added brokers in a cluster (5,6,7)
Code Block |
---|
$ ./bin/kafka-reassign-partitions.sh --topics-to-move-json-file topics-to-move.json --broker-list "5,6,7" --generate --zookeeper localhost:2181

$ cat topics-to-move.json
{"topics":
     [{"topic": "foo1"},{"topic": "foo2"}],
     "version":1
} |
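When many topics have to be moved, the two inputs can be generated rather than written by hand. The sketch below renders a topics-to-move document and a broker list in the formats shown above. The helper names `topics_to_move` and `broker_list` are hypothetical, and the output still has to be saved to a file and passed to the tool.

```python
import json
from typing import Iterable


def topics_to_move(topics: Iterable[str]) -> str:
    """Render the JSON document passed via --topics-to-move-json-file."""
    doc = {"topics": [{"topic": t} for t in topics], "version": 1}
    return json.dumps(doc, indent=2)


def broker_list(broker_ids: Iterable[int]) -> str:
    """Render broker ids in the "5,6,7" form used by --broker-list."""
    return ",".join(str(b) for b in broker_ids)


content = topics_to_move(["foo1", "foo2"])
print(content)
print(broker_list([5, 6, 7]))
```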
...
The following example moves 1 partition (foo-1) from replicas 1,2,3 to 1,2,4
Code Block |
---|
$ ./bin/kafka-reassign-partitions.sh --reassignment-json-file partitions-to-move.json --execute

$ cat partitions-to-move.json
{"partitions":
     [{"topic": "foo",
       "partition": 1,
       "replicas": [1,2,4] }],
 "version":1
} |
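A reassignment file like the one above can also be built programmatically. The sketch below assembles one entry and enforces the rule from the option description that "log_dirs", when given, must have the same length as the replicas list. The helper name `reassignment_entry` is hypothetical; the tool performs its own validation when it reads the file.

```python
import json
from typing import Dict, List, Optional


def reassignment_entry(topic: str, partition: int, replicas: List[int],
                       log_dirs: Optional[List[str]] = None) -> Dict:
    """Build one entry of the "partitions" list, checking the optional log_dirs."""
    entry = {"topic": topic, "partition": partition, "replicas": replicas}
    if log_dirs is not None:
        if len(log_dirs) != len(replicas):
            raise ValueError("log_dirs must have the same length as replicas")
        entry["log_dirs"] = log_dirs
    return entry


doc = {"partitions": [reassignment_entry("foo", 1, [1, 2, 4])], "version": 1}
print(json.dumps(doc))
```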
...
5. StateChangeLogMerger Tool
What does the tool do?
The goal of this tool is to collect data from the brokers in a cluster and format it in a central log to help troubleshoot issues with state changes. Every broker in a Kafka cluster emits a state-change.log that logs the lifecycle of every state change received by the broker. Often there is some problem with leader election for a subset of topics/partitions, and the question is what caused the problem. To answer this question, we need a global view of state changes in the Kafka cluster, possibly filtered on a time range and/or specific topics/partitions. This is exactly what the StateChangeLogMerger tool does. It takes in a list of state-change.log files, merges them in time order, filters on a time range and on topics/partitions if specified by the user, and outputs a merged and formatted state-change.log that is easy to query when looking for the root cause.
How to use the tool?
Code Block |
---|
nnarkhed-mn:kafka-git-idea nnarkhed$ ./bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger
Provide arguments to exactly one of the two options "[logs]" or "[logs-regex]"
Option                                  Description
------                                  -----------
--end-time <end timestamp in the        The latest timestamp of state change
  format java.text.                       log entries to be merged (default:
  SimpleDateFormat@f17a63e7>              9999-12-31 23:59:59,999)
--logs <file1,file2,...>                Comma separated list of state change
                                          logs or a regex for the log file
                                          names
--logs-regex <for example: /tmp/state-  Regex to match the state change log
  change.log*>                            files to be merged
--partitions <0,1,2,...>                Comma separated list of partition ids
                                          whose state change logs should be
                                          merged
--start-time <start timestamp in the    The earliest timestamp of state change
  format java.text.                       log entries to be merged (default:
  SimpleDateFormat@f17a63e7>              0000-00-00 00:00:00,000)
--topic <topic>                         The topic whose state change logs
                                          should be merged |
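The merge-and-filter idea behind the tool can be sketched as follows. This is not the tool's implementation: it assumes each log line starts with a bracketed timestamp in the `yyyy-MM-dd HH:mm:ss,SSS` form used by the defaults above, that each broker's log is already in time order, and that a plain substring match is enough to filter by topic. The sample lines are invented.

```python
import heapq
from datetime import datetime
from typing import Iterable, Iterator, List

TS_FORMAT = "%Y-%m-%d %H:%M:%S,%f"  # e.g. "2012-12-03 10:00:00,000"


def timestamp(line: str) -> datetime:
    """Extract the timestamp from a line that starts with "[<timestamp>]" (assumed layout)."""
    return datetime.strptime(line[1:line.index("]")], TS_FORMAT)


def merge_logs(logs: Iterable[List[str]], start: datetime, end: datetime,
               topic: str = "") -> Iterator[str]:
    """Merge per-broker logs (each already in time order) and apply the filters."""
    for line in heapq.merge(*logs, key=timestamp):
        if start <= timestamp(line) <= end and topic in line:
            yield line


# Invented sample lines, for illustration only.
broker1 = [
    "[2012-12-03 10:00:00,000] TRACE broker 1 state change for [foo,0]",
    "[2012-12-03 10:00:02,500] TRACE broker 1 state change for [bar,0]",
]
broker2 = [
    "[2012-12-03 10:00:01,250] TRACE broker 2 state change for [foo,0]",
]

merged = list(merge_logs([broker1, broker2],
                         start=datetime(2012, 12, 3, 10, 0, 0),
                         end=datetime(2012, 12, 3, 10, 0, 2),
                         topic="foo"))
for line in merged:
    print(line)
```

`heapq.merge` keeps the merge lazy, so large log files would not need to be held in memory at once if the inputs were file iterators instead of lists.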
...