...

We propose adding a new field, 'mode', to a partition to indicate whether the partition is to be removed. Specifically, at any time a partition can be

...

Code Block
{
  "apiKey": 3,
  "type": "metadata",
  "name": "PartitionRecord",
  "validVersions": "0",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Replicas", "type":  "[]int32", "versions":  "0+",
      "about": "The replicas of this partition, sorted by preferred order." },
    { "name": "Isr", "type":  "[]int32", "versions":  "0+",
      "about": "The in-sync replicas of this partition" },
    { "name": "RemovingReplicas", "type":  "[]int32", "versions":  "0+", "nullableVersions": "0+",
      "about": "The replicas that we are in the process of removing." },
    { "name": "AddingReplicas", "type":  "[]int32", "versions":  "0+", "nullableVersions": "0+",
      "about": "The replicas that we are in the process of adding." },
    { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The lead replica, or -1 if there is no leader." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "An epoch that gets incremented each time we change the ISR." },
    { "name": "Mode", "type": "int32", "versions": "0+", "default": "0",
      "about": "The read/write mode of the current partition." }
  ]
}

...

Code Block
{
  "apiKey": xxx,
  "type": "metadata",
  "name": "DeleteTopicPartitionsRecord",
  "validVersions": "0",
  "fields": [
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The topic partition to remove." },
    { "name": "DeletePartitions", "type": "int32", "versions": "0+",
      "about": "The partition to remove. All associated partitions will be removed as well." },
    { "name": "DeletePartitionsDelayTimestamp", "type": "bool", "versions": "0+",
      "about": "Delay timestamp deletion of data." },
    { "name": "CreateTimestamp", "type": "int64", "versions": "0+",
      "about": "The record create timestamp." }
  ]
}

...

Code Block
UpdateMetadata Response (Version: 7) => error_code TAG_BUFFER 
  error_code => INT16

Metadata API

Add a 'mode' field to the Metadata API that represents the read/write mode of each partition of the current topic, and increment the version of the API accordingly.

Code Block
Metadata Request (Version: 10) => [topics] allow_auto_topic_creation include_cluster_authorized_operations include_topic_authorized_operations TAG_BUFFER 
  topics => name TAG_BUFFER 
    name => COMPACT_STRING
  allow_auto_topic_creation => BOOLEAN
  include_cluster_authorized_operations => BOOLEAN
  include_topic_authorized_operations => BOOLEAN

Metadata Response (Version: 10) => throttle_time_ms [brokers] cluster_id controller_id [topics] cluster_authorized_operations TAG_BUFFER 
  throttle_time_ms => INT32
  brokers => node_id host port rack TAG_BUFFER 
    node_id => INT32
    host => COMPACT_STRING
    port => INT32
    rack => COMPACT_NULLABLE_STRING
  cluster_id => COMPACT_NULLABLE_STRING
  controller_id => INT32
  topics => error_code name is_internal [partitions] topic_authorized_operations TAG_BUFFER 
    error_code => INT16
    name => COMPACT_STRING
    is_internal => BOOLEAN
    partitions => error_code partition_index leader_id leader_epoch [replica_nodes] [isr_nodes] [offline_replicas] mode TAG_BUFFER 
      error_code => INT16
      partition_index => INT32
      leader_id => INT32
      leader_epoch => INT32
      replica_nodes => INT32
      isr_nodes => INT32
      offline_replicas => INT32
      mode => INT8
    topic_authorized_operations => INT32
  cluster_authorized_operations => INT32

Client API changes

  • Add a new partition status field 'mode' to org.apache.kafka.common.PartitionInfo.

  • Both KafkaProducer and KafkaConsumer are aware of the state of each partition and filter accordingly.
    • Partitions with ReadOnly or None status are filtered out for writes.
    • Partitions with None status are filtered out for reads.
Code Block
public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
    private final Node[] offlineReplicas;
    private int mode;
}
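
The filtering rules above can be sketched as follows. This is only a minimal illustration, not the proposed client code: `PartitionModeFilter`, `PartitionView`, and the `Mode` enum constants are hypothetical names, and the proposal does not specify how the three statuses (ReadWrite, ReadOnly, None) are encoded in the integer field.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; none of these names come from the proposal.
final class PartitionModeFilter {

    /** Assumed enum for the three statuses named in the proposal. */
    enum Mode { READ_WRITE, READ_ONLY, NONE }

    /** Minimal stand-in for PartitionInfo, carrying only what the filter needs. */
    static final class PartitionView {
        final int partition;
        final Mode mode;

        PartitionView(int partition, Mode mode) {
            this.partition = partition;
            this.mode = mode;
        }
    }

    /** Producer side: ReadOnly and None partitions are filtered out for writes. */
    static List<Integer> writable(List<PartitionView> partitions) {
        List<Integer> result = new ArrayList<>();
        for (PartitionView p : partitions) {
            if (p.mode == Mode.READ_WRITE) {
                result.add(p.partition);
            }
        }
        return result;
    }

    /** Consumer side: only None partitions are filtered out for reads. */
    static List<Integer> readable(List<PartitionView> partitions) {
        List<Integer> result = new ArrayList<>();
        for (PartitionView p : partitions) {
            if (p.mode != Mode.NONE) {
                result.add(p.partition);
            }
        }
        return result;
    }
}
```

With partitions 0 (ReadWrite), 1 (ReadOnly), and 2 (None), `writable` keeps only partition 0, while `readable` keeps partitions 0 and 1.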

Kafka Topic Command changes

...

  • Add a controller event, TopicPartitionDeletion.
  • Add a class, TopicPartitionDeletionManager, to handle the TopicPartitionDeletion event.
  • When KafkaController starts, a scheduleDelayDeletePartitionTask is scheduled periodically to check retention for delayed deletion.

The workflow involving the TopicPartitionDeletionManager class is summarized below:

  • TopicCommand sends the DeletePartition RPC to KafkaController, and a DeleteTopicPartitionsRecord is saved in the KafkaController metadata.
  • TopicPartitionDeletionManager executes the onPartitionDeletion method and updates the mode of the partition to ReadOnly. The partition remains in the OnlinePartition state. All brokers are notified through PartitionStateMachine.
  • After the specified delay period, ScheduleDelayDeletePartitionTask updates the partition mode to None. The partition state changes to OfflinePartition and then NonExistentPartition, and the brokers are notified through PartitionStateMachine. The partition replica state changes to OfflineReplica and then ReplicaDeletionStarted; through ReplicaStateMachine, all brokers stop synchronizing data and clear the partition's data.
  • When the controller receives a successful StopReplica response from a broker, the partition replica state changes to ReplicaDeletionSuccessful and the metadata is cleaned up as well. Otherwise, the partition replica state changes to ReplicaDeletionIneligible and waits for KafkaController to retry.
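
The mode transitions in this workflow (ReadWrite to ReadOnly on the delete request, then ReadOnly to None once the delay has elapsed) can be sketched as a small in-memory model. This is a rough sketch under stated assumptions, not the controller implementation: the class `DelayedPartitionDeletionSketch`, its methods, and the `Mode` enum are hypothetical, timestamps are passed in explicitly rather than read from a clock, and the partition and replica state machines and broker notifications are omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the delayed-deletion mode transitions; all names are illustrative.
final class DelayedPartitionDeletionSketch {

    /** Assumed enum for the three statuses named in the proposal. */
    enum Mode { READ_WRITE, READ_ONLY, NONE }

    private final long delayMs;
    private final Map<Integer, Mode> modes = new HashMap<>();
    private final Map<Integer, Long> deleteRequestedAtMs = new HashMap<>();

    DelayedPartitionDeletionSketch(long delayMs) {
        this.delayMs = delayMs;
    }

    /** Registers a partition in its normal ReadWrite mode. */
    void addPartition(int partition) {
        modes.put(partition, Mode.READ_WRITE);
    }

    /** Delete request: the partition becomes ReadOnly and the request time is recorded. */
    void requestDeletion(int partition, long nowMs) {
        if (modes.get(partition) != Mode.READ_WRITE) {
            return; // unknown partition, or deletion already in progress
        }
        modes.put(partition, Mode.READ_ONLY);
        deleteRequestedAtMs.put(partition, nowMs);
    }

    /** Periodic check: ReadOnly partitions whose delay has elapsed move to None. */
    void runScheduledCheck(long nowMs) {
        for (Map.Entry<Integer, Long> entry : deleteRequestedAtMs.entrySet()) {
            int partition = entry.getKey();
            boolean delayElapsed = nowMs - entry.getValue() >= delayMs;
            if (modes.get(partition) == Mode.READ_ONLY && delayElapsed) {
                modes.put(partition, Mode.NONE);
            }
        }
    }

    Mode modeOf(int partition) {
        return modes.get(partition);
    }
}
```

In this model, a partition whose deletion was requested at time 5000 with a 1000 ms delay stays ReadOnly when the check runs at 5500 and moves to None when it runs at 6000 or later.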

Compatibility, Deprecation, and Migration Plan

...

Low-version clients accessing a high-version broker: the Metadata request version for the topic needs to be above the specified version; requests at a lower version will receive LEADER_NOT_AVAILABLE.

High-version clients accessing a low-version broker: in the high-version client's org.apache.kafka.common.PartitionInfo class, the partition status field defaults to ReadWrite.

Upgrading from any older version is possible. To use this feature, set delete.topic.partition.enable=true and make sure the Metadata request version for the topic is above the specified version.
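
The default described for a high-version client talking to a low-version broker could look roughly like the sketch below. The names `ModeCompat`, `resolveMode`, and `firstVersionWithMode` are hypothetical, and the exact version boundary is left as a parameter because the text only refers to "the specified version".

```java
// Hypothetical helper showing the ReadWrite default; not part of the proposal.
final class ModeCompat {

    /** Assumed enum for the three statuses named in the proposal. */
    enum Mode { READ_WRITE, READ_ONLY, NONE }

    /**
     * Returns the mode to expose to the client.
     *
     * @param responseVersion      version of the Metadata response that was received
     * @param firstVersionWithMode assumed first response version that carries the mode field
     * @param decodedMode          mode decoded from the response, or null if absent
     */
    static Mode resolveMode(int responseVersion, int firstVersionWithMode, Mode decodedMode) {
        if (responseVersion < firstVersionWithMode || decodedMode == null) {
            // An older broker does not send the field, so fall back to ReadWrite.
            return Mode.READ_WRITE;
        }
        return decodedMode;
    }
}
```

With this shape, partitions reported by an older broker always look ReadWrite to the client, so existing producers and consumers keep their current behavior.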

Rejected Alternatives