Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
linenumberstrue
deletePartitions(Map<String, DeletePartitions> partitions)

@InterfaceStability.Evolving
public class DeletePartitions {
    private int totalCount;
    private boolean immediateDelete;
}

Protocol RPC changes

DeletePartitions API

A new API DeletePartitions will be added with the following DeletePartitionsRequest and DeletePartitionsResponse

...

Code Block
linenumberstrue
DeletePartitionsResponse Response (Version: 0) => throttle_time_ms error_code error_message [responses] TAG_BUFFER 
	throttle_time_ms => INT32
	error_code => INT16
	error_message => COMPACT_NULLABLE_STRING
	responses => name [partitions] TAG_BUFFER 
		name => COMPACT_STRING
		partitions => partition_index status_code error_code error_message TAG_BUFFER 
			partition_index => INT32
			status_code => INT8
			error_code => INT16
			error_message => COMPACT_NULLABLE_STRING

UpdateMeta API

In UpdateMeta API we add status_mode field to return the partition read/write status of the current topic partitions.

...

Code Block
linenumberstrue
UpdateMetadata Response (Version: 7) => error_code TAG_BUFFER 
  error_code => INT16

...

MetData API

...

Add status_code field in Metadata API that represents the partitions of current topic

Code Block
languagejava
linenumberstrue
Metadata Request (Version: 10) => [topics] allow_auto_topic_creation include_cluster_authorized_operations include_topic_authorized_operations TAG_BUFFER 
  topics => name TAG_BUFFER 
    name => COMPACT_STRING
  allow_auto_topic_creation => BOOLEAN
  include_cluster_authorized_operations => BOOLEAN
  include_topic_authorized_operations => BOOLEAN

Metadata Response (Version: 10) => throttle_time_ms [brokers] cluster_id controller_id [topics] cluster_authorized_operations TAG_BUFFER 
  throttle_time_ms => INT32
  brokers => node_id host port rack TAG_BUFFER 
    node_id => INT32
    host => COMPACT_STRING
    port => INT32
    rack => COMPACT_NULLABLE_STRING
  cluster_id => COMPACT_NULLABLE_STRING
  controller_id => INT32
  topics => error_code name is_internal [partitions] topic_authorized_operations TAG_BUFFER 
    error_code => INT16
    name => COMPACT_STRING
    is_internal => BOOLEAN
    partitions => error_code partition_index leader_id leader_epoch [replica_nodes] [isr_nodes] [offline_replicas] TAG_BUFFER 
      error_code => INT16
      partition_index => INT32
      leader_id => INT32
      leader_epoch => INT32
      replica_nodes => INT32
      isr_nodes => INT32
      offline_replicas => INT32
      status_code => INT8
    topic_authorized_operations => INT32
  cluster_authorized_operations => INT32

...

  • Add a new partition status field in org.apache.kafka.common.PartitionInfoIn org.apache.kafka.common.Cluster, exclude offline partitions in the availablePartitionsForTopic 

  • Both KafkaProducer and KafkaConsumer are aware of state of the partition and filter accordingly.
    • Partitions with ReadOnly and None status will be filtered out for writes.
    • Partitions with None status will be filtered out for reads.
Code Block
languagejava
linenumberstrue
public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
    private final Node[] offlineReplicas;
    private int status;
}

...