...
Code Block | ||||
---|---|---|---|---|
| ||||
deletePartitions(Map<String, DeletePartitions> partitions) @InterfaceStability.Evolving public class DeletePartitions { private int totalCount; private boolean immediateDelete; } |
Protocol RPC changes
DeletePartitions API
A new API DeletePartitions will be added with the following DeletePartitionsRequest and DeletePartitionsResponse
...
Code Block | ||
---|---|---|
| ||
DeletePartitionsResponse Response (Version: 0) => throttle_time_ms error_code error_message [responses] TAG_BUFFER throttle_time_ms => INT32 error_code => INT16 error_message => COMPACT_NULLABLE_STRING responses => name [partitions] TAG_BUFFER name => COMPACT_STRING partitions => partition_index status_code error_code error_message TAG_BUFFER partition_index => INT32 status_code => INT8 error_code => INT16 error_message => COMPACT_NULLABLE_STRING |
UpdateMeta API
In UpdateMeta API we add status_mode field to return the partition read/write status of the current topic partitions.
...
Code Block | ||
---|---|---|
| ||
UpdateMetadata Response (Version: 7) => error_code TAG_BUFFER error_code => INT16 |
...
MetData API
...
Add status_code field in Metadata API that represents the partitions of current topic
Code Block | ||||
---|---|---|---|---|
| ||||
Metadata Request (Version: 10) => [topics] allow_auto_topic_creation include_cluster_authorized_operations include_topic_authorized_operations TAG_BUFFER topics => name TAG_BUFFER name => COMPACT_STRING allow_auto_topic_creation => BOOLEAN include_cluster_authorized_operations => BOOLEAN include_topic_authorized_operations => BOOLEAN Metadata Response (Version: 10) => throttle_time_ms [brokers] cluster_id controller_id [topics] cluster_authorized_operations TAG_BUFFER throttle_time_ms => INT32 brokers => node_id host port rack TAG_BUFFER node_id => INT32 host => COMPACT_STRING port => INT32 rack => COMPACT_NULLABLE_STRING cluster_id => COMPACT_NULLABLE_STRING controller_id => INT32 topics => error_code name is_internal [partitions] topic_authorized_operations TAG_BUFFER error_code => INT16 name => COMPACT_STRING is_internal => BOOLEAN partitions => error_code partition_index leader_id leader_epoch [replica_nodes] [isr_nodes] [offline_replicas] TAG_BUFFER error_code => INT16 partition_index => INT32 leader_id => INT32 leader_epoch => INT32 replica_nodes => INT32 isr_nodes => INT32 offline_replicas => INT32 status_code => INT8 topic_authorized_operations => INT32 cluster_authorized_operations => INT32 |
...
Add a new partition status field in org.apache.kafka.common.PartitionInfoIn org.apache.kafka.common.Cluster, exclude offline partitions in the availablePartitionsForTopic
- Both KafkaProducer and KafkaConsumer are aware of state of the partition and filter accordingly.
- Partitions with ReadOnly and None status will be filtered out for writes.
- Partitions with None status will be filtered out for reads.
Code Block | ||||
---|---|---|---|---|
| ||||
public class PartitionInfo { private final String topic; private final int partition; private final Node leader; private final Node[] replicas; private final Node[] inSyncReplicas; private final Node[] offlineReplicas; private int status; } |
...