Note this was initially erroneously assigned as KIP-178, which was already taken, and has been reassigned KIP-179.


Discussion thread: here (when initially misnumbered as KIP-178) and here (when assigned KIP-179)

JIRA: here

Firstly, the ReassignPartitionsCommand (which is used by the tool) talks directly to ZooKeeper. This prevents the tool from being used in deployments where only the brokers are exposed to clients (i.e. where the ZooKeeper servers are intentionally not exposed). In addition, there is a general push to refactor, rewrite or replace tools which need ZooKeeper access with equivalents which use the AdminClient API. Thus it is necessary to change the ReassignPartitionsCommand so that it no longer talks to ZooKeeper directly, but via an intermediating broker. Similar work is needed for the tool (which can also change assignments and numbers of partitions and replicas), so common AdminClient and protocol APIs are desirable.

Secondly, ReassignPartitionsCommand currently has no proper facility to report the progress of a reassignment; --verify can only be run periodically to check whether the requested assignments have been achieved. It would be useful if the tool could report progress better.

Public Interfaces

Two new network protocol APIs will be added: AlterTopicsRequest/AlterTopicsResponse and ReplicaStatusRequest/ReplicaStatusResponse.

The AdminClient API will have two new methods added (plus overloads for options): alterTopics() and replicaStatus().

The options accepted by the command will change:

  • --zookeeper will be deprecated, with a warning message
  • a new --bootstrap-server option will be added
  • a new --progress action option will be added

Proposed Changes

ReassignPartitionsCommand

The --zookeeper option will be retained and will:

  1. Cause a deprecation warning to be printed to standard error. The message will say that the --zookeeper option will be removed in a future version and that --bootstrap-server is the replacement option.
  2. Perform the reassignment via ZooKeeper, as currently.

A new --bootstrap-server option will be added and will:

  1. Perform the reassignment via the given intermediating broker.

Using both --zookeeper and --bootstrap-server in the same command will produce an error message and the tool will exit without doing the intended operation.

It is anticipated that a future version of Kafka would remove support for the --zookeeper option.
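The option rules above can be sketched as a small validation step. This is a minimal illustration, not the tool's actual code: the class name `ConnectionOptionCheck`, the `Backend` enum, the warning wording, and the rule that at least one of the two options must be present are all assumptions made for the example.

```java
import java.util.Optional;
import java.util.Set;

/** Hypothetical helper, not part of Kafka: applies the option rules described above. */
final class ConnectionOptionCheck {

    /** The two ways the command can reach the cluster. */
    enum Backend { ZOOKEEPER, BROKER }

    /**
     * Picks the backend implied by the given option names.
     * Throws IllegalArgumentException for combinations that are not allowed.
     */
    static Backend select(Set<String> options) {
        boolean zk = options.contains("--zookeeper");
        boolean bootstrap = options.contains("--bootstrap-server");
        if (zk && bootstrap) {
            // Stated in the text: using both options is an error.
            throw new IllegalArgumentException(
                "Only one of --zookeeper and --bootstrap-server may be given");
        }
        if (!zk && !bootstrap) {
            // Assumption for this sketch: one of the two is required.
            throw new IllegalArgumentException(
                "One of --zookeeper or --bootstrap-server is required");
        }
        return zk ? Backend.ZOOKEEPER : Backend.BROKER;
    }

    /** The deprecation warning to print to standard error, if any (wording is illustrative). */
    static Optional<String> deprecationWarning(Backend backend) {
        if (backend == Backend.ZOOKEEPER) {
            return Optional.of("Warning: --zookeeper is deprecated and will be removed "
                + "in a future version; use --bootstrap-server instead.");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Backend backend = select(Set.of("--zookeeper"));
        deprecationWarning(backend).ifPresent(System.err::println);
        System.out.println("Using backend: " + backend);
    }
}
```

Keeping the warning separate from the selection means the ZooKeeper path still performs the reassignment exactly as before, with the warning as the only visible change.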

A new --progress action option will be added. This will only be supported when used with --bootstrap-server. If used with --zookeeper the command will produce an error message and the tool will exit without doing the intended operation. --progress will report on the synchronisation of each of the partitions and brokers in the reassignment given via the --reassignment-json-file option.

For example:

# If the following command is used to start a reassignment
bin/ --bootstrap-server localhost:9878 \
  --reassignment-json-file expand-cluster-reassignment.json \
  --execute

# then the following command will print the progress of
# that reassignment, then exit immediately
bin/ --bootstrap-server localhost:9878 \
  --reassignment-json-file expand-cluster-reassignment.json \
  --progress

That might print something like the following:

Topic      Partition  Broker  Status
my_topic   0          0       In sync
my_topic   0          1       Behind: 10456 messages behind
asdf       0          1       Unknown topic
my_topic   42         1       Unknown partition
my_topic   0          42      Unknown broker
my_topic   1          0       Broker does not host this partition
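A report like the one above could be produced from per-replica lag values. The following is a rough sketch only: `ProgressTable` and `Row` are hypothetical names, the column widths are arbitrary, and the error rows (unknown topic, partition or broker) are left out for brevity.

```java
import java.util.List;

/** Hypothetical formatter for --progress output; not the tool's actual code. */
final class ProgressTable {

    /** One line of the report: a replica of a partition on a broker, with its lag. */
    static final class Row {
        final String topic;
        final int partition;
        final int broker;
        final long lag;

        Row(String topic, int partition, int broker, long lag) {
            this.topic = topic;
            this.partition = partition;
            this.broker = broker;
            this.lag = lag;
        }
    }

    /** Turns a lag (in messages) into the status text used in the example output. */
    static String status(long lag) {
        return lag == 0 ? "In sync" : "Behind: " + lag + " messages behind";
    }

    /** Renders a header followed by one line per row. */
    static String render(List<Row> rows) {
        StringBuilder sb = new StringBuilder();
        sb.append(String.format("%-10s %-10s %-7s %s%n", "Topic", "Partition", "Broker", "Status"));
        for (Row r : rows) {
            sb.append(String.format("%-10s %-10d %-7d %s%n",
                r.topic, r.partition, r.broker, status(r.lag)));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(render(List.of(
            new Row("my_topic", 0, 0, 0L),
            new Row("my_topic", 0, 1, 10456L))));
    }
}
```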

Internally, the ReassignPartitionsCommand will be refactored to support the above changes to the options. An interface will abstract the commands currently issued directly to ZooKeeper.

There will be an implementation which makes the current calls to ZooKeeper, and another implementation which uses the AdminClient API described below.
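One possible shape for that abstraction is sketched below. Every name here (`BackendSketch`, `ReassignmentBackend`, the two implementations and their methods) is hypothetical, and the method bodies are placeholders standing in for the real ZooKeeper and AdminClient calls.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the refactoring; none of these types exist in Kafka. */
final class BackendSketch {

    /** Abstracts the operations the command currently issues directly to ZooKeeper. */
    interface ReassignmentBackend {
        /** Starts the reassignment and returns a short description of the route taken. */
        String startReassignment(Map<String, List<Integer>> assignment);

        /** Whether the --progress action can be served by this backend. */
        boolean supportsProgress();
    }

    /** Placeholder for the implementation that keeps making the current ZooKeeper calls. */
    static final class ZooKeeperBackend implements ReassignmentBackend {
        @Override
        public String startReassignment(Map<String, List<Integer>> assignment) {
            return "via ZooKeeper: " + assignment.size() + " partition(s)";
        }

        @Override
        public boolean supportsProgress() {
            return false; // --progress is only supported with --bootstrap-server
        }
    }

    /** Placeholder for the implementation that would use the AdminClient API. */
    static final class AdminClientBackend implements ReassignmentBackend {
        @Override
        public String startReassignment(Map<String, List<Integer>> assignment) {
            return "via broker: " + assignment.size() + " partition(s)";
        }

        @Override
        public boolean supportsProgress() {
            return true;
        }
    }

    public static void main(String[] args) {
        ReassignmentBackend backend = new AdminClientBackend();
        System.out.println(backend.startReassignment(Map.of("my_topic-0", List.of(0, 1))));
    }
}
```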

In all other respects, the public API of ReassignPartitionsCommand will not be changed.

AdminClient: alterTopics

The following methods will be added to AdminClient to support the ability to reassign partitions:

/**
 * Request alteration of the given topics. The request can change the number of 
 * partitions, replication factor and/or the partition assignments. 
 * This can be a long running operation as replicas are migrated between brokers, 
 * therefore the returned result conveys whether the alteration has been 
 * started, not that it is complete. Progress information 
 * can be obtained by calling the lead broker's 
 * {@link #replicaStatus(Collection)}.
 */
public AlterTopicsResult alterTopics(Collection<AlteredTopic> alteredTopics)
public AlterTopicsResult alterTopics(Collection<AlteredTopic> alteredTopics, AssignPartitionOptions options)


public class AlteredTopic {
    public AlteredTopic(String name, int numPartitions, int replicationFactor, Map<Integer,List<Integer>> replicasAssignment) {
        // ...
    }
    /** The name of the topic to alter. */
    public String name();
    /** The new number of partitions, or -1 if the number of partitions should not be changed. */
    public int numPartitions();
    /** The new replication factor, or -1 if the replication factor should not be changed. */
    public int replicationFactor();
    /**
     * The new assignments of partition to brokers, or the empty map 
     * if the broker should assign replicas automatically. 
     */
    Map<Integer,List<Integer>> replicasAssignment();
}

public class AlterTopicsResult {
    // package-access constructor
    /** A mapping of the name of a requested topic to the error for that topic */
    KafkaFuture<Map<String, Errors>> result();
    /** The error for a given topic */
    KafkaFuture<Errors> topicResult(String topicName);
}
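To illustrate how the -1 and empty-map conventions combine, here is a self-contained sketch using a local stand-in class. `AlteredTopicSketch` and its factory and query methods are invented for this example; the proposed `AlteredTopic` class only has the constructor and accessors listed above.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Local stand-in for the proposed AlteredTopic; factory and query methods are hypothetical. */
final class AlteredTopicSketch {
    /** Sentinel meaning "leave this value unchanged". */
    static final int NO_CHANGE = -1;

    final String name;
    final int numPartitions;
    final int replicationFactor;
    final Map<Integer, List<Integer>> replicasAssignment;

    AlteredTopicSketch(String name, int numPartitions, int replicationFactor,
                       Map<Integer, List<Integer>> replicasAssignment) {
        this.name = name;
        this.numPartitions = numPartitions;
        this.replicationFactor = replicationFactor;
        this.replicasAssignment = replicasAssignment;
    }

    /** A pure reassignment: counts unchanged, explicit partition-to-brokers mapping. */
    static AlteredTopicSketch reassignOnly(String name, Map<Integer, List<Integer>> assignment) {
        return new AlteredTopicSketch(name, NO_CHANGE, NO_CHANGE, assignment);
    }

    /** A change of partition count only, leaving replica placement to the broker. */
    static AlteredTopicSketch withPartitions(String name, int numPartitions) {
        return new AlteredTopicSketch(name, numPartitions, NO_CHANGE, Collections.emptyMap());
    }

    boolean changesPartitionCount() {
        return numPartitions != NO_CHANGE;
    }

    /** True when the empty map asks the broker to assign replicas automatically. */
    boolean brokerAssignsReplicas() {
        return replicasAssignment.isEmpty();
    }

    public static void main(String[] args) {
        AlteredTopicSketch move = reassignOnly("my_topic", Map.of(0, List.of(1, 2)));
        System.out.println(move.name + " changes partition count: " + move.changesPartitionCount());
    }
}
```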

AdminClient: replicaStatus

The following methods will be added to AdminClient to support the progress reporting functionality:

/**
 * Query the replication status of the given partitions of which this
 * broker is the leader.
 */
public ReplicaStatusResult replicaStatus(Collection<TopicPartition> replicas)
public ReplicaStatusResult replicaStatus(Collection<TopicPartition> replicas, ReplicaStatusOptions options)


public class ReplicaStatusResult {
    public KafkaFuture<Map<TopicPartition, List<ReplicaStatus>>> all()
}

/**
 * Represents the replication status of a partition 
 * on a particular broker.
 */
public class ReplicaStatus {
    /** The topic this status describes */
    String topic()
    /** The partition this status describes */
    int partition()
    /** The broker this status describes */
    int broker()
    /**
     * The time (as milliseconds since the epoch) that 
     * this status data was collected. In general this may
     * be some time before the replicaStatus() request time.
     */
    public long statusTime()
    /**
     * The number of messages that the replica on this broker is behind
     * the leader.
     */
    public long lag()
}


With broker-mediated reassignment it becomes possible to limit the authority to perform reassignment to something finer-grained than "anyone with access to ZooKeeper".

The reasons for reassignment are usually operational: for example, migrating partitions to new brokers when expanding the cluster, or attempting to find a more balanced assignment (according to some notion of balance). These are cluster-wide considerations, so the authority required should be for performing the reassign operation on the cluster. Therefore alterTopics() will require ClusterAction on the CLUSTER.

replicaStatus() will require Describe on the CLUSTER (or the TOPIC?).

Network Protocol: AlterTopicsRequest and AlterTopicsResponse

An AlterTopicsRequest will initiate the process of topic alteration/partition reassignment.

AlterTopicsRequest => [AlteredTopic]
  AlteredTopic => Topic Partitions Replicas PartitionAssignment
    Topic => string      ; the topic name
    Partitions => int32  ; the number of partitions
    Replicas => int32    ; the replication factor
    PartitionAssignment => Partition Brokers
      Partition => int32 ; the partition
      Brokers => [int32] ; the ids of the assigned brokers for this partition

A Partitions or Replicas value of -1 means "no change". An empty PartitionAssignment means that the broker should calculate a suitable assignment. Such a broker-calculated assignment is unlikely to be balanced.

It is not necessary to send an AlterTopicsRequest to the leader for a given partition. Any broker will do.
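As a rough illustration of the request layout, the sketch below serialises a single AlteredTopic following Kafka's usual wire conventions (big-endian integers, strings as an int16 length plus UTF-8 bytes, arrays as an int32 count plus elements). It assumes PartitionAssignment is sent as an array of (Partition, Brokers) pairs, which the grammar above does not state explicitly, and it omits the request header. `AlterTopicsRequestEncoder` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical encoder for the request body only; not Kafka's actual serialisation code. */
final class AlterTopicsRequestEncoder {

    /** Writes one AlteredTopic entry: Topic, Partitions, Replicas, then the assignment. */
    static void writeTopic(ByteBuffer buf, String topic, int partitions, int replicas,
                           Map<Integer, List<Integer>> assignment) {
        byte[] name = topic.getBytes(StandardCharsets.UTF_8);
        buf.putShort((short) name.length);  // string: int16 length, then bytes
        buf.put(name);
        buf.putInt(partitions);             // -1 means "no change"
        buf.putInt(replicas);               // -1 means "no change"
        buf.putInt(assignment.size());      // assumed: array of PartitionAssignment
        // Sort by partition id so the encoding is deterministic.
        for (Map.Entry<Integer, List<Integer>> e : new TreeMap<>(assignment).entrySet()) {
            buf.putInt(e.getKey());
            buf.putInt(e.getValue().size());
            for (int broker : e.getValue()) {
                buf.putInt(broker);
            }
        }
    }

    /** Encodes a request body containing exactly one AlteredTopic. */
    static byte[] encodeSingle(String topic, int partitions, int replicas,
                               Map<Integer, List<Integer>> assignment) {
        // Fixed-size buffer: adequate for a small sketch, not for arbitrary input.
        ByteBuffer buf = ByteBuffer.allocate(1024);
        buf.putInt(1); // [AlteredTopic] array with one element
        writeTopic(buf, topic, partitions, replicas, assignment);
        buf.flip();
        byte[] out = new byte[buf.remaining()];
        buf.get(out);
        return out;
    }

    public static void main(String[] args) {
        byte[] body = encodeSingle("my_topic", -1, -1, Map.of(0, List.of(1, 2)));
        System.out.println("Encoded " + body.length + " bytes");
    }
}
```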

The AlterTopicsResponse enumerates those topics in the request, together with any error in initiating alteration:

AlterTopicsResponse => [AlteredTopic]
  AlteredTopic => Topic Error Detail
    Topic => string      ; the topic name
    Error => int32       ; the error code for altering this topic
    Detail => string     ; detailed error information

Possible Error Codes:

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • INVALID_PARTITIONS (37) If the Partitions was invalid
  • INVALID_REPLICATION_FACTOR (38) If the Replicas was invalid
  • UNKNOWN_MEMBER_ID (25) If any broker ids in the PartitionAssignment included an unknown broker id
  • INVALID_REPLICA_ASSIGNMENT (39) If a partition, replica or broker id in the PartitionAssignment doesn't exist or is incompatible with the requested Partitions and/or Replicas.
  • NONE (0) If the request was successful and the alteration/reassignment has been started.

As is currently the case, it will not be possible to have multiple reassignments running concurrently, hence the addition of the PARTITION_REASSIGNMENT_IN_PROGRESS error code.
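A client interpreting an AlterTopicsResponse needs to map the numeric codes back to names. The enum below is a minimal sketch covering only the codes listed above; `AlterTopicsError` is a hypothetical name, and PARTITION_REASSIGNMENT_IN_PROGRESS is left out because no numeric value is given for it here.

```java
import java.util.Optional;

/** Hypothetical lookup for the error codes listed above; not Kafka's own Errors enum. */
enum AlterTopicsError {
    NONE(0),
    INVALID_TOPIC_EXCEPTION(17),
    UNKNOWN_MEMBER_ID(25),
    CLUSTER_AUTHORIZATION_FAILED(31),
    INVALID_PARTITIONS(37),
    INVALID_REPLICATION_FACTOR(38),
    INVALID_REPLICA_ASSIGNMENT(39);

    private final int code;

    AlterTopicsError(int code) {
        this.code = code;
    }

    int code() {
        return code;
    }

    /** Finds the error with the given numeric code, or empty if it is not one of those listed. */
    static Optional<AlterTopicsError> forCode(int code) {
        for (AlterTopicsError e : values()) {
            if (e.code == code) {
                return Optional.of(e);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(forCode(37).map(Enum::name).orElse("unlisted code"));
    }
}
```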

Network Protocol: ReplicaStatusRequest and ReplicaStatusResponse

ReplicaStatusRequest requests information about the progress of a number of replicas.

ReplicaStatusRequest => [Replica]
  Replica => Topic Partition Broker
    Topic => string     ; a topic name
    Partition => int32  ; a partition id of this topic
    Broker => int32     ; a follower broker id for this partition

The response includes replication information for each of the replicas in the request:

ReplicaStatusResponse => [ReplicaStatus]
  ReplicaStatus => Topic Partition Broker Error StatusTime IsrTime FollowerTime Lag MaxLag
    Topic => string         ; the topic name
    Partition => int32      ; the partition id
    Broker => int32         ; the follower broker id
    Error => int16          ; an error code
    StatusTime => int64     ; the time the status was current
    Lag => int64            ; the lag (#messages)

Anticipated errors are:

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed. (or the TOPIC?)
  • INVALID_TOPIC_EXCEPTION (17) The topic is not known
  • INVALID_PARTITIONS (37) The Partition of the given topic is not valid
  • UNKNOWN_MEMBER_ID (25) The given Broker id is not known.
  • UNKNOWN_TOPIC_OR_PARTITION (3) The given Broker is not a follower for the partition identified by Topic, Partition.
  • NONE (0) If the status request completed normally.


The AdminClient.replicaStatus() call has to be made (and the underlying ReplicaStatusRequest sent) to the leader for the given partition. This spares every broker (because any broker could be the --bootstrap-server) from needing knowledge of the replication status of every replica, which would be inefficient in network IO and/or memory use. It means that, in general, the ReassignPartitionsCommand would need to discover other brokers in the cluster (via AdminClient.describeCluster()) and make a separate request to each broker implied by the reassignment in order to report progress.
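The per-leader fan-out described above amounts to grouping the partitions of interest by their leader, then sending one request per group. A minimal sketch follows, assuming the leader of each partition is already known; `LeaderGrouping` is a hypothetical name, and partitions are written as plain "topic-partition" strings to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: groups partitions by leader so one status request goes to each broker. */
final class LeaderGrouping {

    /**
     * @param leaderOf maps a partition (written "topic-partition") to its leader's broker id
     * @return for each leader broker id, the partitions to ask that broker about
     */
    static Map<Integer, List<String>> byLeader(Map<String, Integer> leaderOf) {
        Map<Integer, List<String>> requests = new TreeMap<>();
        // Iterate in sorted order so each broker's list is deterministic.
        for (Map.Entry<String, Integer> e : new TreeMap<>(leaderOf).entrySet()) {
            requests.computeIfAbsent(e.getValue(), broker -> new ArrayList<>()).add(e.getKey());
        }
        return requests;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> requests = byLeader(Map.of(
            "my_topic-0", 1,
            "my_topic-1", 2,
            "my_topic-2", 1));
        requests.forEach((broker, partitions) ->
            System.out.println("broker " + broker + " <- " + partitions));
    }
}
```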

Compatibility, Deprecation, and Migration Plan

Existing users of the tool will receive a deprecation warning when they use the --zookeeper option. The option will be removed in a future version of Kafka. If this KIP is introduced in version 1.0.0, the removal could happen in 2.0.0.

Rejected Alternatives

One alternative is to do nothing: Let the ReassignPartitionsCommand continue to communicate with ZooKeeper directly.

Another alternative is to do exactly this KIP, but without the deprecation of --zookeeper. That would carry a higher long-term maintenance burden, and would obstruct any future plans to, for example, support cluster technologies other than ZooKeeper.


