...

We propose to add a new field, StatusMode, for a partition that indicates whether the partition is to be removed. Specifically, at any time a partition can be in one of the following modes:

  • ReadWrite (code=0): the partition can be read from and written to.

  • ReadOnly (code=1): the partition can only be read from.

  • None (code=-1): the partition is filtered out and not written to, but consumption is not impacted.
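The three modes map naturally onto an enum keyed by the wire code. The sketch below is illustrative only: the constant names and the fromCode helper are assumptions; only the codes 0, 1, and -1 come from the proposal.

```java
// Illustrative sketch of the proposed StatusMode values. Constant names
// and the fromCode helper are assumptions; the codes come from the proposal.
enum StatusMode {
    READ_WRITE((byte) 0),  // partition can be read from and written to
    READ_ONLY((byte) 1),   // partition can only be read from
    NONE((byte) -1);       // partition is filtered from writes; reads unaffected

    private final byte code;

    StatusMode(byte code) {
        this.code = code;
    }

    byte code() {
        return code;
    }

    static StatusMode fromCode(byte code) {
        for (StatusMode mode : values()) {
            if (mode.code == code) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unknown StatusMode code: " + code);
    }
}
```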

...

Code Block
UpdateMetadata Response (Version: 7) => error_code TAG_BUFFER 
  error_code => INT16

...

Metadata API

Add a mode field in the Metadata API that represents the status of each partition of the current topic

Code Block
Metadata Request (Version: 10) => [topics] allow_auto_topic_creation include_cluster_authorized_operations include_topic_authorized_operations TAG_BUFFER 
  topics => name TAG_BUFFER 
    name => COMPACT_STRING
  allow_auto_topic_creation => BOOLEAN
  include_cluster_authorized_operations => BOOLEAN
  include_topic_authorized_operations => BOOLEAN

Metadata Response (Version: 10) => throttle_time_ms [brokers] cluster_id controller_id [topics] cluster_authorized_operations TAG_BUFFER 
  throttle_time_ms => INT32
  brokers => node_id host port rack TAG_BUFFER 
    node_id => INT32
    host => COMPACT_STRING
    port => INT32
    rack => COMPACT_NULLABLE_STRING
  cluster_id => COMPACT_NULLABLE_STRING
  controller_id => INT32
  topics => error_code name is_internal [partitions] topic_authorized_operations TAG_BUFFER 
    error_code => INT16
    name => COMPACT_STRING
    is_internal => BOOLEAN
    partitions => error_code partition_index leader_id leader_epoch [replica_nodes] [isr_nodes] [offline_replicas] mode TAG_BUFFER 
      error_code => INT16
      partition_index => INT32
      leader_id => INT32
      leader_epoch => INT32
      replica_nodes => INT32
      isr_nodes => INT32
      offline_replicas => INT32
      mode => INT8
    topic_authorized_operations => INT32
  cluster_authorized_operations => INT32
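A client that receives the new per-partition mode byte in the Metadata response could filter out partitions that no longer accept writes. The following sketch is purely illustrative; PartitionMeta and WritablePartitions are hypothetical names, not part of the actual Kafka client API.

```java
import java.util.List;
import java.util.stream.Collectors;

// Minimal illustrative model of per-partition metadata carrying the new
// mode byte; these are NOT the real Kafka client classes.
record PartitionMeta(int partitionIndex, byte mode) {}

class WritablePartitions {
    // Only mode == 0 (ReadWrite) partitions accept new writes; ReadOnly (1)
    // and None (-1) partitions are skipped by the producer side.
    static List<Integer> select(List<PartitionMeta> partitions) {
        return partitions.stream()
                .filter(p -> p.mode() == 0)
                .map(PartitionMeta::partitionIndex)
                .collect(Collectors.toList());
    }
}
```

For example, given partitions 0 (ReadWrite), 1 (ReadOnly), and 2 (None), only partition 0 would be selected for writes.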

...

  • Added a controller event TopicPartitionDeletion
  • Added a class TopicPartitionDeletionManager to handle the TopicPartitionDeletion event
  • When KafkaController starts, a ScheduleDelayDeletePartitionTask is scheduled to run periodically and check whether delayed partition deletions have passed their retention period.
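The periodic retention check could be backed by a simple in-memory tracker like the sketch below; the class name, method names, and data structure are assumptions for illustration, not actual controller code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the delayed-deletion check: partitions marked for
// deletion are tracked with the time they became ReadOnly, and a periodic
// task collects those whose retention delay has elapsed so they can be
// moved to mode None. All names here are assumptions.
class DelayedDeletionTracker {
    private final long delayMs;
    private final Map<String, Long> readOnlySinceMs = new ConcurrentHashMap<>();

    DelayedDeletionTracker(long delayMs) {
        this.delayMs = delayMs;
    }

    // Record the moment a partition entered ReadOnly mode.
    void markReadOnly(String topicPartition, long nowMs) {
        readOnlySinceMs.put(topicPartition, nowMs);
    }

    // Return (and forget) partitions whose delay has expired; these would
    // be updated to mode None and handed to the deletion machinery.
    List<String> expired(long nowMs) {
        List<String> out = new ArrayList<>();
        readOnlySinceMs.forEach((tp, since) -> {
            if (nowMs - since >= delayMs) out.add(tp);
        });
        out.forEach(readOnlySinceMs::remove);
        return out;
    }
}
```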

The workflow involving the TopicPartitionDeletionManager class is summarized below:

  • The TopicCommand class executes the DeletePartition RPC and creates a node in the ZooKeeper directory /admin/delete_topic_partitions containing the topic name and the list of partitions 
  • TopicPartitionDeletion monitors changes in the above directory, initiates the deletion process, and adds it to the queue for status polling

  • TopicPartitionDeletionManager.onPartitionDeletion updates the status of the partition to ReadOnly or Offline according to the deleteNow flag, then notifies all brokers through the PartitionStateMachine

  • TopicPartitionDeletionManager checks the status of each partition, calls onPartitionDeletion for all the offline partitions, and notifies all the brokers through the PartitionStateMachine

  • The DeletePartition request is sent to the KafkaController, which saves a DeleteTopicPartitionsRecord in its metadata.
  • TopicPartitionDeletionManager executes the onPartitionDeletion method and updates the mode of the partition to ReadOnly. The partition remains in the OnlinePartition state, and all brokers are notified through the PartitionStateMachine.
  • ScheduleDelayDeletePartitionTask updates the partition mode to None after the specified delay period. The partition state changes to OfflinePartition and then NonExistentPartition, and the brokers are notified through the PartitionStateMachine. The partition replica state changes to OfflineReplica and then ReplicaDeletionStarted, which stops data synchronization and clears data on all brokers through the ReplicaStateMachine.
  • When the Controller gets a successful StopReplica response from a broker, ReplicaStateMachine executes ReplicaDeletionSuccessful: the partition replica state changes to ReplicaDeletionSuccessful, and the Controller then cleans up the ZooKeeper path and the metadata as well.
  • On the other hand, if StopReplica fails, the replica state changes to ReplicaDeletionIneligible and waits for the Controller to retry
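The workflow above amounts to a one-way progression of the partition mode. The minimal sketch below models the transitions as pure functions; all names, and the exact effect of the deleteNow flag, are assumptions based on the steps described, not actual controller code.

```java
// Illustrative one-way mode progression taken from the workflow above:
// ReadWrite (0) -> ReadOnly (1) when deletion is requested, then
// ReadOnly (1) -> None (-1) once the delay elapses. A deleteNow request
// is assumed to skip straight to None. Names are hypothetical.
class PartitionModeTransition {
    static byte onDeletionRequested(byte mode, boolean deleteNow) {
        if (mode != 0) {
            throw new IllegalStateException("deletion already in progress");
        }
        // deleteNow skips the read-only grace period entirely (assumption).
        return deleteNow ? (byte) -1 : (byte) 1;
    }

    static byte onDelayElapsed(byte mode) {
        if (mode != 1) {
            throw new IllegalStateException("partition is not ReadOnly");
        }
        // Moving to None triggers OfflineReplica -> ReplicaDeletionStarted
        // on the replicas via the ReplicaStateMachine.
        return (byte) -1;
    }
}
```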

...

  • Otherwise, the partition replica state changes to ReplicaDeletionIneligible and waits for the KafkaController to try again.

Compatibility, Deprecation, and Migration Plan

...