Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Motivation

When Kafka is used as the messaging infrastructure of a consumer business, we often need to adjust the capacity of certain topics to accommodate the bursty nature of the data. The adjustment involves both adding partitions to increase throughput and, ideally, removing partitions so that their resources can be reclaimed. Operating clusters with an ever-growing number of partitions adds operational overhead. First, performance degrades when a single disk has to support a large number of partitions. Second, a larger cluster footprint is more vulnerable to infrastructure-level disruptions such as machine or rack decommissioning, which is not uncommon in large enterprises. This motivates us to add an easy and transparent way to reduce the number of partitions of a topic, which is particularly convenient in the following situation.


We have implemented the described changes and deployed them in various setups.

Public Interfaces

Configuration Changes

Two configuration fields are added

Metadata Changes

We propose to modify PartitionRecord and add a DeleteTopicPartitionRecord in __cluster_metadata

PartitionRecord

We propose to add a new field, StatusMode, to PartitionRecord that indicates whether a partition is to be removed. The field holds the status code of the partition at any given time.


{
  "apiKey": 3,
  "type": "metadata",
  "name": "PartitionRecord",
  "validVersions": "0",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Replicas", "type":  "[]int32", "versions":  "0+",
      "about": "The replicas of this partition, sorted by preferred order." },
    { "name": "Isr", "type":  "[]int32", "versions":  "0+",
      "about": "The in-sync replicas of this partition" },
    { "name": "RemovingReplicas", "type":  "[]int32", "versions":  "0+", "nullableVersions": "0+",
      "about": "The replicas that we are in the process of removing." },
    { "name": "AddingReplicas", "type":  "[]int32", "versions":  "0+", "nullableVersions": "0+",
      "about": "The replicas that we are in the process of adding." },
    { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The lead replica, or -1 if there is no leader." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "An epoch that gets incremented each time we change the ISR." },
    { "name": "StatusMode", "type": "int32", "versions": "0+", "default": "0",
      "about": "The status code of current partition." }
  ]
}

DeleteTopicPartitionRecord


{
  "apiKey": xxx,
  "type": "metadata",
  "name": "DeleteTopicPartitionsRecord",
  "validVersions": "0",
  "fields": [
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The topic partition to remove." },
    { "name": "DeletePartitions", "type": "int32", "versions": "0+",
      "about": "The partition to remove. All associated partitions will be removed as well." },
    { "name": "DeletePartitionsDelayTimestamp", "type": "bool", "versions": "0+",
      "about": "Delay timestamp deletion of data." },
    { "name": "CreateTimestamp", "type": "int64", "versions": "0+",
      "about": "The record create timestamp." }
  ]
}


AdminClient API changes

A new method will be added to the AdminClient API:

deletePartitions(Map<String, DeletePartitions> partitions)

@InterfaceStability.Evolving
public class DeletePartitions {
    private int totalCount;
    private boolean immediateDelete;
}
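
A caller would presumably use the new method much like the existing createPartitions call with NewPartitions.increaseTo. The following is a minimal, self-contained sketch rather than the proposed implementation. Only the two fields and the Map<String, DeletePartitions> argument shape come from this proposal; the constructor, the decreaseTo factory, the accessors, the reading of totalCount as the target partition count, and the topic name "orders" are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class DeletePartitionsExample {
    // Builds the argument that would be passed to deletePartitions(...).
    static Map<String, DeletePartitions> buildRequest() {
        Map<String, DeletePartitions> partitions = new HashMap<>();
        // Hypothetical topic: shrink "orders" to 3 partitions, keeping the delayed deletion.
        partitions.put("orders", DeletePartitions.decreaseTo(3, false));
        return partitions;
    }

    public static void main(String[] args) {
        buildRequest().forEach((topic, spec) ->
            System.out.println(topic + " -> totalCount=" + spec.totalCount()
                + ", immediateDelete=" + spec.immediateDelete()));
    }
}

// Stand-in for the proposed class; only the two fields come from the proposal.
class DeletePartitions {
    private final int totalCount;          // assumed: partition count after the reduction
    private final boolean immediateDelete; // assumed: skip the delayed-deletion window

    private DeletePartitions(int totalCount, boolean immediateDelete) {
        if (totalCount < 1) {
            throw new IllegalArgumentException("totalCount must be at least 1");
        }
        this.totalCount = totalCount;
        this.immediateDelete = immediateDelete;
    }

    // Hypothetical factory, named by analogy with NewPartitions.increaseTo.
    static DeletePartitions decreaseTo(int totalCount, boolean immediateDelete) {
        return new DeletePartitions(totalCount, immediateDelete);
    }

    int totalCount() {
        return totalCount;
    }

    boolean immediateDelete() {
        return immediateDelete;
    }
}
```

Keying the map by topic name mirrors createPartitions, so one call can shrink several topics at once.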

Protocol RPC changes

A new DeletePartitions API will be added, with the following request and response schemas:

DeletePartitions Request (Version: 0) => timeout_ms immediate_delete [topics] TAG_BUFFER 
  timeout_ms => INT32
  delete_partitions_delay => INT64
  topics => name [partitions] TAG_BUFFER 
    name => COMPACT_STRING
    partitions => INT32 TAG_BUFFER


DeletePartitions Response (Version: 0) => throttle_time_ms error_code error_message [responses] TAG_BUFFER 
  throttle_time_ms => INT32
  error_code => INT16
  error_message => COMPACT_NULLABLE_STRING
  responses => name [partitions] TAG_BUFFER 
    name => COMPACT_STRING
    partitions => partition_index status_code error_code error_message TAG_BUFFER 
      partition_index => INT32
      status_code => INT8
      error_code => INT16
      error_message => COMPACT_NULLABLE_STRING


In the UpdateMetadata API, we add a status_mode field that carries the read/write status of each partition of the topic.


UpdateMetadata Request (Version: 7) => controller_id controller_epoch broker_epoch [topic_states] [live_brokers] TAG_BUFFER 
  controller_id => INT32
  controller_epoch => INT32
  broker_epoch => INT64
  topic_states => topic_name [partition_states] TAG_BUFFER 
    topic_name => COMPACT_STRING
    partition_states => partition_index controller_epoch leader leader_epoch [isr] zk_version [replicas] [offline_replicas] status_mode TAG_BUFFER 
      partition_index => INT32
      controller_epoch => INT32
      leader => INT32
      leader_epoch => INT32
      isr => INT32
      zk_version => INT32
      replicas => INT32
      offline_replicas => INT32
      status_mode => INT8
  live_brokers => id [endpoints] rack TAG_BUFFER 
    id => INT32
    endpoints => port host listener security_protocol TAG_BUFFER 
      port => INT32
      host => COMPACT_STRING
      listener => COMPACT_STRING
      security_protocol => INT16
    rack => COMPACT_NULLABLE_STRING


UpdateMetadata Response (Version: 7) => error_code TAG_BUFFER 
  error_code => INT16


Metadata API changes

Metadata Request (Version: 10) => [topics] allow_auto_topic_creation include_cluster_authorized_operations include_topic_authorized_operations TAG_BUFFER 
  topics => name TAG_BUFFER 
    name => COMPACT_STRING
  allow_auto_topic_creation => BOOLEAN
  include_cluster_authorized_operations => BOOLEAN
  include_topic_authorized_operations => BOOLEAN

Metadata Response (Version: 10) => throttle_time_ms [brokers] cluster_id controller_id [topics] cluster_authorized_operations TAG_BUFFER 
  throttle_time_ms => INT32
  brokers => node_id host port rack TAG_BUFFER 
    node_id => INT32
    host => COMPACT_STRING
    port => INT32
    rack => COMPACT_NULLABLE_STRING
  cluster_id => COMPACT_NULLABLE_STRING
  controller_id => INT32
  topics => error_code name is_internal [partitions] topic_authorized_operations TAG_BUFFER 
    error_code => INT16
    name => COMPACT_STRING
    is_internal => BOOLEAN
    partitions => error_code partition_index leader_id leader_epoch [replica_nodes] [isr_nodes] [offline_replicas] status_code TAG_BUFFER 
      error_code => INT16
      partition_index => INT32
      leader_id => INT32
      leader_epoch => INT32
      replica_nodes => INT32
      isr_nodes => INT32
      offline_replicas => INT32
      status_code => INT8
    topic_authorized_operations => INT32
  cluster_authorized_operations => INT32


Client API changes


public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
    private final Node[] offlineReplicas;
    private int status;
}
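
With the status exposed on PartitionInfo, a client can decide which partitions are still eligible for writes. The sketch below is only an illustration under stated assumptions: the proposal fixes 0 as the default status code, but the value used here for the read-only status, the trimmed PartitionStatus stand-in class, and the writablePartitions helper are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class WritablePartitionsExample {
    // 0 is the default status in the proposal; treating it as "read-write" is an assumption.
    static final int STATUS_READ_WRITE = 0;
    // Hypothetical value for a partition that is being removed and only serves reads.
    static final int STATUS_READ_ONLY = 1;

    // Trimmed stand-in for PartitionInfo carrying only what this sketch needs.
    static final class PartitionStatus {
        final int partition;
        final int status;

        PartitionStatus(int partition, int status) {
            this.partition = partition;
            this.status = status;
        }
    }

    // Returns the ids of the partitions whose status still allows writes.
    static List<Integer> writablePartitions(List<PartitionStatus> partitions) {
        List<Integer> writable = new ArrayList<>();
        for (PartitionStatus p : partitions) {
            if (p.status == STATUS_READ_WRITE) {
                writable.add(p.partition);
            }
        }
        return writable;
    }

    public static void main(String[] args) {
        List<PartitionStatus> partitions = Arrays.asList(
            new PartitionStatus(0, STATUS_READ_WRITE),
            new PartitionStatus(1, STATUS_READ_WRITE),
            new PartitionStatus(2, STATUS_READ_ONLY));
        System.out.println("writable partitions: " + writablePartitions(partitions));
    }
}
```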

Kafka Topic Command changes

./kafka-topics.sh --alter --topic ... --partitions ... --delete-partitions-delay ...

Proposed Changes

We propose to add a new status to each partition that indicates whether the partition is to be removed. Specifically, at any time a partition can be 

The status is stored in ZooKeeper:

{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0], "status_code": 0}


The following changes will be made in the Controller:

The workflow involving the TopicPartitionDeletionManager class is summarized below:

Compatibility, Deprecation, and Migration Plan

When a topic contains keyed messages, partition removal cannot be completely transparent to the client: during the process (i.e. while the old partition is in the readOnly state), two partitions will contain messages with the same key. The application needs to handle this temporary discrepancy until the old partition goes offline. As part of this change, we propose that the kafka-topics command issue a warning message.
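
To see why a key can end up in two partitions, consider a producer that picks a partition as the key hash modulo the partition count. The sketch below assumes that scheme and uses String.hashCode purely as a dependency-free stand-in (Kafka's default partitioner hashes the serialized key with murmur2 instead); the class name, the partitionFor helper, and the sample keys are hypothetical.

```java
class KeyedPartitionSketch {
    // Simplified stand-in for a key-hash partitioner: hash modulo partition count.
    // floorMod keeps the result non-negative even for negative hash codes.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "c", "d"};
        int before = 4; // partition count before the reduction
        int after = 3;  // partition count after the reduction
        for (String key : keys) {
            int oldPartition = partitionFor(key, before);
            int newPartition = partitionFor(key, after);
            String note = oldPartition != newPartition ? " (moved)" : "";
            System.out.println(key + ": " + oldPartition + " -> " + newPartition + note);
        }
    }
}
```

Under this scheme, a key that moves has its earlier messages in the old partition and its new messages in another one, which is the temporary discrepancy the application has to tolerate.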

Rejected Alternatives