Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: AlterTopics and ReplicaStatus

 

Note this was initially erroneously assigned as KIP-178, which was already taken, and has been reassigned KIP-179.

...

Firstly, the ReassignPartitionsCommand (which is used by the kafka-reassign-partitions.sh tool) talks directly to ZooKeeper. This prevents the tool being used in deployments where only the brokers are exposed to clients (i.e. where the zookeeper servers are intentionally not exposed). In addition, there is a general push to refactor/rewrite/replace tools which need ZooKeeper access with equivalents which use the the AdminClient API. Thus it is necessary to change the ReassignPartitionsCommand so that it no longer talks to ZooKeeper directly, but via an intermediating broker. Similar work is needed for the kafka-topics.sh tool (which can also change assignments and numbers of partitions and replicas), so common AdminClient and protocol APIs are desirable.

Secondly, ReassignPartitionsCommand currently has no proper facility to report progress of a reassignment; --verify can be used periodically to check whether the request assignments have been achieved. It would be useful if the tool could report progress better.

...

  • Binary log format

  • The network protocol and api behavior

  • Any class in the public packages under clientsConfiguration, especially client configuration

    • org/apache/kafka/common/serialization

    • org/apache/kafka/common

    • org/apache/kafka/common/errors

    • org/apache/kafka/clients/producer

    • org/apache/kafka/clients/consumer (eventually, once stable)

  • Monitoring

  • Command line tools and arguments

  • Anything else that will likely break existing users in some way when they upgrade

...

Two new network protocol APIs will be added:

  • PartitionAssignmentRequest AlterTopicsRequest and PartitionAssignmentResponse
  • PartitionAssignmentStatusRequest and PartitionAssignmentStatusResponse
  • AlterTopicsResponse
  • ReplicaStatusRequest and ReplicaStatusResponse

The The AdminClient API will have two new methods added (plus overloads for options):

  • assignPartitionsalterTopics(Set<AlterTopics>)
  • partitionAssignmentStatusreplicaStatus(Collection<Replica> replicas)

The options accepted by kafka-reassign-partitions.sh command will change:

  • --zookeeper will be deprecated, with a warning message
  • a new --bootstrap-server option will be added
  • a new --progress action option will be added, with a new --request option

Reassignment requests will be identified by a monotonic integer (the "request id"), which will be apparent in each of these public interfaces. The intention is to avoid ambiguity in the event that one reassignment is immediately followed by another. Note that this KIP will not add support for concurrent reassignments, but having a request id is compatible with disambiguating concurrent reassignments, should support for them be added at a later date.

Proposed Changes

Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

kafka-reassign-partitions.sh and ReassignPartitionsCommand

...

It is anticipated that a future version of Kafka would remove support for the --zookeeper option.

New A new --progress and --request options action option will be added. These This will only be supported when used with --bootstrap-server. If used with --zookeeper the command will produce an error message and the tool will exit without doing the intended operation. The existing  --execute option progress will cause the request id to be printed to the standard output. This can be used as an argument to the --request option to identify the request that the --progress pertains to. If --progress is used without a --request option present the most recent request will be used. If an invalid --request option is given the tool will exit with an error message. For example:report on the synchronisation of each of the partitions and brokers in the reassignment given via the --reassignment-json-file option

For example:

Code Block
# If the following command is used to start a reassignment
bin/
Code Block
# Start the execution of a reassignment and exit immediately,
# printing a reassignment request id to standard output
kafka-reassign-partitions.sh --bootstrap-server localhost:12349878 \
  --reassignment-json-executefile ...expand-cluster-reassignment.json \
  --execute
       
# Assumingthen the abovefollowing command printed 42, the following
# will print the progress of
# that reassignment, then exit immediately
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:12349878 \
  --reassignment-json-progressfile expand-cluster-request 42 ...

Internally, the ReassignPartitionsCommand will be refactored to support the above changes to the options. An interface will abstract the commands currently issued directly to zookeeper.

