...

Two new network protocol APIs will be added:

The AdminClient API will have two new methods added (plus overloads for options):

The options accepted by kafka-reassign-partitions.sh command will change:

...

A new --progress action option will be added. It will only be supported when used with --bootstrap-server. If used with --zookeeper, the command will print an error message and exit without performing the requested operation. --progress will report on the synchronisation of each of the partitions and brokers in the reassignment given via the --reassignment-json-file option.

For example:

Code Block
# If the following command is used to start a reassignment
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9878 \
  --reassignment-json-file expand-cluster-reassignment.json \
  --execute
       
# then the following command will print the progress of
# that reassignment, then exit immediately
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9878 \
  --reassignment-json-file expand-cluster-reassignment.json \
  --progress

...

In all other respects, the public API of ReassignPartitionsCommand will not be changed.

AdminClient: alterTopics

The following methods will be added to AdminClient to support the ability to reassign partitions:

...

Code Block
public class AlteredTopic {
    public AlteredTopic(String name, int numPartitions, int replicationFactor, Map<Integer,List<Integer>> replicasAssignment) {
        // ...
    }
    /** The name of the topic to alter. */
    public String name();
    /** The new number of partitions, or -1 if the number of partitions should not be changed. */
    public int numPartitions();
    /** The new replication factor, or -1 if the replication factor should not be changed. */
    public int replicationFactor();
    /** 
     * The new assignments of partition to brokers, or the empty map 
     * if the broker should assign replicas automatically. 
     */
    public Map<Integer,List<Integer>> replicasAssignment();
}
public class AlterTopicsResult {
    // package-access constructor
    /** A mapping of the name of a requested topic to the error for that topic. */
    public KafkaFuture<Map<String, Errors>> result();
    /** The error for a given topic. */
    public KafkaFuture<Errors> topicResult(String topicName);
}
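
The -1 and empty-map conventions described in the Javadoc above can be illustrated with a small stand-in class. This is only a sketch: AlteredTopicSketch, NO_CHANGE, reassignOnly, resizeOnly and hasExplicitAssignment are hypothetical names introduced for illustration and are not part of the proposed API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stand-in for the proposed AlteredTopic; NOT the real Kafka class. */
public class AlteredTopicSketch {
    /** Sentinel meaning "leave this setting unchanged" (the -1 convention above). */
    public static final int NO_CHANGE = -1;

    private final String name;
    private final int numPartitions;
    private final int replicationFactor;
    private final Map<Integer, List<Integer>> replicasAssignment;

    public AlteredTopicSketch(String name, int numPartitions, int replicationFactor,
                              Map<Integer, List<Integer>> replicasAssignment) {
        this.name = name;
        this.numPartitions = numPartitions;
        this.replicationFactor = replicationFactor;
        // Defensive copy so later changes by the caller do not leak in.
        this.replicasAssignment =
            Collections.unmodifiableMap(new HashMap<>(replicasAssignment));
    }

    public String name() { return name; }
    public int numPartitions() { return numPartitions; }
    public int replicationFactor() { return replicationFactor; }
    public Map<Integer, List<Integer>> replicasAssignment() { return replicasAssignment; }

    /** Hypothetical helper: true if the caller supplied an explicit assignment. */
    public boolean hasExplicitAssignment() { return !replicasAssignment.isEmpty(); }

    /** Hypothetical factory: move replicas without changing partition count or factor. */
    public static AlteredTopicSketch reassignOnly(String name,
                                                  Map<Integer, List<Integer>> assignment) {
        return new AlteredTopicSketch(name, NO_CHANGE, NO_CHANGE, assignment);
    }

    /** Hypothetical factory: change the partition count, let the broker place replicas. */
    public static AlteredTopicSketch resizeOnly(String name, int numPartitions) {
        return new AlteredTopicSketch(name, numPartitions, NO_CHANGE,
                                      Collections.<Integer, List<Integer>>emptyMap());
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> assignment = new HashMap<>();
        assignment.put(0, Arrays.asList(1, 2, 3)); // partition 0 -> brokers 1, 2, 3
        assignment.put(1, Arrays.asList(2, 3, 4)); // partition 1 -> brokers 2, 3, 4
        AlteredTopicSketch moved = reassignOnly("my-topic", assignment);
        AlteredTopicSketch grown = resizeOnly("my-topic", 12);
        System.out.println(moved.name() + " explicit=" + moved.hasExplicitAssignment());
        System.out.println(grown.name() + " explicit=" + grown.hasExplicitAssignment());
    }
}
```

Keeping "unchanged" as a sentinel value rather than a separate flag mirrors the constructor shown above, where all three settings are always passed.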

AdminClient: replicaStatus

The following methods will be added to AdminClient to support the progress reporting functionality:

...

The reasons for reassignment are usually operational: for example, migrating partitions to new brokers when expanding the cluster, or attempting to find a more balanced assignment (according to some notion of balance). These are cluster-wide considerations, so authorization for the reassignment should be checked against the cluster rather than against individual topics. Therefore alterTopics() will require ClusterAction on the CLUSTER.

replicaStatus() will require Describe on the CLUSTER (or possibly on the TOPIC; this is an open question).

Network Protocol: AlterTopicsRequest and AlterTopicsResponse

An AlterTopicsRequest will initiate the process of topic alteration/partition reassignment.

...

It is not necessary to send an AlterTopicsRequest to the leader for a given partition. Any broker will do.

The AlterTopicsResponse enumerates those topics in the request, together with any error in initiating alteration:

Code Block
AlterTopicsResponse => [AlteredTopic]
  AlteredTopic => Topic Error Detail
    Topic => string      ; the topic name
    Error => int16       ; the error code for altering this topic
    Detail => string     ; detailed error information

Possible Error Codes:

  • CLUSTER_AUTHORIZATION_FAILED (31) If authorization failed.
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist.
  • INVALID_PARTITIONS (37) If the Partitions value was invalid.
  • INVALID_REPLICATION_FACTOR (38) If the Replicas value was invalid.
  • UNKNOWN_MEMBER_ID (25) If the PartitionAssignment included an unknown broker id.
  • PARTITION_REASSIGNMENT_IN_PROGRESS (new)
  • INVALID_REPLICA_ASSIGNMENT (39) If a partition, replica or broker id in the PartitionAssignment doesn't exist or is incompatible with the requested Partitions and/or Replicas.
  • NONE (0) If the request was successful and the alteration/reassignment has been started.

...

Network Protocol: ReplicaStatusRequest and ReplicaStatusResponse

ReplicaStatusRequest requests information about the progress of a number of replicas.

Code Block
ReplicaStatusRequest => [Replica]
  Replica => Topic Partition Broker
    Topic => string     ; a topic name
    Partition => int32  ; a partition id of this topic
    Broker => int32     ; a follower broker id for this partition
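
As a rough illustration of how the request body above might look on the wire, the sketch below serialises and parses a list of replicas with java.nio.ByteBuffer. It assumes the usual Kafka primitive encodings (an array as an int32 element count followed by the elements, a string as an int16 byte length followed by UTF-8 bytes, big-endian integers) and ignores the request header. ReplicaStatusRequestSketch and its nested Replica class are hypothetical names, not part of the proposal.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Illustrative encoder/decoder for the ReplicaStatusRequest body; not Kafka code. */
public class ReplicaStatusRequestSketch {

    /** One (Topic, Partition, Broker) entry of the request. */
    public static final class Replica {
        public final String topic;
        public final int partition;
        public final int broker;

        public Replica(String topic, int partition, int broker) {
            this.topic = topic;
            this.partition = partition;
            this.broker = broker;
        }
    }

    /** Encodes [Replica] as: int32 count, then per entry string, int32, int32. */
    public static ByteBuffer encode(List<Replica> replicas) {
        List<byte[]> topics = new ArrayList<>();
        int size = 4; // int32 array length
        for (Replica r : replicas) {
            byte[] t = r.topic.getBytes(StandardCharsets.UTF_8);
            topics.add(t);
            size += 2 + t.length + 4 + 4; // int16 length + bytes + two int32 fields
        }
        ByteBuffer buf = ByteBuffer.allocate(size); // big-endian by default
        buf.putInt(replicas.size());
        for (int i = 0; i < replicas.size(); i++) {
            byte[] t = topics.get(i);
            buf.putShort((short) t.length); // assumes topic names fit in an int16 length
            buf.put(t);
            buf.putInt(replicas.get(i).partition);
            buf.putInt(replicas.get(i).broker);
        }
        buf.flip(); // make the buffer ready for reading
        return buf;
    }

    /** Reverses encode(): reads the count, then each entry in field order. */
    public static List<Replica> decode(ByteBuffer buf) {
        int count = buf.getInt();
        List<Replica> out = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            byte[] t = new byte[buf.getShort()];
            buf.get(t);
            int partition = buf.getInt();
            int broker = buf.getInt();
            out.add(new Replica(new String(t, StandardCharsets.UTF_8), partition, broker));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Replica> request = new ArrayList<>();
        request.add(new Replica("my-topic", 0, 3));
        ByteBuffer wire = encode(request);
        System.out.println("encoded bytes: " + wire.remaining());
        System.out.println("decoded topic: " + decode(wire).get(0).topic);
    }
}
```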

The response includes replication information for each of the replicas in the request:

Code Block
ReplicaStatusResponse => [ReplicaStatus]
  ReplicaStatus => Topic Partition Broker Error StatusTime IsrTime FollowerTime Lag MaxLag
    Topic => string         ; the topic name
    Partition => int32      ; the partition id
    Broker => int32         ; the follower broker id
    Error => int16          ; an error code
    StatusTime => int64     ; the time the status was current
    Lag => int64            ; the lag (#messages)

Anticipated errors are:

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed (or the TOPIC?).
  • INVALID_TOPIC_EXCEPTION (17) The topic is not known.
  • INVALID_PARTITIONS (37) The Partition of the given topic is not valid.
  • UNKNOWN_MEMBER_ID (25) The given Broker id is not known.
  • UNKNOWN_TOPIC_OR_PARTITION (3) The given Broker is not a follower for the partition identified by Topic, Partition.
  • NONE (0) The status request completed normally.
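
To make the response handling concrete, here is a minimal sketch of how a client such as the --progress action might turn per-replica statuses into a report. It uses only the Topic, Partition, Broker, Error and Lag fields and the error codes listed above. ReplicaProgressSketch and its members are hypothetical, and treating a lag of 0 as "caught up" is an assumption made for illustration, not a completion criterion defined by this proposal.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative progress report built from ReplicaStatus entries; not Kafka code. */
public class ReplicaProgressSketch {

    /** Subset of the ReplicaStatus fields used by this sketch. */
    public static final class ReplicaStatus {
        public final String topic;
        public final int partition;
        public final int broker;
        public final short error;
        public final long lag;

        public ReplicaStatus(String topic, int partition, int broker, short error, long lag) {
            this.topic = topic;
            this.partition = partition;
            this.broker = broker;
            this.error = error;
            this.lag = lag;
        }
    }

    /** Maps the anticipated error codes listed above to their names. */
    public static String errorName(short code) {
        switch (code) {
            case 0:  return "NONE";
            case 3:  return "UNKNOWN_TOPIC_OR_PARTITION";
            case 17: return "INVALID_TOPIC_EXCEPTION";
            case 25: return "UNKNOWN_MEMBER_ID";
            case 31: return "CLUSTER_AUTHORIZATION_FAILED";
            case 37: return "INVALID_PARTITIONS";
            default: return "UNEXPECTED(" + code + ")";
        }
    }

    /** One report line per replica: either the error or the current lag. */
    public static String describe(ReplicaStatus s) {
        String id = s.topic + "-" + s.partition + " on broker " + s.broker;
        if (s.error != 0) {
            return id + ": error " + errorName(s.error);
        }
        // Assumption: a lag of 0 messages means the follower has caught up.
        return id + (s.lag == 0 ? ": caught up" : ": lag " + s.lag + " messages");
    }

    /** True only if every replica reported NONE and a lag of 0. */
    public static boolean allCaughtUp(List<ReplicaStatus> statuses) {
        for (ReplicaStatus s : statuses) {
            if (s.error != 0 || s.lag != 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<ReplicaStatus> statuses = Arrays.asList(
            new ReplicaStatus("my-topic", 0, 2, (short) 0, 0L),
            new ReplicaStatus("my-topic", 1, 3, (short) 0, 1500L),
            new ReplicaStatus("my-topic", 2, 9, (short) 25, 0L));
        for (ReplicaStatus s : statuses) {
            System.out.println(describe(s));
        }
        System.out.println("all caught up: " + allCaughtUp(statuses));
    }
}
```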

...