...

Note: this KIP was initially assigned KIP-178 in error. That number was already taken, so it has been reassigned as KIP-179.

Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

...

Secondly, ReassignPartitionsCommand currently has no proper facility for reporting the progress of a reassignment; --verify can be run periodically to check whether the requested assignments have been achieved. It would be useful if the tool could report progress more directly.

Public Interfaces

...

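New network protocol APIs will be added:

  • AlterPartitionCountsRequest and AlterPartitionCountsResponse
  • AlterReplicationFactorsRequest and AlterReplicationFactorsResponse
  • ReassignPartitionsRequest and ReassignPartitionsResponse
  • ReplicaStatusRequest and ReplicaStatusResponse

The AdminClient API will have new methods added (plus overloads for options):

  • alterPartitionCounts()
  • alterReplicationFactors()
  • reassignPartitions()
  • replicaStatus()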

The options accepted by the kafka-reassign-partitions.sh command will change:

  • --zookeeper will be deprecated, with a warning message
  • a new --bootstrap-server option will be added
  • a new --progress action option will be added

Proposed Changes

kafka-reassign-partitions.sh and ReassignPartitionsCommand

The --zookeeper option will be retained and will:

...

In all other respects, the public API of ReassignPartitionsCommand will not be changed.

AdminClient: alterPartitionCounts()

To support kafka-topics.sh --alter --partitions ..., the following methods will be added to AdminClient for changing the partition counts of topics.

...

Code Block
class AlterPartitionCountsOptions {
    // TODO validateOnly?
    // TODO timeout?
}
 
class AlterPartitionCountsResult {
    Map<String, KafkaFuture<Void>> values();
}
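The per-topic values() map lets a caller tell which topics in a batch succeeded and which failed. The sketch below illustrates that pattern under stated assumptions: java.util.concurrent.CompletableFuture stands in for KafkaFuture so it runs without Kafka on the classpath, and the helper name failedTopics is hypothetical, not part of the proposed API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class PartitionCountResults {
    // Hypothetical helper: waits on each per-topic future (CompletableFuture stands
    // in for KafkaFuture) and records the cause of every failure, keyed by topic.
    public static Map<String, Throwable> failedTopics(Map<String, CompletableFuture<Void>> values) {
        Map<String, Throwable> failures = new LinkedHashMap<>();
        for (Map.Entry<String, CompletableFuture<Void>> e : values.entrySet()) {
            try {
                e.getValue().get();
            } catch (ExecutionException ex) {
                // get() wraps the per-topic error; unwrap it for reporting.
                failures.put(e.getKey(), ex.getCause());
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for results", ex);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<Void>> values = new LinkedHashMap<>();
        values.put("orders", CompletableFuture.completedFuture(null));
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalArgumentException("invalid partition count"));
        values.put("payments", failed);
        System.out.println(failedTopics(values).keySet()); // prints [payments]
    }
}
```

Reporting failures per topic, rather than failing the whole batch, matches the per-topic error codes returned by AlterPartitionCountsResponse.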


AdminClient: alterReplicationFactors()

To support kafka-topics.sh --alter --replication-factor ..., the following methods will be added to AdminClient for changing the replication factors of topics.

...

Code Block
class AlterReplicationFactorsOptions {
    // TODO validateOnly?
    // TODO timeout?
}
class AlterReplicationFactorsResult {
    Map<String, KafkaFuture<Void>> values();
}

 

AdminClient: reassignPartitions()

To support kafka-topics.sh --alter --replica-assignment ... and kafka-reassign-partitions.sh, the following methods will be added to AdminClient for changing the brokers that host the partitions of a topic.

...

Partition reassignment is a long-running operation: the ReassignPartitionsResult indicates only that the reassignment has started, not that it has completed. The new replicaStatus() method can be used to check the progress and completion of the reassignment.
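Because the result only signals that a reassignment has started, a caller has to poll for completion. The sketch below shows such a loop under two assumptions: a Supplier stands in for calling replicaStatus(), and, purely for illustration, the reassignment is treated as complete once every reported replica has zero lag. The Status class and the awaitCaughtUp method are hypothetical.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

public class ReassignmentPoller {
    // Simplified, hypothetical stand-in for the proposed ReplicaStatus class.
    public static final class Status {
        private final String topic;
        private final int partition;
        private final int broker;
        private final long lag;

        public Status(String topic, int partition, int broker, long lag) {
            this.topic = topic;
            this.partition = partition;
            this.broker = broker;
            this.lag = lag;
        }

        public long lag() { return lag; }
    }

    // Polls until every reported replica has zero lag (the assumed completion
    // criterion) and returns the number of polls made, or -1 if maxPolls is
    // exhausted or the thread is interrupted. An empty report never counts as done.
    public static int awaitCaughtUp(Supplier<List<Status>> poll, int maxPolls, long sleepMs) {
        for (int attempt = 1; attempt <= maxPolls; attempt++) {
            List<Status> statuses = poll.get();
            if (!statuses.isEmpty() && statuses.stream().allMatch(s -> s.lag() == 0)) {
                return attempt;
            }
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long[] remaining = {200};
        // Fake replicaStatus(): the single replica catches up by 100 messages per poll.
        Supplier<List<Status>> fake = () -> {
            List<Status> report = Collections.singletonList(new Status("orders", 0, 4, remaining[0]));
            remaining[0] = Math.max(0, remaining[0] - 100);
            return report;
        };
        System.out.println(awaitCaughtUp(fake, 10, 10)); // prints 3
    }
}
```

Bounding the number of polls keeps a stuck reassignment from blocking the caller indefinitely.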

TODO quotas

AdminClient: replicaStatus()

The following methods will be added to AdminClient to support the progress reporting functionality:

...

Code Block
public class ReplicaStatusOptions {
    
}

public class ReplicaStatusResult {
    public KafkaFuture<Map<TopicPartition, List<ReplicaStatus>>> all();
}

/**
 * Represents the replication status of a partition
 * on a particular broker.
 */
public class ReplicaStatus {
    /** The topic this status refers to. */
    public String topic();
    /** The partition this status refers to. */
    public int partition();
    /** The broker this status refers to. */
    public int broker();

    /**
     * The time (as milliseconds since the epoch) at which
     * this status data was collected. In general this may
     * be some time before the replicaStatus() request time.
     */
    public long statusTime();

    /**
     * The number of messages by which the replica on this broker
     * lags behind the leader.
     */
    public long lag();
}
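A tool such as the proposed --progress action could condense the per-replica data into one line per partition. The sketch below reports the largest lag among each partition's replicas; the Replica class (mirroring the topic(), partition(), broker() and lag() accessors above) and the output wording are illustrative assumptions, not the proposed tool output.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ProgressSummary {
    // Hypothetical stand-in for the proposed ReplicaStatus, with the same accessor names.
    public static final class Replica {
        private final String topic;
        private final int partition;
        private final int broker;
        private final long lag;

        public Replica(String topic, int partition, int broker, long lag) {
            this.topic = topic;
            this.partition = partition;
            this.broker = broker;
            this.lag = lag;
        }

        public String topic() { return topic; }
        public int partition() { return partition; }
        public int broker() { return broker; }
        public long lag() { return lag; }
    }

    // Maps "topic-partition" to the largest lag among that partition's replicas;
    // a maximum of zero means every listed replica has caught up with the leader.
    public static Map<String, Long> maxLagByPartition(List<Replica> statuses) {
        Map<String, Long> result = new TreeMap<>();
        for (Replica r : statuses) {
            result.merge(r.topic() + "-" + r.partition(), r.lag(), Long::max);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Replica> statuses = Arrays.asList(
                new Replica("orders", 0, 1, 0),
                new Replica("orders", 0, 4, 120),
                new Replica("orders", 1, 2, 0));
        maxLagByPartition(statuses).forEach((tp, lag) ->
                System.out.println(tp + (lag == 0 ? " caught up" : " behind by " + lag + " messages")));
        // prints "orders-0 behind by 120 messages" then "orders-1 caught up"
    }
}
```

Reporting the maximum lag is one choice among several; a tool could equally report the sum or the per-broker figures.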

 

Network Protocol: AlterPartitionCountsRequest and AlterPartitionCountsResponse

The AlterPartitionCountsRequest is used to change the partition count for a batch of topics, and is the basis for the AdminClient.alterPartitionCounts() method.

No Format
AlterPartitionCountsRequest => [topic_partition_count]
  topic_partition_count => topic partition_count
    topic => STRING
    partition_count => INT32
  // TODO: validate_only and/or timeout flags?

...

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • INVALID_PARTITIONS (37) If the partition_count was invalid
  • INVALID_REQUEST (42) If duplicate topics appeared in the request.
  • NONE (0) The topic partition count was changed successfully.
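The request grammar uses Kafka's standard protocol primitives: an array is an INT32 element count followed by the elements, a STRING is an INT16 byte length followed by UTF-8 bytes, and INT32 is four big-endian bytes. The sketch below encodes only the request body (no request header) and ignores the open validate_only/timeout TODO; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class AlterPartitionCountsEncoder {
    // Encodes [topic_partition_count]: an INT32 element count, then for each element
    // topic as STRING (INT16 length + UTF-8 bytes) and partition_count as INT32.
    public static byte[] encode(Map<String, Integer> partitionCounts) {
        int size = 4; // INT32 array length
        for (String topic : partitionCounts.keySet()) {
            int len = topic.getBytes(StandardCharsets.UTF_8).length;
            if (len > Short.MAX_VALUE) {
                throw new IllegalArgumentException("topic name too long for an INT16 length: " + topic);
            }
            size += 2 + len + 4;
        }
        ByteBuffer buf = ByteBuffer.allocate(size); // ByteBuffer is big-endian by default
        buf.putInt(partitionCounts.size());
        for (Map.Entry<String, Integer> e : partitionCounts.entrySet()) {
            byte[] topic = e.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) topic.length);
            buf.put(topic);
            buf.putInt(e.getValue());
        }
        return buf.array();
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("orders", 12);
        System.out.println(encode(counts).length); // prints 16 (4 + 2 + 6 + 4)
    }
}
```

Taking a Map on the client side means duplicate topics, which the broker would reject with INVALID_REQUEST (42), cannot be sent in the first place.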

Network Protocol: AlterReplicationFactorsRequest and AlterReplicationFactorsResponse

The AlterReplicationFactorsRequest is used to change the replication factor for a batch of topics, and is the basis for the AdminClient.alterReplicationFactors() method.

No Format
AlterReplicationFactorsRequest => [topic_replication_factor]
  topic_replication_factor => topic replication_factor
    topic => STRING
    replication_factor => INT16
  // TODO: validate_only and/or timeout flags?

...

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • INVALID_REPLICATION_FACTOR (38) If the replication_factor was invalid
  • INVALID_REQUEST (42) If duplicate topics appeared in the request.
  • NONE (0) The topic replication factor was changed successfully.
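One way a broker might map each request element onto the error codes listed above is sketched below. It is a simplified, hypothetical check: it assumes a valid replication factor lies between 1 and the number of live brokers, omits authorization, and the order of the checks as well as the class, method and parameter names are illustrative assumptions.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ReplicationFactorValidator {
    static final short NONE = 0;
    static final short INVALID_TOPIC_EXCEPTION = 17;
    static final short INVALID_REPLICATION_FACTOR = 38;
    static final short INVALID_REQUEST = 42;

    // Hypothetical helper: builds one topic_replication_factor element.
    public static Map.Entry<String, Short> element(String topic, int replicationFactor) {
        return new AbstractMap.SimpleEntry<>(topic, (short) replicationFactor);
    }

    // The request is a List (not a Map) so that duplicate topics can be detected.
    public static Map<String, Short> validate(List<Map.Entry<String, Short>> request,
                                              Set<String> existingTopics, int liveBrokers) {
        Map<String, Integer> occurrences = new HashMap<>();
        for (Map.Entry<String, Short> e : request) {
            occurrences.merge(e.getKey(), 1, Integer::sum);
        }
        Map<String, Short> errors = new LinkedHashMap<>();
        for (Map.Entry<String, Short> e : request) {
            String topic = e.getKey();
            short rf = e.getValue();
            short code;
            if (occurrences.get(topic) > 1) {
                code = INVALID_REQUEST;
            } else if (!existingTopics.contains(topic)) {
                code = INVALID_TOPIC_EXCEPTION;
            } else if (rf < 1 || rf > liveBrokers) {
                code = INVALID_REPLICATION_FACTOR; // assumed bound: 1..liveBrokers
            } else {
                code = NONE;
            }
            errors.put(topic, code);
        }
        return errors;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Short>> request = Arrays.asList(
                element("orders", 3), element("payments", 5), element("missing", 2));
        Set<String> existing = new HashSet<>(Arrays.asList("orders", "payments"));
        System.out.println(validate(request, existing, 3));
        // prints {orders=0, payments=38, missing=17}
    }
}
```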

Network Protocol: ReassignPartitionsRequest and ReassignPartitionsResponse

ReassignPartitionsRequest initiates the movement of replicas between brokers, and is the basis of the AdminClient.reassignPartitions() method.

No Format
ReassignPartitionsRequest => [reassigned_topic] validate_only
  reassigned_topic => topic [reassigned_partition]
    topic => STRING
    reassigned_partition => partition_id [broker]
      partition_id => INT32
      broker => INT32
  validate_only => BOOLEAN

...

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • UNKNOWN_MEMBER_ID (25) If the partition_assignment included an unknown broker id.
  • INVALID_REQUEST (42) If duplicate topics appeared in the request.
  • PARTITION_REASSIGNMENT_IN_PROGRESS (new) If a partition reassignment is already in progress.
  • INVALID_REPLICA_ASSIGNMENT (39) If a partition, replica or broker id in the partition_assignment doesn't exist or is incompatible with the requested num_partitions and/or replication_factor. The error_message would contain further information.
  • NONE (0) The reassignment has started.
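For a single partition, what a ReassignPartitionsRequest asks of the cluster can be read off by comparing the current replica list with the requested one: brokers only in the target list must receive a new copy of the partition, and brokers only in the current list will eventually drop theirs. The sketch below computes that difference; the class and method names are hypothetical, and it says nothing about how the controller sequences the actual moves.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class ReassignmentPlan {
    // Brokers in the target list but not the current one: they must copy the partition.
    public static Set<Integer> added(List<Integer> current, List<Integer> target) {
        Set<Integer> result = new TreeSet<>(target);
        result.removeAll(current);
        return result;
    }

    // Brokers in the current list but not the target one: their replicas will be removed.
    public static Set<Integer> removed(List<Integer> current, List<Integer> target) {
        Set<Integer> result = new TreeSet<>(current);
        result.removeAll(target);
        return result;
    }

    public static void main(String[] args) {
        List<Integer> current = Arrays.asList(1, 2, 3);
        List<Integer> target = Arrays.asList(2, 3, 4);
        System.out.println("add " + added(current, target) + ", remove " + removed(current, target));
        // prints "add [4], remove [1]"
    }
}
```

The size of the added set is a rough indicator of how much data the reassignment will copy, which is why progress reporting matters for this operation.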

Network Protocol: ReplicaStatusRequest and ReplicaStatusResponse

ReplicaStatusRequest requests information about the progress of a number of replicas. It is not tied to a prior AlterReplicationFactorsRequest or ReassignPartitionsRequest, but is intended as a way of monitoring the progress and completion of those operations.

...

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed. (or the TOPIC?)
  • INVALID_TOPIC_EXCEPTION (17) The topic is not known
  • INVALID_PARTITIONS (37) The partition_id of the given topic is not valid.
  • UNKNOWN_MEMBER_ID (25) The given broker id is not known.
  • UNKNOWN_TOPIC_OR_PARTITION (3) The given broker is not a follower for the partition identified by topic, partition.
  • NONE (0) The status request completed normally.

Implementation

AdminClient.replicaStatus() will send the underlying ReplicaStatusRequest to the leader of the given partition. This avoids the need for every broker (since any broker could be the --bootstrap-server) to know the replication status of every replica, which would be inefficient in network I/O and/or memory use.
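Since each request goes to a partition's leader, a client querying many partitions has to split them into one batch per leader broker. The sketch below shows that grouping step under stated assumptions: a plain map stands in for the client's cluster metadata, partitions are "topic-partition" strings rather than TopicPartition objects, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LeaderGrouping {
    // Groups the requested partitions by the broker id of their current leader,
    // giving one batch (one ReplicaStatusRequest) per leader.
    public static Map<Integer, List<String>> groupByLeader(List<String> partitions,
                                                           Map<String, Integer> leaders) {
        Map<Integer, List<String>> batches = new TreeMap<>();
        for (String tp : partitions) {
            Integer leader = leaders.get(tp);
            if (leader == null) {
                // A fuller client might refresh its metadata and retry; this sketch just fails.
                throw new IllegalArgumentException("no known leader for " + tp);
            }
            batches.computeIfAbsent(leader, k -> new ArrayList<>()).add(tp);
        }
        return batches;
    }

    public static void main(String[] args) {
        Map<String, Integer> leaders = new HashMap<>();
        leaders.put("orders-0", 1);
        leaders.put("orders-1", 2);
        leaders.put("payments-0", 1);
        System.out.println(groupByLeader(Arrays.asList("orders-0", "orders-1", "payments-0"), leaders));
        // prints {1=[orders-0, payments-0], 2=[orders-1]}
    }
}
```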

Compatibility, Deprecation, and Migration Plan

Existing users of kafka-reassign-partitions.sh will receive a deprecation warning when they use the --zookeeper option. The option will be removed in a future version of Kafka: if this KIP is introduced in version 1.0.0, the removal could happen in 2.0.0.

Rejected Alternatives

...