There will be an implementation which makes the current calls to ZooKeeper, and another implementation which uses the AdminClient API described below.

In all other respects, the public API of ReassignPartitionsCommand will not be changed.

AdminClient

The following methods will be added to AdminClient to support the ability to reassign partitions:

Code Block
public AssignPartitionsResult assignPartitions(Map<TopicPartition, List<Integer>>)
public AssignPartitionsResult assignPartitions(Map<TopicPartition, List<Integer>>,  AssignPartitionOptions options)

Where:

reassignment.json \
  --progress

That might print something like the following:

No Format
Topic      Partition  Broker  Status
-------------------------------------
my_topic   0          0       In sync
my_topic   0          1
Code Block
  public class AssignPartitionsResult {
        // private constructor
        publicBehind: Map<TopicPartition,10456 KafkaFuture<Void>>messages values()
behind
asdf       0     public KafkaFuture<Void> all()
  }

The following methods will be added to AdminClient to support the progress reporting functionality:

Code Block
public PartitionAssignmentStatusResult partitionAssignmentStatus(int requestId)   1       Unknown topic
publicmy_topic PartitionAssignmentStatusResult partitionAssignmentStatus(int requestId,42  PartitionAssignmentStatusOptions options)

Where:

Code Block
public class PartitionAssignmentStatusResult {
   1 // private constructor
    public Map<TopicPartition, KafkaFuture<Void>> values()
Unknown partition
my_topic   0    public KafkaFuture<Void> all()
}

public class PartitionAssignmentStatusResult {
 42   // private constructor Unknown broker
my_topic   1 public Map<PartitionReplica, Progress> all()      
0    /** The progress ofBroker movingdoes thenot given partition */
    public KafkaFuture<Progress> progress(PartitionReplica)
    /**
     * Whether the overall reassignment is complete,
     * that is, if all the values in the map returned by {@link #all()}
     * have complete() == true
     */
    public KafkaFuture<Boolean> complete()
}
 
/** Represents the reassignment of a partition to a broker. */
public class PartitionReplica {
    public String getTopic()
    public int getPartition()
    public int getBroker()
    // plus override equals() and hashCode()
}
 
public class Progress {
    /** The number of completed units of work. */
    long getNumerator()
    /** The total number of units of work. */
    Long getDenominator()
    /** The error code, if thehost this partition

Internally, the ReassignPartitionsCommand will be refactored to support the above changes to the options. An interface will abstract the commands currently issued directly to zookeeper.

There will be an implementation which makes the current calls to ZooKeeper, and another implementation which uses the AdminClient API described below.

In all other respects, the public API of ReassignPartitionsCommand will not be changed.

AdminClient

The following methods will be added to AdminClient to support the ability to reassign partitions:

Code Block
/**
 * Request alteration of the given topics. The request can change the number of 
 * partitions, replication factor and/or the partition assignments. 
 * This can be a long running operation as replicas are migrated between brokers, 
 * therefore the returned result conveys whether the alteration has been 
 * started, not that it is complete. Progress information 
 * can be obtained by calling the lead broker's 
 * {@link #replicaStatus(Collection)}.
 */
public AlterTopicsResult alterTopics(Collection<AlteredTopic> alteredTopics)
public AlterTopicsResult alterTopics(Collection<AlteredTopic> alteredTopics,  AssignPartitionOptions options)

Where:

Code Block
public class AlteredTopic {
    public AlteredTopic(String name, int numPartitions, int replicationFactor, Map<Integer,List<Integer>> replicasAssignment) {
        // ...
    }
    /** The name of the topic to alter task ended erroneously. */
    Errorspublic String getErrorname()
}

The Progress class presented here can easily be reused in future APIs also dealing with progress of long running jobs. The result of getDenominator() is nullable because it cannot be expected that all tasks can efficiently compute their total size. The isComplete() method is provided to determine completeness since getNumerator() == getDenominator() cannot be used in the event of a null denominator.

In general, the denominator will not be constant between successive requests since new messages will arrive from producers. Indeed it is possible for a partition to receive messages from producers at a faster rate than the syncing replica is catching up, i.e. the percentage completion implied by numerator ÷ denominator can decrease.

For partition reassignment, Progress has sufficient information for a percent completion calculation, but not any kind of time-to-completion, since the AdminClient doesn't know the time that the initial request was made.

Authorization

With broker-mediated reassignment it becomes possible limit the authority to perform reassignment to something finer-grained than "anyone with access to zookeeper".

The reasons for reassignment are usually operational. For example,  migrating partitions to new brokers when expanding the cluster, or attempting to find a more balanced assignment (according to some notion of balance). These are cluster-wide considerations and so authority should be for the reassign operation being performed on the cluster.

Given the standard form for authorization in Kafka

"Principal P is [Allowed/Denied] Operation O From Host H On Resource R"

the authorized operation will be ClusterAction, on the CLUSTER resource. So to authorize Alice and Bob to perform rebalancing one might need to configure an ACL like this:

  bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
    --add --allow-principal User:Bob --allow-principal User:Alice \
    --allow-host 198.51.100.0 --allow-host 198.51.100.1 \
    --operation ClusterAction --cluster

Network Protocol: AssignPartitionsRequest and AssignPartitionsResponse

An AssignPartitionsRequest will initiate the process of partition reassignment

Code Block
AssignPartitionsRequest => [PartitionAssignment]
  PartitionAssignment => Topic Partition Brokers
    Topic => string
    Partition =>int32
    Brokers => [int32]

Where:

  • Topic a topic name
  • Partition a partition of that topic
  • Brokers the broker ids which will host the partition

Possible Error Codes:

  • ClusterAuthorizationFailedCode (31)
  • PartitionReassignmentInProgress (new)

As currently, it will not be possible to have multiple reassignments running concurrently, hence the addition of PartitionReassignmentInProgress 

It is not necessary to send an AssignPartitionsRequest to the leader for a given partition. Any broker will do.

The AssignPartitionsResponse enumerates those topics and partitions in the request, together with any error in initiating reassignment that partition:

Code Block
AssignPartitionsResponse => RequestId [PartitionAssignmentResult]
  RequestId => int32
  PartitionAssignmentResult => Topic Partition Broker Error
    Topic => string
    Partition => int32
    Broker => int32
    Error => int16

Where:

  • RequestId is an id which can be used to make in a PartitionAssignmentStatusRequest
  • Topic is a topic name
  • Partition is a partition id
  • Broker is the broker
  • Error is an error code for why the initiation of assignment of the given topic to the given broker failed

The anticipated errors in the header are:

  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • INVALID_PARTITIONS (37) If the partition doesn't exist
  • UNKNOWN_MEMBER_ID (25) If the Brokers in the AssignPartitionsRequest included an unknown broker id

Network Protocol: PartitionAssignmentStatusRequest and PartitionAssignmentStatusResponse

To request progress information about the given request

Code Block
PartitionAssignmentStatusRequest => RequestId
  RequestId => int32

Where:

  • RequestId is the identifier from a previous AssignPartitionsResponse
;
    /** The new number of partitions, or -1 if the number of partitions should not be changed. */
    public int numPartitions();
    /** The new replication factor, or -1 if the replication factor should not be changed. */
    public int replicationFactor();
    /** 
     * The new assignments of partition to brokers, or the empty map 
     * if the broker should assign replicas automatically. 
     */
    Map<Integer,List<Integer>> replicasAssignment();
}
public class AlterTopicsResult {
    // package-access constructor
    /** A mapping of the name of a requested topic to the error for that topic */
    KafkaFuture<Map<String, Errors>> result(); 
    /** The error for a give topic */
	KafkaFuture<Errors> topicResult(String topicName);
}

The following methods will be added to AdminClient to support the progress reporting functionality:

