Note: this KIP was initially assigned the number KIP-178 in error. That number was already taken, so it has been reassigned KIP-179.

Status

Current state: Under Discussion

Discussion thread: here (when initially misnumbered as KIP-178) and here (when assigned KIP-179)

JIRA: KAFKA-5601 and KAFKA-5561

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Firstly, the ReassignPartitionsCommand (which is used by the kafka-reassign-partitions.sh tool) talks directly to ZooKeeper. This prevents the tool from being used in deployments where only the brokers are exposed to clients (i.e. where the ZooKeeper servers are intentionally not exposed). In addition, there is a general push to refactor, rewrite or replace tools which need ZooKeeper access with equivalents which use the AdminClient API. Thus it is necessary to change the ReassignPartitionsCommand so that it no longer talks to ZooKeeper directly, but via an intermediating broker. Similar work is needed for the kafka-topics.sh tool (which can also change assignments and the numbers of partitions and replicas), so common AdminClient and protocol APIs are desirable.

Secondly, ReassignPartitionsCommand currently has no proper facility to report the progress of a reassignment; --verify can be used periodically to check whether the requested assignments have been achieved. It would be useful if the tool could report progress better.

Public Interfaces

Two new network protocol APIs will be added:

  • AlterTopicsRequest and AlterTopicsResponse
  • ReplicaStatusRequest and ReplicaStatusResponse

The AdminClient API will have two new methods added (plus overloads for options):

  • alterTopics(Collection<AlteredTopic> alteredTopics)
  • replicaStatus(Collection<TopicPartition> replicas)

The options accepted by the kafka-reassign-partitions.sh command will change:

  • --zookeeper will be deprecated, with a warning message
  • a new --bootstrap-server option will be added
  • a new --progress action option will be added

Proposed Changes

kafka-reassign-partitions.sh and ReassignPartitionsCommand

The --zookeeper option will be retained and will:

  1. Cause a deprecation warning to be printed to standard error. The message will say that the --zookeeper option will be removed in a future version and that --bootstrap-server is the replacement option.
  2. Perform the reassignment via ZooKeeper, as currently.

A new --bootstrap-server option will be added and will:

  1. Perform the reassignment via the given intermediating broker.

Using both --zookeeper and --bootstrap-server in the same command will produce an error message and the tool will exit without doing the intended operation.
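As a rough illustration of the option handling described above, the following Java sketch checks the connection options before any work is done. The class and method names (ReassignOptionCheck, validate) are hypothetical, the warning text paraphrases the description above, and rejecting a command that gives neither option is an assumption rather than part of this proposal.

```java
import java.util.List;

// Hypothetical sketch of the connection-option checks for kafka-reassign-partitions.sh.
final class ReassignOptionCheck {

    /** Which path the command will use to perform the reassignment. */
    enum Mode { ZOOKEEPER, BOOTSTRAP_SERVER }

    static Mode validate(List<String> args) {
        boolean zookeeper = args.contains("--zookeeper");
        boolean bootstrap = args.contains("--bootstrap-server");
        if (zookeeper && bootstrap) {
            // Both given: report an error and do not perform the operation.
            throw new IllegalArgumentException(
                "--zookeeper and --bootstrap-server cannot be used together");
        }
        if (zookeeper) {
            // Deprecated path: warn on standard error, then continue via ZooKeeper.
            System.err.println("Warning: --zookeeper is deprecated and will be removed in a "
                + "future version; use --bootstrap-server instead.");
            return Mode.ZOOKEEPER;
        }
        if (bootstrap) {
            return Mode.BOOTSTRAP_SERVER;
        }
        // Assumption: one of the two options must always be supplied.
        throw new IllegalArgumentException("One of --zookeeper or --bootstrap-server is required");
    }
}
```

A caller would catch the exception, print its message and exit without performing the reassignment.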

It is anticipated that a future version of Kafka would remove support for the --zookeeper option.

A new --progress action option will be added. This will only be supported when used with --bootstrap-server. If used with --zookeeper, the command will produce an error message and the tool will exit without doing the intended operation. --progress will report on the synchronisation of each of the partitions and brokers in the reassignment given via the --reassignment-json-file option.

For example:

Code Block
# If the following command is used to start a reassignment
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9878 \
  --reassignment-json-file expand-cluster-reassignment.json \
  --execute

# then the following command will print the progress of
# that reassignment, then exit immediately
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9878 \
  --reassignment-json-file expand-cluster-reassignment.json \
  --progress

...

In all other respects, the public API of ReassignPartitionsCommand will not be changed.

AdminClient: alterTopics()

The following methods will be added to AdminClient to support the ability to reassign partitions:

Code Block
/**
 * Request alteration of the given topics. The request can change the number of
 * partitions, the replication factor and/or the partition assignments.
 * This can be a long-running operation as replicas are migrated between brokers,
 * therefore the returned result conveys whether the alteration has been
 * started, not that it is complete. Progress information
 * can be obtained by calling {@link #replicaStatus(Collection)}
 * against the lead broker.
 */
public AlterTopicsResult alterTopics(Collection<AlteredTopic> alteredTopics) {
    return alterTopics(alteredTopics, new AlterTopicsOptions());
}

public abstract AlterTopicsResult alterTopics(Collection<AlteredTopic> alteredTopics, AlterTopicsOptions options);

Where:

Code Block
public class AlteredTopic {
    public AlteredTopic(String name, int numPartitions, short replicationFactor, Map<Integer, List<Integer>> replicasAssignment) {
        // ...
    }
    /** The name of the topic to alter. */
    public String name();
    /** The new number of partitions, or -1 if the number of partitions should not be changed. */
    public int numPartitions();
    /** The new replication factor, or -1 if the replication factor should not be changed. */
    public short replicationFactor();
    /**
     * The new assignments of partitions to brokers, or the empty map
     * if the broker should assign replicas automatically.
     */
    public Map<Integer, List<Integer>> replicasAssignment();
}

public class AlterTopicsOptions {
    public AlterTopicsOptions validateOnly(boolean validateOnly);
    public boolean validateOnly();
    public AlterTopicsOptions timeoutMs(long timeoutMs);
    public long timeoutMs();
}

public class AlterTopicsResult {
    // package-access constructor
    /**
     * A mapping of the name of each requested topic to a future which succeeds if the
     * alteration of that topic was accepted, or fails with the error for that topic.
     */
    public Map<String, KafkaFuture<Void>> values();
    /** Return a future which succeeds if all the topic alterations were accepted. */
    public KafkaFuture<Void> all();
}
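The intended relationship between values() and all() can be sketched with java.util.concurrent.CompletableFuture standing in for KafkaFuture. This is a hypothetical illustration (the class name AlterTopicsResultSketch is invented), not the proposed implementation: all() completes normally only when every per-topic future does, and completes exceptionally if any topic alteration was rejected.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for AlterTopicsResult; CompletableFuture plays the role of KafkaFuture.
final class AlterTopicsResultSketch {
    private final Map<String, CompletableFuture<Void>> futures;

    AlterTopicsResultSketch(Map<String, CompletableFuture<Void>> futures) {
        this.futures = futures;
    }

    /** Per-topic futures: each completes normally if that topic's alteration was accepted. */
    Map<String, CompletableFuture<Void>> values() {
        return futures;
    }

    /**
     * Completes once every per-topic future has completed: normally if all alterations
     * were accepted, exceptionally if any of them failed.
     */
    CompletableFuture<Void> all() {
        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture<?>[0]));
    }
}
```

Keeping per-topic futures lets a caller report exactly which topics were rejected, while all() suits callers that only need an overall yes or no.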

AdminClient: replicaStatus()

The following methods will be added to AdminClient to support the progress reporting functionality:

Code Block
/**
 * Query the replication status of the given partitions, of which the queried
 * broker is the leader.
 */
public ReplicaStatusResult replicaStatus(Collection<TopicPartition> replicas) {
    return replicaStatus(replicas, new ReplicaStatusOptions());
}

public abstract ReplicaStatusResult replicaStatus(Collection<TopicPartition> replicas, ReplicaStatusOptions options);

Where:

Code Block
public class ReplicaStatusOptions {
}

public class ReplicaStatusResult {
    public KafkaFuture<Map<TopicPartition, List<ReplicaStatus>>> all();
}

/** 
 * Represents the replication status of a partition 
 * on a particular broker.
 */ 
public class ReplicaStatus {
    /** The topic to which this status refers. */
    public String topic();
    /** The partition to which this status refers. */
    public int partition();
    /** The broker to which this status refers. */
    public int broker();

    /**
     * The time (as milliseconds since the epoch) that
     * this status data was collected. In general this may
     * be some time before the replicaStatus() request time.
     */
    public long statusTime();

    /**
     * The number of messages by which the replica on this broker
     * lags behind the leader.
     */
    public long lag();
}

Authorization

With broker-mediated reassignment it becomes possible to limit the authority to perform reassignment to something finer-grained than "anyone with access to ZooKeeper".

The reasons for reassignment are usually operational: for example, migrating partitions to new brokers when expanding the cluster, or attempting to find a more balanced assignment (according to some notion of balance). These are cluster-wide considerations, so the authority should be for a reassign operation performed on the cluster. Therefore alterTopics() will require ClusterAction on the CLUSTER.

replicaStatus() will require Describe on the CLUSTER (or the TOPIC?).

Network Protocol: AlterTopicsRequest and AlterTopicsResponse

An AlterTopicsRequest will initiate the process of topic alteration/partition reassignment.

Code Block
AlterTopicsRequest => [alter_topic_requests] validate_only timeout
  alter_topic_requests => topic num_partitions replication_factor [partition_assignment]
    topic => STRING
    num_partitions => INT32
    replication_factor => INT16
    partition_assignment => partition_id brokers
      partition_id => INT32
      brokers => [INT32]
  validate_only => BOOLEAN
  timeout => INT32

Where

  • topic: the topic name
  • num_partitions: the number of partitions. A num_partitions of -1 means "no change"
  • replication_factor: the replication factor. A replication_factor of -1 means "no change"
  • partition_id: the partition id
  • brokers: the ids of the assigned brokers for this partition
  • validate_only: true to just validate the request, but not actually alter the topics
  • timeout: the timeout, in ms, to wait for the topics to be altered

An empty partition_assignment means that the broker should calculate a suitable assignment. Such a broker-calculated assignment is unlikely to be balanced.

It is not necessary to send an AlterTopicsRequest to the leader for a given partition. Any broker will do.
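To make the wire format concrete, here is a minimal Java sketch that serialises the body of an AlterTopicsRequest as defined above (the request header is omitted). It assumes Kafka's classic encoding: big-endian fixed-width integers, STRING as an INT16 length followed by UTF-8 bytes, arrays as an INT32 count followed by the elements, and BOOLEAN as a single byte. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

// Hypothetical encoder for the body of the proposed AlterTopicsRequest (no request header).
final class AlterTopicsRequestEncoder {

    /** One alter_topic_requests entry: -1 means "no change"; an empty map lets the broker choose. */
    static final class AlterTopic {
        final String topic;
        final int numPartitions;
        final short replicationFactor;
        final Map<Integer, List<Integer>> partitionAssignment;

        AlterTopic(String topic, int numPartitions, short replicationFactor,
                   Map<Integer, List<Integer>> partitionAssignment) {
            this.topic = topic;
            this.numPartitions = numPartitions;
            this.replicationFactor = replicationFactor;
            this.partitionAssignment = partitionAssignment;
        }
    }

    static byte[] encode(List<AlterTopic> topics, boolean validateOnly, int timeoutMs) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // DataOutputStream writes big-endian values, matching the fixed-width protocol types.
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(topics.size());                          // [alter_topic_requests]
            for (AlterTopic t : topics) {
                writeString(out, t.topic);                        // topic => STRING
                out.writeInt(t.numPartitions);                    // num_partitions => INT32
                out.writeShort(t.replicationFactor);              // replication_factor => INT16
                out.writeInt(t.partitionAssignment.size());       // [partition_assignment]
                for (Map.Entry<Integer, List<Integer>> e : t.partitionAssignment.entrySet()) {
                    out.writeInt(e.getKey());                     // partition_id => INT32
                    out.writeInt(e.getValue().size());            // brokers => [INT32]
                    for (int broker : e.getValue()) {
                        out.writeInt(broker);
                    }
                }
            }
            out.writeByte(validateOnly ? 1 : 0);                  // validate_only => BOOLEAN
            out.writeInt(timeoutMs);                              // timeout => INT32
        } catch (IOException e) {
            throw new UncheckedIOException(e);  // not expected when writing to memory
        }
        return bytes.toByteArray();
    }

    /** STRING: INT16 byte length followed by the UTF-8 bytes. */
    private static void writeString(DataOutputStream out, String s) throws IOException {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        out.writeShort(utf8.length);
        out.write(utf8);
    }
}
```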

The AlterTopicsResponse enumerates those topics in the request, together with any error in initiating alteration:

Code Block
AlterTopicsResponse => throttle_time_ms [topic_errors]
  throttle_time_ms => INT32
  topic_errors => topic error_code error_message
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING

Where

  • throttle_time_ms: duration in milliseconds for which the request was throttled
  • topic: the topic name
  • error_code: the error code for altering this topic
  • error_message: detailed error information
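A matching sketch for decoding the AlterTopicsResponse body, under the same classic-encoding assumptions; a NULLABLE_STRING with length -1 is decoded as null. All names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical decoder for the body of the proposed AlterTopicsResponse.
final class AlterTopicsResponseDecoder {

    static final class TopicError {
        final String topic;
        final short errorCode;
        final String errorMessage;   // null when the broker sent no detail

        TopicError(String topic, short errorCode, String errorMessage) {
            this.topic = topic;
            this.errorCode = errorCode;
            this.errorMessage = errorMessage;
        }
    }

    static final class Response {
        final int throttleTimeMs;
        final List<TopicError> topicErrors;

        Response(int throttleTimeMs, List<TopicError> topicErrors) {
            this.throttleTimeMs = throttleTimeMs;
            this.topicErrors = topicErrors;
        }
    }

    static Response decode(ByteBuffer buf) {
        int throttleTimeMs = buf.getInt();                    // throttle_time_ms => INT32
        int count = buf.getInt();                             // [topic_errors]
        List<TopicError> errors = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            String topic = readNullableString(buf);           // topic => STRING
            if (topic == null) {
                throw new IllegalArgumentException("topic must not be null");
            }
            short errorCode = buf.getShort();                 // error_code => INT16
            String errorMessage = readNullableString(buf);    // error_message => NULLABLE_STRING
            errors.add(new TopicError(topic, errorCode, errorMessage));
        }
        return new Response(throttleTimeMs, errors);
    }

    /** INT16 length (-1 for null) followed by that many UTF-8 bytes. */
    private static String readNullableString(ByteBuffer buf) {
        short length = buf.getShort();
        if (length < 0) {
            return null;
        }
        byte[] utf8 = new byte[length];
        buf.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }
}
```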

Possible values for error_code:

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • INVALID_PARTITIONS (37) If the num_partitions was invalid
  • INVALID_REPLICATION_FACTOR (38) If the replication_factor was invalid
  • UNKNOWN_MEMBER_ID (25) If any broker ids in the partition_assignment included an unknown broker id
  • INVALID_REQUEST (42) If the request tries to modify both the partition assignment and the number of partitions, or both the partition assignment and the replication factor, in the same request; or if duplicate topics appeared in the request
  • PARTITION_REASSIGNMENT_IN_PROGRESS (new) If a reassignment is already in progress
  • INVALID_REPLICA_ASSIGNMENT (39) If a partition, replica or broker id in the partition_assignment doesn't exist or is incompatible with the requested num_partitions and/or replication_factor. The error_message would contain further information.
  • NONE (0) If the request was successful and the alteration/reassignment has been started.

As currently, it will not be possible to have multiple reassignments running concurrently, hence the addition of the PARTITION_REASSIGNMENT_IN_PROGRESS error code.
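Some of these errors could also be detected on the client before a request is sent. The following hypothetical Java helper applies only the INVALID_REQUEST rules listed above (a non-empty partition assignment combined with a change to the number of partitions or the replication factor, or a duplicated topic). It is a sketch of those rules, not part of the proposed API, and the names are invented.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical client-side precheck mirroring the INVALID_REQUEST rules for AlterTopicsRequest.
final class AlterTopicsPrecheck {
    static final short NONE = 0;
    static final short INVALID_REQUEST = 42;

    static final class Alteration {
        final String topic;
        final int numPartitions;                                // -1 means "no change"
        final short replicationFactor;                          // -1 means "no change"
        final Map<Integer, List<Integer>> partitionAssignment;  // empty means "broker chooses"

        Alteration(String topic, int numPartitions, short replicationFactor,
                   Map<Integer, List<Integer>> partitionAssignment) {
            this.topic = topic;
            this.numPartitions = numPartitions;
            this.replicationFactor = replicationFactor;
            this.partitionAssignment = partitionAssignment;
        }
    }

    /** Returns the error code each topic would receive under these rules alone. */
    static Map<String, Short> check(List<Alteration> alterations) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicated = new HashSet<>();
        for (Alteration a : alterations) {
            if (!seen.add(a.topic)) {
                duplicated.add(a.topic);
            }
        }
        Map<String, Short> result = new LinkedHashMap<>();
        for (Alteration a : alterations) {
            boolean changesAssignment = !a.partitionAssignment.isEmpty();
            boolean changesPartitionCount = a.numPartitions != -1;
            boolean changesReplicationFactor = a.replicationFactor != -1;
            boolean invalid = duplicated.contains(a.topic)
                || (changesAssignment && (changesPartitionCount || changesReplicationFactor));
            result.put(a.topic, invalid ? INVALID_REQUEST : NONE);
        }
        return result;
    }
}
```

The broker would still need to perform these checks itself; a client-side copy only saves a round trip.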

Policy

The existing CreateTopicPolicy can be used to apply a cluster-wide policy on topic configuration at the point of creation via the create.topic.policy.class.name config property. To avoid an obvious loophole (creating a topic that satisfies the policy and then altering it so that it no longer does), it is necessary to also be able to apply a policy to topic alteration. Maintaining two separate policies in sync is a burden, both in terms of class implementation and of configuring the policy. It seems unlikely that many use cases would require a different policy for alteration than for creation. On the other hand, just applying the CreateTopicPolicy to alterations is undesirable because:

  • Its name doesn't convey that it would be applied to alterations too
  • Its API (specifically its RequestMetadata member class) includes topic configs (i.e. Map<String, String>), which are not part of the API for topic alteration even though they are part of the API for topic creation.
  • It prevents any use cases which legitimately did need to apply a different policy for alteration than creation.

Finding a balance between compatibility with existing deployments, and not opening the loophole is difficult.

The existing create.topic.policy.class.name config would continue to work, and would continue to name an implementation of CreateTopicPolicy. That policy would be applied to alterations automatically. The topic's config would be presented to the validate() method (via the RequestMetadata) even though it's not actually part of the AlterTopicsRequest. The documentation for the interface and config property would be updated.

Network Protocol: ReplicaStatusRequest and ReplicaStatusResponse

ReplicaStatusRequest requests information about the progress of a number of replicas.

Code Block
ReplicaStatusRequest => [replica_status_requests]
  replica_status_requests => topic partition_id broker
    topic => STRING
    partition_id => INT32
    broker => INT32

Where

  • topic: a topic name
  • partition_id: a partition id of this topic
  • broker: a follower broker id for this partition

The response includes replication information for each of the replicas in the request:

Code Block
ReplicaStatusResponse => [replica_status]
  replica_status => topic partition_id broker error_code status_time lag
    topic => STRING
    partition_id => INT32
    broker => INT32
    error_code => INT16
    status_time => INT64
    lag => INT64

Where

  • topic: the topic name
  • partition_id: the partition id of this topic
  • broker: the follower broker id
  • error_code: an error code
  • status_time: the time (in milliseconds since the epoch) at which the status was current
  • lag: the lag (number of messages) of this broker, for this partition

Anticipated errors are:

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed. (or the TOPIC?)
  • INVALID_TOPIC_EXCEPTION (17) The topic is not known
  • INVALID_PARTITIONS (37) The partition_id of the given topic is not valid
  • UNKNOWN_MEMBER_ID (25) The given broker id is not known
  • UNKNOWN_TOPIC_OR_PARTITION (3) The given broker is not a follower for the partition identified by topic and partition_id
  • NONE (0) If the status request completed normally
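For completeness, a sketch of decoding the ReplicaStatusResponse body, again assuming the classic fixed-width encoding. The caughtUp() helper treats "no error and a lag of 0" as caught up, which is an interpretation rather than something this proposal specifies; all names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical decoder for the body of the proposed ReplicaStatusResponse.
final class ReplicaStatusResponseDecoder {

    static final class ReplicaStatusEntry {
        final String topic;
        final int partitionId;
        final int broker;
        final short errorCode;
        final long statusTime;   // ms since the epoch at which the status was current
        final long lag;          // number of messages behind the leader

        ReplicaStatusEntry(String topic, int partitionId, int broker,
                           short errorCode, long statusTime, long lag) {
            this.topic = topic;
            this.partitionId = partitionId;
            this.broker = broker;
            this.errorCode = errorCode;
            this.statusTime = statusTime;
            this.lag = lag;
        }

        /** Interpretation, not specified by this proposal: no error and zero lag. */
        boolean caughtUp() {
            return errorCode == 0 && lag == 0;
        }
    }

    static List<ReplicaStatusEntry> decode(ByteBuffer buf) {
        int count = buf.getInt();                              // [replica_status]
        List<ReplicaStatusEntry> entries = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            byte[] utf8 = new byte[buf.getShort()];            // topic => STRING
            buf.get(utf8);
            String topic = new String(utf8, StandardCharsets.UTF_8);
            int partitionId = buf.getInt();                    // partition_id => INT32
            int broker = buf.getInt();                         // broker => INT32
            short errorCode = buf.getShort();                  // error_code => INT16
            long statusTime = buf.getLong();                   // status_time => INT64
            long lag = buf.getLong();                          // lag => INT64
            entries.add(new ReplicaStatusEntry(topic, partitionId, broker, errorCode, statusTime, lag));
        }
        return entries;
    }
}
```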

Implementation

AdminClient.replicaStatus() has to be directed at the leader of the given partitions, that is, the underlying ReplicaStatusRequest must be sent to the leader. This avoids the need for every broker (since any broker could be the --bootstrap-server) to have knowledge of the replication status of every replica, which would be inefficient in network IO and/or memory use. This means that, in general, the ReassignPartitionsCommand would need to discover the other brokers in the cluster (via AdminClient.describeCluster()) and make a separate request to each broker implied by the reassignment in order to report progress.
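The per-leader fan-out described above could be planned as in this Java sketch. It assumes the current leader of each partition has already been obtained from cluster metadata (for example via AdminClient.describeTopics()), and it builds one list of (topic, partition_id, broker) entries per leader, omitting the leader itself because a ReplicaStatusRequest names follower brokers. The types and method names are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical planner: one batch of ReplicaStatusRequest entries per partition leader.
final class ProgressRequestPlanner {

    /** One (topic, partition_id, broker) entry of a ReplicaStatusRequest. */
    static final class ReplicaRef {
        final String topic;
        final int partitionId;
        final int broker;

        ReplicaRef(String topic, int partitionId, int broker) {
            this.topic = topic;
            this.partitionId = partitionId;
            this.broker = broker;
        }

        @Override
        public String toString() {
            return topic + "-" + partitionId + "@" + broker;
        }
    }

    /** A partition from the reassignment JSON, plus its current leader from cluster metadata. */
    static final class PartitionTarget {
        final String topic;
        final int partitionId;
        final int leader;
        final List<Integer> targetReplicas;

        PartitionTarget(String topic, int partitionId, int leader, List<Integer> targetReplicas) {
            this.topic = topic;
            this.partitionId = partitionId;
            this.leader = leader;
            this.targetReplicas = targetReplicas;
        }
    }

    /** Groups the follower replicas to query by the broker currently leading each partition. */
    static Map<Integer, List<ReplicaRef>> plan(List<PartitionTarget> targets) {
        Map<Integer, List<ReplicaRef>> byLeader = new TreeMap<>();
        for (PartitionTarget t : targets) {
            for (int broker : t.targetReplicas) {
                if (broker == t.leader) {
                    continue;  // the leader is not a follower of itself
                }
                byLeader.computeIfAbsent(t.leader, k -> new ArrayList<>())
                        .add(new ReplicaRef(t.topic, t.partitionId, broker));
            }
        }
        return byLeader;
    }
}
```

Each map entry would then correspond to one replicaStatus() call directed at that leader.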

Compatibility, Deprecation, and Migration Plan

Existing users of kafka-reassign-partitions.sh will receive a deprecation warning when they use the --zookeeper option. The option will be removed in a future version of Kafka. If this KIP is introduced in version 1.0.0, the removal could happen in 2.0.0.

Rejected Alternatives

...