...
We propose to add a new field 'mode' for a partition that indicates whether the partition is to be removed. Specifically, at any time a partition can be
...
Code Block | ||
---|---|---|
| ||
{
  "apiKey": 3,
  "type": "metadata",
  "name": "PartitionRecord",
  "validVersions": "0",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Replicas", "type": "[]int32", "versions": "0+",
      "about": "The replicas of this partition, sorted by preferred order." },
    { "name": "Isr", "type": "[]int32", "versions": "0+",
      "about": "The in-sync replicas of this partition" },
    { "name": "RemovingReplicas", "type": "[]int32", "versions": "0+", "nullableVersions": "0+",
      "about": "The replicas that we are in the process of removing." },
    { "name": "AddingReplicas", "type": "[]int32", "versions": "0+", "nullableVersions": "0+",
      "about": "The replicas that we are in the process of adding." },
    { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The lead replica, or -1 if there is no leader." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "An epoch that gets incremented each time we change the ISR." },
    { "name": "Mode", "type": "int32", "versions": "0+", "default": "0",
      "about": "The read/write mode of the current partition." }
  ]
}
|
...
Code Block | ||
---|---|---|
| ||
{
"apiKey": xxx,
"type": "metadata",
"name": "DeleteTopicPartitionsRecord",
"validVersions": "0",
"fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic partition to remove." },
{ "name": "DeletePartitions", "type": "int32", "versions": "0+",
"about": "The partition to remove. All associated partitions will be removed as well." },
{ "name": "DeletePartitionsDelayTimestamp", "type": "bool", "versions": "0+",
"about": "Delay timestamp deletion of data." },
{ "name": "CreateTimestamp", "type": "int64", "versions": "0+",
"about": "The record create timestamp." }
]
}
|
...
Code Block | ||
---|---|---|
| ||
UpdateMetadata Response (Version: 7) => error_code TAG_BUFFER
  error_code => INT16
|
Metadata API
Add a 'mode' field to the Metadata API that represents the read/write mode of each partition of the current topic, and increment the version of the ApiKey accordingly.
Code Block | ||||
---|---|---|---|---|
| ||||
Metadata Request (Version: 10) => [topics] allow_auto_topic_creation include_cluster_authorized_operations include_topic_authorized_operations TAG_BUFFER
  topics => name TAG_BUFFER
    name => COMPACT_STRING
  allow_auto_topic_creation => BOOLEAN
  include_cluster_authorized_operations => BOOLEAN
  include_topic_authorized_operations => BOOLEAN
Metadata Response (Version: 10) => throttle_time_ms [brokers] cluster_id controller_id [topics] cluster_authorized_operations TAG_BUFFER
  throttle_time_ms => INT32
  brokers => node_id host port rack TAG_BUFFER
    node_id => INT32
    host => COMPACT_STRING
    port => INT32
    rack => COMPACT_NULLABLE_STRING
  cluster_id => COMPACT_NULLABLE_STRING
  controller_id => INT32
  topics => error_code name is_internal [partitions] topic_authorized_operations TAG_BUFFER
    error_code => INT16
    name => COMPACT_STRING
    is_internal => BOOLEAN
    partitions => error_code partition_index leader_id leader_epoch [replica_nodes] [isr_nodes] [offline_replicas] mode TAG_BUFFER
      error_code => INT16
      partition_index => INT32
      leader_id => INT32
      leader_epoch => INT32
      replica_nodes => INT32
      isr_nodes => INT32
      offline_replicas => INT32
      mode => INT8
    topic_authorized_operations => INT32
  cluster_authorized_operations => INT32
|
Client API changes
Add a new partition status field 'mode' in org.apache.kafka.common.PartitionInfo.
- Both KafkaProducer and KafkaConsumer are aware of the mode of each partition and filter partitions accordingly.
- Partitions in ReadOnly or None mode are filtered out for writes.
- Partitions in None mode are filtered out for reads.
Code Block | ||||
---|---|---|---|---|
| ||||
public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
    private final Node[] offlineReplicas;
    // New field: the read/write mode of the partition.
    private int mode;
}
|
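The filtering rules above can be illustrated with a minimal sketch. This is not the proposed client implementation: the `PartitionMode` enum, the `PartitionView` stand-in for `PartitionInfo`, and the `PartitionModeFilter` helper are all hypothetical names, and the numeric encoding of the modes is deliberately left out because this section does not specify it.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical enum: this section names the modes ReadWrite, ReadOnly and None,
// but does not define a Java type or numeric codes for them.
enum PartitionMode { READ_WRITE, READ_ONLY, NONE }

// Hypothetical, simplified stand-in for PartitionInfo with only the fields used here.
final class PartitionView {
    final String topic;
    final int partition;
    final PartitionMode mode;

    PartitionView(String topic, int partition, PartitionMode mode) {
        this.topic = topic;
        this.partition = partition;
        this.mode = mode;
    }
}

final class PartitionModeFilter {
    // Writes: ReadOnly and None partitions are filtered out, so only ReadWrite remains.
    static List<PartitionView> writable(List<PartitionView> partitions) {
        return partitions.stream()
                .filter(p -> p.mode == PartitionMode.READ_WRITE)
                .collect(Collectors.toList());
    }

    // Reads: only None partitions are filtered out.
    static List<PartitionView> readable(List<PartitionView> partitions) {
        return partitions.stream()
                .filter(p -> p.mode != PartitionMode.NONE)
                .collect(Collectors.toList());
    }
}
```

With one partition in each mode, `writable` would keep only the ReadWrite partition, while `readable` would keep the ReadWrite and ReadOnly partitions.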
Kafka Topic Command changes
...
- Add a controller event TopicPartitionDeletion
- Add a class TopicPartitionDeletionManager to handle TopicPartitionDeletion event
- When KafkaController starts, a scheduleDelayDeletePartitionTask is scheduled periodically to check retention for delayed deletion.
The workflow involving the TopicPartitionDeletionManager class is summarized below:
- TopicCommand sends the DeletePartition RPC to KafkaController, and a DeleteTopicPartitionsRecord is saved in the KafkaController metadata.
- TopicPartitionDeletionManager executes the onPartitionDeletion method, which updates the mode of the partition to ReadOnly. The partition remains in the OnlinePartition state. All brokers are notified through PartitionStateMachine.
- ScheduleDelayDeletePartitionTask updates the partition mode to None after the specified delay period. The partition state changes to OfflinePartition and then NonExistentPartition, and the brokers are notified through PartitionStateMachine. The partition replica state changes to OfflineReplica and then ReplicaDeletionStarted; through ReplicaStateMachine, all brokers stop synchronizing data and clear the partition data.
- When the controller receives a successful StopReplica response from a broker, the partition replica state changes to ReplicaDeletionSuccessful and the controller cleans up the metadata as well. Otherwise, the partition replica state changes to ReplicaDeletionIneligible and waits for KafkaController to retry.
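The mode transitions in this workflow (ReadWrite to ReadOnly on the deletion request, then ReadOnly to None once the delay has elapsed) can be sketched as follows. This is an illustrative model only, not the controller code: the class `DelayedPartitionDeletionSketch`, its methods, and the single `delayMs` parameter are assumptions, and the partition and replica state machines are omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the mode lifecycle; all names here are illustrative.
final class DelayedPartitionDeletionSketch {
    enum Mode { READ_WRITE, READ_ONLY, NONE }

    private final long delayMs; // assumed: one delay period for all partitions
    private final Map<Integer, Mode> modes = new HashMap<>();
    private final Map<Integer, Long> deletionRequestedAt = new HashMap<>();

    DelayedPartitionDeletionSketch(long delayMs) {
        this.delayMs = delayMs;
    }

    void addPartition(int partition) {
        modes.put(partition, Mode.READ_WRITE);
    }

    // Deletion requested: the partition becomes ReadOnly and the request time is recorded.
    void onPartitionDeletion(int partition, long nowMs) {
        if (modes.get(partition) == Mode.READ_WRITE) {
            modes.put(partition, Mode.READ_ONLY);
            deletionRequestedAt.put(partition, nowMs);
        }
    }

    // Periodic check: ReadOnly partitions whose delay has elapsed move to None.
    // Returns the partitions that were moved, so a caller could start replica deletion.
    List<Integer> runDelayedDeletionCheck(long nowMs) {
        List<Integer> expired = new ArrayList<>();
        for (Map.Entry<Integer, Long> entry : deletionRequestedAt.entrySet()) {
            if (nowMs - entry.getValue() >= delayMs) {
                expired.add(entry.getKey());
            }
        }
        for (Integer partition : expired) {
            modes.put(partition, Mode.NONE);
            deletionRequestedAt.remove(partition);
        }
        return expired;
    }

    Mode modeOf(int partition) {
        return modes.get(partition);
    }
}
```

Keeping the request timestamp per partition mirrors the CreateTimestamp field of DeleteTopicPartitionsRecord, which is what lets a periodic task decide when the delay has expired.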
Compatibility, Deprecation, and Migration Plan
...
Older clients accessing a newer broker: the Metadata request version for the topic must be above the specified version; requests with a lower version will receive LEADER_NOT_AVAILABLE.
Newer clients accessing an older broker: in the newer client's org.apache.kafka.common.PartitionInfo class, the partition status field (`mode`) defaults to ReadWrite.
Upgrading from any older version is possible. To use this feature, set delete.topic.partition.enable=true and make sure the Metadata request version for the topic is above the specified version.
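The client-side default described above can be sketched as a small helper. This is a hedged illustration: `ModeCompatSketch` and `resolve` are hypothetical names, and representing "the broker did not return a mode" as `null` is an assumption made for the example.

```java
// Hypothetical helper showing the ReadWrite default for responses without a mode.
final class ModeCompatSketch {
    enum Mode { READ_WRITE, READ_ONLY, NONE }

    // modeFromResponse is null when the broker's Metadata response carried no mode
    // (for example, an older broker); the client then treats the partition as ReadWrite.
    static Mode resolve(Mode modeFromResponse) {
        return modeFromResponse == null ? Mode.READ_WRITE : modeFromResponse;
    }
}
```

Under this default, a newer client talking to an older broker behaves exactly as it did before the change, since every partition is treated as readable and writable.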
Rejected Alternatives