...

Code Block
{
 "partitions":
  [
    {"topic": "topic1", "partition": 0},
    {"topic": "topic1", "partition": 1},
    {"topic": "topic1", "partition": 2},
    {"topic": "topic2", "partition": 0},
    {"topic": "topic2", "partition": 1}
  ]
}
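For topics with many partitions, this file can be generated rather than typed by hand. The sketch below is a minimal Python illustration. It assumes you already know each topic's partition count; the `partition_counts` dictionary and the function name are hypothetical. Partition ids are written as JSON integers, which is the form used in the reassignment tool's help output further down this page.

```python
import json
from typing import Dict


def preferred_election_json(partition_counts: Dict[str, int]) -> str:
    """Build a {"partitions": [...]} document covering every partition of each topic.

    partition_counts maps a topic name to its number of partitions
    (a hypothetical input; look the counts up on your own cluster).
    """
    partitions = [
        {"topic": topic, "partition": p}
        for topic in sorted(partition_counts)      # stable, readable ordering
        for p in range(partition_counts[topic])     # partition ids are 0-based
    ]
    return json.dumps({"partitions": partitions}, indent=2)


if __name__ == "__main__":
    # topic1 with 3 partitions and topic2 with 2 partitions, as in the example above.
    print(preferred_election_json({"topic1": 3, "topic2": 2}))
```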

FAQ

What happens if the preferred replica is not in the ISR?

...

Code Block
# Delete topic named topic1
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topic1

3. Change topic configuration

What does the tool do?


The Kafka Configs tool can be used to modify topic configuration:

  • Add new config options
  • Change existing config options
  • Remove config options

How to use the tool?

Code Block
# Add a new option or change an existing option
bin/kafka-configs.sh --alter --zookeeper localhost:2181 --entity-name topic1 --entity-type topics --add-config cleanup.policy=compact

# Remove an existing option
bin/kafka-configs.sh --alter --zookeeper localhost:2181 --entity-name topic1 --entity-type topics --delete-config cleanup.policy

4. Reassign Partitions Tool

What does the tool do?

The goal of this tool is similar to that of the Preferred Replica Leader Election Tool: to achieve load balance across brokers. However, this tool does more than elect a new leader from the assigned replicas of a partition. It allows you to change the assigned replicas of partitions. Remember that followers also need to fetch from leaders in order to stay in sync, so balancing only the leadership load is sometimes not enough.

A summary of the steps that the tool performs is shown below:

1. The tool updates the ZooKeeper path "/admin/reassign_partitions" with the list of topic partitions and (if specified in the JSON file) the list of their new assigned replicas.
2. The controller listens to the path above. When a data change update is triggered, the controller reads the list of topic partitions and their assigned replicas from ZooKeeper.
3. For each topic partition, the controller does the following:
3.1. Start new replicas in RAR - AR (RAR = Reassigned Replicas, AR = original list of Assigned Replicas)
3.2. Wait until new replicas are in sync with the leader
3.3. If the leader is not in RAR, elect a new leader from RAR
3.4. Stop old replicas AR - RAR
3.5. Write new AR
3.6. Remove partition from the /admin/reassign_partitions path

Note that the tool only updates the ZooKeeper path and exits. The controller reassigns the replicas for the partitions asynchronously.

This tool is only available in the 0.8 branch for now.
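The set arithmetic in step 3 can be made concrete with a small Python sketch. This is not the controller's code, and the function names are hypothetical. The sketch only computes which replicas are started and stopped and whether a leader election is needed. Waiting for the new replicas to catch up and the ZooKeeper writes are represented as plain strings, not performed.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ReassignmentPlan:
    replicas_to_start: List[int]  # RAR - AR (step 3.1)
    needs_leader_election: bool   # leader not in RAR (step 3.3)
    replicas_to_stop: List[int]   # AR - RAR (step 3.4)


def plan_reassignment(ar: List[int], rar: List[int], leader: int) -> ReassignmentPlan:
    """Compute the replica sets the controller acts on for one partition.

    ar     -- original list of Assigned Replicas
    rar    -- Reassigned Replicas read from /admin/reassign_partitions
    leader -- broker id of the current leader
    """
    to_start = [b for b in rar if b not in ar]
    to_stop = [b for b in ar if b not in rar]
    return ReassignmentPlan(to_start, leader not in rar, to_stop)


def controller_steps(ar: List[int], rar: List[int], leader: int) -> List[str]:
    """Describe steps 3.1-3.6 for one partition as strings (illustration only)."""
    plan = plan_reassignment(ar, rar, leader)
    steps = [
        f"start new replicas {plan.replicas_to_start}",
        "wait until the new replicas are in sync with the leader",
    ]
    if plan.needs_leader_election:
        steps.append(f"elect a new leader from {rar}")
    steps += [
        f"stop old replicas {plan.replicas_to_stop}",
        f"write new AR {rar}",
        "remove the partition from /admin/reassign_partitions",
    ]
    return steps


if __name__ == "__main__":
    # Moving foo-1 from replicas [1, 2, 3] to [1, 2, 4] while broker 3 is the leader.
    for step in controller_steps([1, 2, 3], [1, 2, 4], leader=3):
        print(step)
```

Because broker 3 leads but is not in RAR in this example, a leader election appears among the steps. With leader 1 it would be skipped.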

...

How to use the tool?

...

Code Block
bin/kafka-reassign-partitions.sh

Option                                 Description
------                                 -----------
--bootstrap-server <String: Server(s)  the server(s) to use for
  to use for bootstrapping>              bootstrapping. REQUIRED if an
                                         absolution path of the log directory
                                         is specified for any replica in the
                                         reassignment json file
--broker-list <String: brokerlist>     The list of brokers to which the
                                         partitions need to be reassigned in
                                         the form "0,1,2". This is required
                                         if --topics-to-move-json-file is
                                         used to generate reassignment
                                         configuration
--disable-rack-aware                   Disable rack aware replica assignment
--execute                              Kick off the reassignment as specified
                                         by the --reassignment-json-file
                                         option.
--generate                             Generate a candidate partition
                                         reassignment configuration. Note
                                         that this only generates a candidate
                                         assignment, it does not execute it.
--reassignment-json-file <String:      The JSON file with the partition
  manual assignment json file path>      reassignment configurationThe format
                                         to use is -
                                       {"partitions":
                                       	[{"topic": "foo",
                                       	  "partition": 1,
                                       	  "replicas": [1,2,3],
                                       	  "log_dirs": ["dir1","dir2","dir3"]
                                         }],
                                       "version":1
                                       }
                                       Note that "log_dirs" is optional. When
                                         it is specified, its length must
                                         equal the length of the replicas
                                         list. The value in this list can be
                                         either "any" or the absolution path
                                         of the log directory on the broker.
                                         If absolute log directory path is
                                         specified, it is currently required
                                         that the replica has not already
                                         been created on that broker. The
                                         replica will then be created in the
                                         specified log directory on the
                                         broker later.
--throttle <Long: throttle>            The movement of partitions will be
                                         throttled to this value (bytes/sec).
                                         Rerunning with this option, whilst a
                                         rebalance is in progress, will alter
                                         the throttle value. The throttle
                                         rate should be at least 1 KB/s.
                                         (default: -1)
--timeout <Long: timeout>              The maximum time in ms allowed to wait
                                         for partition reassignment execution
                                         to be successfully initiated
                                         (default: 10000)
--topics-to-move-json-file <String:    Generate a reassignment configuration
  topics to reassign json file path>     to move the partitions of the
                                         specified topics to the list of
                                         brokers specified by the --broker-
                                         list option. The format to use is -
                                       {"topics":
                                         [{"topic": "foo"},{"topic": "foo1"}],
                                       "version":1
                                       }
--verify                               Verify if the reassignment completed
                                         as specified by the --reassignment-
                                         json-file option. If there is a
                                         throttle engaged for the replicas
                                         specified, and the rebalance has
                                         completed, the throttle will be
                                         removed
--zookeeper <String: urls>             REQUIRED: The connection string for
                                         the zookeeper connection in the form
                                         host:port. Multiple URLS can be
                                         given to allow fail-over.

WARNING: The tool was released in beta in 0.8 and has some bugs that can render the topic unusable. The tool is known to be stable in 0.8.1.

Code Block
bin/kafka-reassign-partitions.sh

Option                                  Description
------                                  -----------
--broker-list <brokerlist>              The list of brokers to which the
                                          partitions need to be reassigned in
                                          the form "0,1,2". This is required
                                          for automatic topic reassignment.
--execute [execute]                     This option does the actual
                                          reassignment. By default, the tool
                                          does a dry run
--manual-assignment-json-file <manual   The JSON file with the list of manual
  assignment json file path>              reassignmentsThis option or topics-
                                          to-move-json-file needs to be
                                          specified. The format to use is -
                                        {"partitions":
                                          [{"topic": "foo",
                                            "partition": 1,
                                            "replicas": [1,2,3] }],
                                        "version":1
                                        }
--topics-to-move-json-file <topics to   The JSON file with the list of topics
  reassign json file path>                to reassign.This option or manual-
                                          assignment-json-file needs to be
                                          specified. The format to use is -
                                        {"topics":
                                          [{"topic": "foo"},{"topic": "foo1"}],
                                        "version":1
                                        }
--zookeeper <urls>                      REQUIRED: The connection string for
                                          the zookeeper connection in the form
                                          host:port. Multiple URLS can be
                                          given to allow fail-over.
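The newer help output above states two rules for the optional "log_dirs" field. It must have one element per replica, and each element must be "any" or an absolute path. A small pre-flight check can catch mistakes before running --execute. The Python sketch below assumes brokers use POSIX paths, and the function name is hypothetical. The tool does its own validation, and this check does not replace it. Note that the "dir1"-style placeholders in the help text are not absolute paths and would be flagged.

```python
import posixpath
from typing import Any, Dict, List


def check_reassignment(doc: Dict[str, Any]) -> List[str]:
    """Return the problems found in a parsed reassignment document (empty if none).

    Checks only constraints stated in the help text: each entry names a topic,
    a partition and its replicas, and an optional "log_dirs" list has one
    element per replica, each either "any" or an absolute path.
    """
    problems = []
    for i, entry in enumerate(doc.get("partitions", [])):
        for key in ("topic", "partition", "replicas"):
            if key not in entry:
                problems.append(f"entry {i}: missing {key!r}")
        log_dirs = entry.get("log_dirs")
        if log_dirs is None:
            continue  # "log_dirs" is optional
        replicas = entry.get("replicas", [])
        if len(log_dirs) != len(replicas):
            problems.append(f"entry {i}: {len(log_dirs)} log_dirs for {len(replicas)} replicas")
        for d in log_dirs:
            # Assumption: broker paths are POSIX, so "absolute" means starting with "/".
            if d != "any" and not posixpath.isabs(d):
                problems.append(f'entry {i}: {d!r} is neither "any" nor an absolute path')
    return problems
```

For example, `check_reassignment(json.load(f))` on an open reassignment file returns an empty list when the file passes these checks.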

...

The partition reassignment tool can be used to expand an existing 0.8 Kafka cluster. Cluster expansion involves adding brokers with new broker ids to a Kafka 0.8 cluster. Typically, newly added brokers do not receive any data from existing topics until this tool is run to assign existing topics/partitions to them. The tool provides two options that make it easier to move topics in bulk to the new brokers: a) the topics to move and b) the list of newly added brokers. Using these two options, the tool automatically works out where to place the topics' partitions on the new brokers. It then generates new JSON data, which can be used in the next step (with the --reassignment-json-file option) to execute the move.

The following example generates a candidate assignment that moves two topics (foo1, foo2) to the newly added brokers (5, 6, 7) of a cluster:

Code Block
nnarkhed$ ./bin/kafka-reassign-partitions.sh --topics-to-move-json-file topics-to-move.json --broker-list "5,6,7" --generate --zookeeper localhost:2181

nnarkhed$ cat topics-to-move.json
{"topics":
     [{"topic": "foo1"},{"topic": "foo2"}],
     "version":1
}
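The --generate step decides on new placements itself. To illustrate what a candidate assignment for this example might look like, the Python sketch below spreads each partition's replicas round-robin over the target broker list. This is a simplified stand-in, not Kafka's actual assignment algorithm, which differs and can, for example, be rack aware. The partition counts and replication factor are hypothetical inputs that the real tool reads from the cluster.

```python
import json
from typing import Dict, List


def round_robin_assignment(
    partition_counts: Dict[str, int], brokers: List[int], replication_factor: int
) -> dict:
    """Place each partition's replicas on consecutive brokers, wrapping around the list."""
    if not 0 < replication_factor <= len(brokers):
        raise ValueError("replication factor must be between 1 and the number of brokers")
    partitions = []
    offset = 0  # shift the first replica by one broker for each successive partition
    for topic in sorted(partition_counts):
        for p in range(partition_counts[topic]):
            replicas = [brokers[(offset + k) % len(brokers)] for k in range(replication_factor)]
            partitions.append({"topic": topic, "partition": p, "replicas": replicas})
            offset += 1
    return {"version": 1, "partitions": partitions}


if __name__ == "__main__":
    # Hypothetical sizes: foo1 has 2 partitions, foo2 has 1; target brokers 5, 6 and 7.
    plan = round_robin_assignment({"foo1": 2, "foo2": 1}, [5, 6, 7], replication_factor=2)
    print(json.dumps(plan, indent=2))
```

The output has the same shape as the --reassignment-json-file input. Even so, the tool's own --generate output is the safer starting point for a real move.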

...

The following example moves one partition (foo-1) from replicas 1,2,3 to 1,2,4:

Code Block
nnarkhed$ ./bin/kafka-reassign-partitions.sh --reassignment-json-file partitions-to-move.json --execute --zookeeper localhost:2181

nnarkhed$ cat partitions-to-move.json
{"partitions":
             [{"topic": "foo",
               "partition": 1,
               "replicas": [1,2,4] }],
  "version":1
}
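When only a few partitions change, you can generate this file from a desired layout and keep only the entries that actually differ. That way the controller is not asked to "move" partitions that stay where they are. The minimal Python sketch below assumes the current and target replica lists are available as dictionaries keyed by (topic, partition). The function name and these inputs are hypothetical.

```python
import json
from typing import Dict, List, Tuple

Assignment = Dict[Tuple[str, int], List[int]]


def changed_partitions_json(current: Assignment, target: Assignment) -> str:
    """Emit a reassignment document listing only partitions whose replica list changes."""
    entries = [
        {"topic": topic, "partition": partition, "replicas": replicas}
        for (topic, partition), replicas in sorted(target.items())
        if current.get((topic, partition)) != replicas
    ]
    return json.dumps({"partitions": entries, "version": 1})


if __name__ == "__main__":
    current = {("foo", 0): [1, 2, 3], ("foo", 1): [1, 2, 3]}
    target = {("foo", 0): [1, 2, 3], ("foo", 1): [1, 2, 4]}
    print(changed_partitions_json(current, target))
```

Comparing lists rather than sets is deliberate. Reordering a partition's replicas changes which broker is its preferred replica, so a reordered list is treated as a change.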

...

5. StateChangeLogMerger Tool

What does the tool do?

The goal of this tool is to collect data from the brokers in a cluster and format it in a central log to help troubleshoot issues with state changes. Every broker in a Kafka cluster emits a state-change.log that logs the lifecycle of every state change received by the broker. Often there is a problem with leader election for a subset of topics/partitions, and the question is what caused it. Answering this question requires a global view of state changes in the Kafka cluster, possibly filtered on a time range and/or specific topics/partitions. This is exactly what the StateChangeLogMerger tool provides. It takes a list of state-change.log files and merges them in time order. If the user specifies a time range or topics/partitions, it also filters on those. It then outputs a merged and formatted state-change.log that makes it easy to query and understand the root cause.

How to use the tool?

Code Block
nnarkhed-mn:kafka-git-idea nnarkhed$ ./bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger
Provide arguments to exactly one of the two options "[logs]" or "[logs-regex]"
Option                                  Description
------                                  -----------
--end-time <end timestamp in the        The latest timestamp of state change
  format java.text.                       log entries to be merged (default:
  SimpleDateFormat@f17a63e7>              9999-12-31 23:59:59,999)
--logs <file1,file2,...>                Comma separated list of state change
                                          logs or a regex for the log file
                                          names
--logs-regex <for example: /tmp/state-  Regex to match the state change log
  change.log*>                            files to be merged
--partitions <0,1,2,...>                Comma separated list of partition ids
                                          whose state change logs should be
                                          merged
--start-time <start timestamp in the    The earliest timestamp of state change
  format java.text.                       log entries to be merged (default:
  SimpleDateFormat@f17a63e7>              0000-00-00 00:00:00,000)
--topic <topic>                         The topic whose state change logs
                                          should be merged
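The core of the merge is a k-way merge of files that are each already in time order. The Python sketch below uses `heapq.merge` and makes two assumptions about the log format, which you should check against your own logs. First, each entry begins with a bracketed timestamp such as `[2013-01-30 10:15:00,123]`. Second, entries name partitions as `[topic,partition]`. Lines without a leading timestamp, such as stack-trace continuations, are simply dropped here. This is an illustration of the technique, not the tool's implementation, and the function names are hypothetical.

```python
import heapq
import re
from datetime import datetime
from typing import Iterable, Iterator, List, Optional, Tuple

# Assumed entry prefix, e.g. "[2013-01-30 10:15:00,123] ..."
TIMESTAMP = re.compile(r"^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\]")


def _timed_entries(lines: Iterable[str]) -> Iterator[Tuple[datetime, str]]:
    """Yield (timestamp, line) pairs, skipping lines without a leading timestamp."""
    for line in lines:
        m = TIMESTAMP.match(line)
        if m:
            yield datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S,%f"), line


def merge_state_change_logs(
    logs: Iterable[Iterable[str]],
    start: Optional[datetime] = None,
    end: Optional[datetime] = None,
    topic: Optional[str] = None,
    partitions: Optional[List[int]] = None,
) -> Iterator[str]:
    """Merge per-broker logs (each already time-ordered) into one time-ordered stream."""
    merged = heapq.merge(*(_timed_entries(log) for log in logs), key=lambda e: e[0])
    for ts, line in merged:
        if start is not None and ts < start:
            continue
        if end is not None and ts > end:
            continue
        if topic is not None:
            # Assumption: entries reference partitions as "[topic,partition]".
            if partitions:
                if not any(f"[{topic},{p}]" in line for p in partitions):
                    continue
            elif f"[{topic}," not in line:
                continue
        yield line
```

To use it on real files, pass open file objects, for example `merge_state_change_logs([f1, f2], topic="foo")`. Here f1 and f2 are opened copies of each broker's state-change.log. In this sketch the partition filter only applies together with a topic.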

...