Code Block
/**
 * Query the replication status of the given partitions of which this
 * broker is the leader.
 */
public ReplicaStatusResult replicaStatus(Collection<TopicPartition> replicas)   
public ReplicaStatusResult replicaStatus(Collection<TopicPartition> replicas,  ReplicaStatusOptions options)

Where:

Code Block
public class ReplicaStatusResult {
    public KafkaFuture<Map<TopicPartition, List<ReplicaStatus>>> all()
}
/** 
 * Represents the replication status of a partition 
 * on a particular broker.
 */ 
public class ReplicaStatus {
    /** The topic about which this is the status of */
    String topic()
    /** The partition about which this is the status of */
    int gpartition()
    /** The broker about which this is the status of */
    int broker()
    
    /** 
     * The time (as milliseconds since the epoch) that 
     * this status data was collected. In general this may
     * be some time before the replicaStatus() request time.
     */
    public long statusTime()
    
    /** 
     * The number of messages that the replica on this broker is behind
     * the leader.
     */
    public long lag()
    
}

Authorization

With broker-mediated reassignment it becomes possible limit the authority to perform reassignment to something finer-grained than "anyone with access to zookeeper".

The reasons for reassignment are usually operational. For example, migrating partitions to new brokers when expanding the cluster, or attempting to find a more balanced assignment (according to some notion of balance). These are cluster-wide considerations and so authority should be for the reassign operation being performed on the cluster. Therefore alterTopics() will require ClusterAction on the CLUSTER.

replicaStatus() will require Describe on the CLUSTER (or the TOPIC?).

Network Protocol: AlterTopicsRequest and AlterTopicsResponse

An AlterTopicsRequest will initiate the process of topic alteration/partition reassignment

Code Block
AlterTopicsRequest => [AlteredTopic]
  AlteredTopic => Topic, Partitions, Replicas, PartitionAssignment
Code Block
PartitionAssignmentStatusResponse => [RequestId] [PartitionReplica Progress]
  PartitionReplica => Topic Partition Broker
    Topic => string
    Partition => int32 ; the topic name
    BrokerPartitions => int32  ; the number of partitions
    ProgressReplicas => Error Numerator Denominator int32    ; the replication factor
    ErrorPartitionAssignment => int16Partition Brokers
    Numerator  Partition => int64
>int32  ; the partition
     Denominator Brokers => int64

Where:

  • RequestId is the identifier from a previous AssignPartitionsResponse
  • Topic the a topic name
  • Partition a partition of the topic
  • Broker the id of the receiving broker
  • Error is an error code about the transfer of the partition to the given broker:
    • PartitionReassignmentInProgress (new) if the reassignment is still in progress,
    • None (0) if the reassignment completed normally,
    • Any other error if the reassignment completed exceptionally.
  • Numerator the number of items that have been processed so far
  • Denominator the total number of items we expect to process for the request. Or -1 if the total is not known. Never 0.

The anticipated errors in the header are:

  • INVALID_REQUEST (42) If the request id is not known

 As with the AdminClient API, the Progess Schema could be reused in other progress reporting APIs.

Implementation

  1. The mediating broker will write the reassignment JSON to the /admin/reassign_partitions znode, as present. The znode version for /admin/reassign_partitions becomes the request_id. This means that the request id is monotonic increasing, but that request id may not be contiguous. The monotonic increasing property is only required so that when the request id is omitted it is easy to figure out the last request.
  2. Receiving brokers are notified of the change to /admin/reassign_partitions (thus obtaining the request id) and start fetching replicas as necessary, as present
  3. As the leader serves fetch requests to followers, at the same time as deciding whether to update the ISR, it will publish their progress as JSON to /admin/reassign_partitions_status/$request_id
  4. To obtain progress the mediating broker will read the /admin/reassign_partitions_status/$request_id znodes.
  5. When the leader adds a syncing broker to the ISR the progress JSON for that partition in /admin/reassign_partitions_status/$request_id will be marked as complete.
