...
Code Block |
---|
{"partitions":
 [{"topic": "topic1", "partition": "0"},
  {"topic": "topic1", "partition": "1"},
  {"topic": "topic1", "partition": "2"},
  {"topic": "topic2", "partition": "0"},
  {"topic": "topic2", "partition": "1"}]
} |
FAQ
What happens if the preferred replica is not in the ISR?
...
Code Block |
---|
# Delete topic named topic1
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topic1 |
3. Change topic configuration
What does the tool do?
The Kafka Configs tool can be used to modify topic configuration:
- Add new config options
- Change existing config options
- Remove config options
How to use the tool?
Code Block |
---|
# Add a new option or change an existing option
bin/kafka-configs.sh --alter --zookeeper localhost:2181 --entity-name topic1 --entity-type topics --add-config cleanup.policy=compact
# Remove an existing option
bin/kafka-configs.sh --alter --zookeeper localhost:2181 --entity-name topic1 --entity-type topics --delete-config cleanup.policy |
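After altering, the same tool's --describe mode can confirm which overrides are in effect. A sketch, assuming the same local zookeeper and topic name as in the examples above:

```shell
# List the configuration overrides currently set on topic1
# (zookeeper address and topic name are illustrative)
bin/kafka-configs.sh --describe --zookeeper localhost:2181 \
  --entity-type topics --entity-name topic1
```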
4. Reassign Partitions Tool
What does the tool do?
The goal of this tool, like that of the Preferred Replica Leader Election Tool, is to achieve load balance across brokers. But instead of only electing a new leader from the assigned replicas of a partition, this tool allows changing the assigned replicas of partitions – remember that followers also need to fetch from leaders in order to stay in sync, so sometimes balancing only the leadership load is not enough.
A summary of the steps that the tool does is shown below -
1. The tool updates the zookeeper path "/admin/reassign_partitions" with the list of topic partitions and (if specified in the Json file) the list of their new assigned replicas.
2. The controller listens to the path above. When a data change update is triggered, the controller reads the list of topic partitions and their assigned replicas from zookeeper.
3. For each topic partition, the controller does the following:
3.1. Start new replicas in RAR - AR (RAR = Reassigned Replicas, AR = original list of Assigned Replicas)
3.2. Wait until new replicas are in sync with the leader
3.3. If the leader is not in RAR, elect a new leader from RAR
3.4. Stop old replicas AR - RAR
3.5. Write new AR
3.6. Remove partition from the /admin/reassign_partitions path
Note that the tool only updates the zookeeper path and exits. The controller reassigns the replicas for the partitions asynchronously.
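The steps above are driven entirely by the JSON payload written to that znode. A minimal sketch of the payload, validated locally — the topic name, partition, and replica ids here are illustrative, not from a real cluster:

```shell
# Sketch of the data the tool writes to /admin/reassign_partitions
# (topic, partition, and replica ids are illustrative)
cat > /tmp/reassign-sketch.json <<'EOF'
{"version": 1,
 "partitions":
  [{"topic": "foo", "partition": 1, "replicas": [1, 2, 3]}]
}
EOF
# Sanity-check that the payload is well-formed JSON before handing it to the tool
python3 -m json.tool /tmp/reassign-sketch.json
```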
This tool is only available in the 0.8 branch for now.
...
How to use the tool?
...
Code Block |
---|
bin/kafka-reassign-partitions.sh
Option Description
------ -----------
--bootstrap-server <String: Server(s) the server(s) to use for
to use for bootstrapping> bootstrapping. REQUIRED if an
absolute path of the log directory
is specified for any replica in the
reassignment json file
--broker-list <String: brokerlist> The list of brokers to which the
partitions need to be reassigned in
the form "0,1,2". This is required
if --topics-to-move-json-file is
used to generate reassignment
configuration
--disable-rack-aware Disable rack aware replica assignment
--execute Kick off the reassignment as specified
by the --reassignment-json-file
option.
--generate Generate a candidate partition
reassignment configuration. Note
that this only generates a candidate
assignment, it does not execute it.
--reassignment-json-file <String: The JSON file with the partition
manual assignment json file path> reassignment configuration. The format
to use is -
{"partitions":
[{"topic": "foo",
"partition": 1,
"replicas": [1,2,3],
"log_dirs": ["dir1","dir2","dir3"]
}],
"version":1
}
Note that "log_dirs" is optional. When
it is specified, its length must
equal the length of the replicas
list. The value in this list can be
either "any" or the absolute path
of the log directory on the broker.
If absolute log directory path is
specified, it is currently required
that the replica has not already
been created on that broker. The
replica will then be created in the
specified log directory on the
broker later.
--throttle <Long: throttle>             The movement of partitions will be
                                          throttled to this value (bytes/sec).
                                          Rerunning with this option, whilst a
                                          rebalance is in progress, will alter
                                          the throttle value. The throttle
                                          rate should be at least 1 KB/s.
                                          (default: -1)
--timeout <Long: timeout>               The maximum time in ms allowed to wait
                                          for partition reassignment execution
                                          to be successfully initiated
                                          (default: 10000)
--topics-to-move-json-file <String:     Generate a reassignment configuration
  topics to reassign json file path>      to move the partitions of the
                                          specified topics to the list of
                                          brokers specified by the --broker-
                                          list option. The format to use is -
                                          {"topics":
                                          [{"topic": "foo"},{"topic": "foo1"}],
                                          "version":1
                                          }
--verify                                Verify if the reassignment completed
                                          as specified by the --reassignment-
                                          json-file option. If there is a
                                          throttle engaged for the replicas
                                          specified, and the rebalance has
                                          completed, the throttle will be
                                          removed
--zookeeper <String: urls>              REQUIRED: The connection string for
                                          the zookeeper connection in the form
                                          host:port. Multiple URLS can be
                                          given to allow fail-over. |
WARNING: The tool was released in beta in 0.8 and has some bugs that can render the topic unusable. The tool is known to be stable in 0.8.1.
Code Block |
---|
# Help output of the older 0.8 version of the tool
bin/kafka-reassign-partitions.sh
Option                                  Description
------                                  -----------
--broker-list <brokerlist>              The list of brokers to which the
                                          partitions need to be reassigned in
                                          the form "0,1,2". This is required
                                          for automatic topic reassignment.
--execute [execute]                     This option does the actual
                                          reassignment. By default, the tool
                                          does a dry run
--manual-assignment-json-file <manual   The JSON file with the list of manual
  assignment json file path>              reassignments. This option or topics-
                                          to-move-json-file needs to be
                                          specified. The format to use is -
                                          {"partitions":
                                          [{"topic": "foo",
                                          "partition": 1,
                                          "replicas": [1,2,3] }],
                                          "version":1
                                          }
--topics-to-move-json-file <topics to   The JSON file with the list of topics
  reassign json file path>                to reassign. This option or manual-
                                          assignment-json-file needs to be
                                          specified. The format to use is -
                                          {"topics":
                                          [{"topic": "foo"},{"topic": "foo1"}],
                                          "version":1
                                          }
--zookeeper <urls>                      REQUIRED: The connection string for
                                          the zookeeper connection in the form
                                          host:port. Multiple URLS can be
                                          given to allow fail-over. |
...
The partition reassignment tool can be used to expand an existing 0.8 Kafka cluster. Cluster expansion involves adding brokers with new broker ids to the cluster. Typically, when you add new brokers to a cluster, they will not receive any data from existing topics until this tool is run to assign existing topics/partitions to the new brokers. The tool provides 2 options to make it easier to move some topics in bulk to the new brokers: a) the topics to move and b) the list of newly added brokers. Using these 2 options, the tool automatically figures out the placement of partitions for those topics on the new brokers and generates new JSON data which can be used in the next step (with the --reassignment-json-file option) to execute the move.
The following example moves 2 topics (foo1, foo2) to newly added brokers in a cluster (5,6,7)
Code Block |
---|
nnarkhed$ cat topics-to-move.json
{"topics": [{"topic": "foo1"},{"topic": "foo2"}],
 "version":1
}
nnarkhed$ ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6,7" --generate |
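The --generate run above only prints a candidate assignment; nothing moves yet. One way to carry the candidate into the execute step — the file name below is illustrative, not produced by the tool:

```shell
# Save the proposed assignment JSON printed by --generate into a file
# (the file name is illustrative), then run the actual move with --execute
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file expand-cluster-reassignment.json --execute
```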
...
The following example moves 1 partition (foo-1) from replicas 1,2,3 to 1,2,4
Code Block |
---|
nnarkhed$ cat partitions-to-move.json
{"partitions":
 [{"topic": "foo", "partition": 1, "replicas": [1,2,4] }],
 "version":1
}
nnarkhed$ ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file partitions-to-move.json --execute |
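Because the controller carries out the move asynchronously, completion can be checked with the tool's --verify mode. A sketch, assuming the same json file and zookeeper address as above:

```shell
# Poll the status of the reassignment started above; once every partition
# reports as completed, any engaged throttle is also removed
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file partitions-to-move.json --verify
```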
...
5. StateChangeLogMerger Tool
What does the tool do ?
The goal of this tool is to collect data from the brokers in a cluster and format it in a central log to help troubleshoot issues with state changes. Every broker in a Kafka cluster emits a state-change.log that records the lifecycle of every state change received by the broker. Oftentimes, there is some problem with leader election for a subset of topics/partitions, and the question is what caused the problem. To answer this question, we need a global view of state changes in the Kafka cluster, possibly filtered on a time range and/or specific topics/partitions. This is exactly what the StateChangeLogMerger tool does: it takes in a list of state-change.log files, merges them in time order, filters on a time range and on topics/partitions if specified by the user, and outputs a merged and formatted state-change.log that makes it easy to query for the root cause.
How to use the tool ?
Code Block |
---|
nnarkhed-mn:kafka-git-idea nnarkhed$ ./bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger
Provide arguments to exactly one of the two options "[logs]" or "[logs-regex]"
Option                                  Description
------                                  -----------
--end-time <end timestamp in the        The latest timestamp of state change
  format java.text.                       log entries to be merged (default:
  SimpleDateFormat@f17a63e7>              9999-12-31 23:59:59,999)
--logs <file1,file2,...>                Comma separated list of state change
                                          logs or a regex for the log file
                                          names
--logs-regex <for example: /tmp/state-  Regex to match the state change log
  change.log*>                            files to be merged
--partitions <0,1,2,...>                Comma separated list of partition ids
                                          whose state change logs should be
                                          merged
--start-time <start timestamp in the    The earliest timestamp of state change
  format java.text.                       log entries to be merged (default:
  SimpleDateFormat@f17a63e7>              0000-00-00 00:00:00,000)
--topic <topic>                         The topic whose state change logs
                                          should be merged |
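A typical filtered invocation, sketched below — the log paths, topic name, and timestamps are all illustrative, and the state-change.log files are assumed to have been copied from each broker to the local machine first:

```shell
# Merge state-change logs collected from all brokers, keeping only entries
# for topic "foo" within a one-day window (paths, topic, and times are
# illustrative)
bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger \
  --logs-regex '/tmp/state-change.log*' \
  --topic foo \
  --start-time '2013-04-01 00:00:00,000' \
  --end-time '2013-04-02 00:00:00,000'
```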
...