Status
Current state: Under Discussion
Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]
JIRA: here [Change the link from KAFKA-1 to your own ticket]
Motivation
When Kafka is used as messaging infrastructure of consumer business, we often need to adjust the capacity of certain topics to accommodate the bursty nature of the data. The adjustment involves both adding more partitions hence the throughput, and desirably reducing partitions so that they can be reclaimed. Operating clusters with ever growing partitions adds overhead to the operation. First, the performance degrades when a single disk needs to support large number partitions; and second larger cluster footprints makes it more vulnerable to disruption at infrastructure level such as machine or rack decommission (which is not uncommon in large enterprise). This motivates us to add an easy and transparent way to reduce partitions for topics, which is particularly convenient in the following situation.
The cluster has a large number of topics and total number of partitions is close to the limit.
Most of the dynamic topics do not contain keyed messages. (See limitation section)
The data retention period is relatively short.
We have implemented the described changes and deployed it in various setups.
Public Interfaces
Protocol API changes
A new API type DeletePartitions will be added: DeletePartitionsRequest and DeletePartitionsResponse
DeletePartitionsRequest Request (Version: 0) => timeout_ms immediate_delete [topics] TAG_BUFFER timeout_ms => INT32 immediate_delete => BOOLEAN topics => name [partitions] TAG_BUFFER name => COMPACT_STRING partitions => INT32 TAG_BUFFER
DeletePartitionsResponse Response (Version: 0) => throttle_time_ms error_code error_message [responses] TAG_BUFFER throttle_time_ms => INT32 error_code => INT16 error_message => COMPACT_NULLABLE_STRING responses => name [partitions] TAG_BUFFER name => COMPACT_STRING partitions => partition_index status_code error_code error_message TAG_BUFFER partition_index => INT32 status_code => INT8 error_code => INT16 error_message => COMPACT_NULLABLE_STRING
MetaData API changes
- Add status_code field in Metadata API that represents the partitions of current topic
Metadata Request (Version: 10) => [topics] allow_auto_topic_creation include_cluster_authorized_operations include_topic_authorized_operations TAG_BUFFER topics => name TAG_BUFFER name => COMPACT_STRING allow_auto_topic_creation => BOOLEAN include_cluster_authorized_operations => BOOLEAN include_topic_authorized_operations => BOOLEAN Metadata Response (Version: 10) => throttle_time_ms [brokers] cluster_id controller_id [topics] cluster_authorized_operations TAG_BUFFER throttle_time_ms => INT32 brokers => node_id host port rack TAG_BUFFER node_id => INT32 host => COMPACT_STRING port => INT32 rack => COMPACT_NULLABLE_STRING cluster_id => COMPACT_NULLABLE_STRING controller_id => INT32 topics => error_code name is_internal [partitions] topic_authorized_operations TAG_BUFFER error_code => INT16 name => COMPACT_STRING is_internal => BOOLEAN partitions => error_code partition_index leader_id leader_epoch [replica_nodes] [isr_nodes] [offline_replicas] TAG_BUFFER error_code => INT16 partition_index => INT32 leader_id => INT32 leader_epoch => INT32 replica_nodes => INT32 isr_nodes => INT32 offline_replicas => INT32 status_code => INT8 topic_authorized_operations => INT32 cluster_authorized_operations => INT32
Client API changes
Add a new partition status field in org.apache.kafka.common.PartitionInfo
In org.apache.kafka.common.Cluster, exclude offline partitions in the availablePartitionsForTopic
public class PartitionInfo { private final String topic; private final int partition; private final Node leader; private final Node[] replicas; private final Node[] inSyncReplicas; private final Node[] offlineReplicas; private int status; }
Admin Client API changes
- The AdminClient API will have new methods
deletePartitions(Map<String, DeletePartitions> partitions) @InterfaceStability.Evolving public class DeletePartitions { private int totalCount; private boolean immediateDelete; }
Kafka Topic Command changes
- Support using --partitions options to specify a smaller number than current partitions
- Add --deleteNow option to specify that data should be deleted right away. The default behavior is to delete after retention period passes.
./kafka-topics.sh --alter --topic ... --partitions ... --immediateDeletePartition ...
- The command will issue warning when a topic contains keyed message
"WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected"
Proposed Changes
We propose to add new status for a partition that indicates if a partition is to be removed. Specifically at any time a partition can be
online (code=0): it means the partition is normal
offline (code=1): it means the partition should be excluded by SDK client for both message production and consumption. When the data is purged this partition will be removed from the topics metadata.
readOnly (code=-1): it means the partition should be filtered and not written to, but consumption is not impacted. After the retention period has passed the status will transit to offline.
The status is stored in zookeeper:
{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0], "status_code": 0}
The following change in Controller will be made:
In zookeeper a path /admin/delete_topic_partitions will be added. The format of the node is ()
In the controller monitoring path, register DeleteTopicPartitionHandler to monitor the zookeeper event; add a controller event TopicPartitionDeletion
Add a class TopicPartitionDeletionManager to handle TopicPartitionDeletion event
The workflow involving TopicPartitionDeletionManager class is summarized as below:
TopicCommand class executes the delete command and creates a node in zookeeper directory /admin/delete_topic_partitions with the node topic name and the list of partitions
TopicPartitionDeletion class monitors change in the above directory, initiates the deletion process and adds it to the queue for status polling
TopicPartitionDeletionManager.onPartitionDeletion update the status of the partition to readOnly or offline according to the deleteNow flag; then notify all brokers through PartitionStateMachine
TopicPartitionDeletionManager checks the status for each partition and call onPartitionDeletion for all the offline partitions. Notify all the brokers through PartitionStateMachine
ReplicaStateMachine executes OfflineReplica and ReplicaDeletionStarted to stop synchronize data at all brokers
When Controller gets the successful stopReplica response from Broker, ReplicaStateMachine executes ReplicaDeletionSuccessful to update partition information. Then it cleans up zookeeper path as well. On the other hand if stopReplica fails, it will call ReplicaDeletionIneligible and wait for the Controller to retry
Compatibility, Deprecation, and Migration Plan
When will we remove the existing behavior?When a topic contains keyed messages, partition removal cannot be completely transparent to the client, as during the process (i.e. when the old partition is in readOnly state) there will be two partitions that contain messages of the same key. We The application needs to handle this temporary discrepancy until the old partition is offline. In the change we propose to issue a warning message in the kafka-topics command.