[int32] ; the ids of the assigned brokers for this partition

Partitions or Replicas of -1 that would mean "no change". An empty PartitionAssignment would mean that the broker should calculate a suitable assignment. Such broker calculated assignment is unlikely to be balanced.

It is not necessary to send an AlterTopicsRequest to the leader for a given partition. Any broker will do.

The AlterTopicsResponse enumerates those topics in the request, together with any error in initiating alteration:

Code Block
AlterTopicsResponse => [AlteredTopic]
  AlteredTopic => Topic Error Detail
    Topic => string      ; the topic name
    Error => int32       ; the error code for altering this topic
    Detail => string     ; detailed error information

Possible Error Codes:

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • INVALID_PARTITIONS (37) If the Partitions was invalid
  • INVALID_REPLICATION_FACTOR (38) If the Replicas was invalid
  • UNKNOWN_MEMBER_ID (25) If any broker ids in the PartitionAssignment included an unknown broker id
  • PARTITION_REASSIGNMENT_IN_PROGRESS (new)
  • INVALID_REPLICA_ASSIGNMENT (39) If a partition, replica or broker id in the PartitionAssignment doesn't exist or is incompatible with the requested Partitions and /or Replicas.
  • NONE (0) If the request was successful and the alteration/reassignment has been started.

As currently, it will not be possible to have multiple reassignments running concurrently, hence the addition of the PARTITION_REASSIGNMENT_IN_PROGRESS error code.

Network Protocol: ReplicaStatusRequest and ReplicaStatusResponse

ReplicaStatusRequest requests information about the progress of a number of replicas.

Code Block
ReplicaStatusRequest => [Replica]
  Replica => Topic Partition Broker
    Topic => string     ; a topic name
    Partition => int32  ; a partition id of this topic
    Broker => int32     ; a follower broker id for this partition

The response includes replication information for each of the replicas in the request:

Code Block
ReplicaStatusResponse => [ReplicaStatus]
  ReplicaStatus => Topic Partition Broker Error StatusTime, IsrTime, FollowerTime Lag MaxLag
    Topic => string         ; the topic name
    Partition => int32      ; the partition id
    Broker => int32         ; the follower broker id
    Error => int16          ; an error code
    StatusTime -> int64     ; the time the status was current
    Lag => int64            ; the lag (#messages)

Anticipated errors are:

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed. (or the TOPIC?)
  • INVALID_TOPIC_EXCEPTION (17) The topic is not known
  • INVALID_PARTITIONS (37)  The Partion of the given topic is not valid
  • UNKNOWN_MEMBER_ID (25) The given Broker id is not known.
  • UNKNOWN_TOPIC_OR_PARTITION (3) The given Broker is not a follower for the partition identified by Topic, Partition.
  • NONE (0) if the status request completed normally,

Implementation

The AdminClient.replicaStatus() has to be made (an the underlying ReplicaStatusRequest sent) to the leader for the given partition. This saves the need for every broker (because any broker could be the --bootstrap-server ) to have knowledge of the replication status of every replica, which would be inefficient in network IO and/or memory use. This means that, in general, the ReassignPartitionsCommand would need to discover other brokers in the cluster (via AdminClient.describeCluster()) and make a separate request to each broker implied by the reassignment in order to report progressIt will be necessary to delete the /admin/reassign_partitions_status/$request_id znodes to prevent the /admin/reassign_partitions_status subtree growing arbitrarily. At the same time the leader publishes status to the current /admin/reassign_partitions_status/$request_id it will remove znodes which had completed a given amount of time ago. This will be configurable via the broker property reassign.partitions.status.min.retention.ms which will default to 1 day.

Compatibility, Deprecation, and Migration Plan

...

Another alternative is to do exactly this KIP, but without the deprecation of --zookeeper. That would have a higher long term maintenance burden, and would prevent any future plans to, for example, provide alternative cluster technologies than ZooKeeper.