...
The fields added to requests and responses below will be tagged fields to avoid changing the inter-broker protocol.
LeaderAndIsr
LeaderAndIsrRequest
...
v4
LeaderAndIsr Request (Version: 34) => controller_id controller_epoch broker_epoch type* [topic_states] [live_leaders]
controller_id => INT32
controller_epoch => INT32
broker_epoch => INT64
type * => INT8 (tagged field)
topic_states => topic topic_id* [partition_states]
topic => STRING
topic_id* => UUID (tagged field)
partition_states => partition controller_epoch leader leader_epoch [isr] zk_version [replicas] is_new
partition => INT32
controller_epoch => INT32
leader => INT32
leader_epoch => INT32
isr => INT32
zk_version => INT32
replicas => INT32
is_new => BOOLEAN
live_leaders => id host port
id => INT32
host => STRING
port => INT32
|
LeaderAndIsrRequest v3 v4 adds the topic ID to the topic_states field, and an enum type to denote the type of LeaderAndIsrRequest. Currently, the first LeaderAndIsrRequest sent to a broker by a controller contains all TopicPartitions that a broker is a replica for. We will formalize this behavior by also including a type enum to denote the type of LeaderAndIsrRequest.
...
- Logging at WARN level all partitions that will be deleted and the time that they will be be deleted at.
- Move the partition's directory to log.dir/deleting/{topic_id}_{partition}
- Schedule deletion from disk with a delay of delete.stale.topic.delay.ms ms. This will clear the deleting directory of the partition's contents.
LeaderAndIsrResponse
...
v4
LeaderAndIsr Response (Version: 34) => error_code [partitions]
error_code => INT16
partitions => topic topic_id* partition error_code
topic => STRING
topic_id* => UUID (tagged field)
partition => INT32
error_code => INT16
|
...
To avoid issues where requests are made to stale partitions, a topic_id field will be added to fence reads from deleted topics.
...
ListOffsetRequest v6
ListOffsets ListOffset Request (Version: 6) => replica_id isolation_level [topics]
replica_id => INT32
isolation_level => INT8
topics => topic topic_id* [partitions]
topic => STRING
topic_id* => UUID (tagged field)
partitions => partition current_leader_epoch timestamp
partition => INT32
current_leader_epoch => INT32
timestamp => INT64
|
...
ListOffsetResponse v6
ListOffsets ListOffset Response (Version: 6) => throttle_time_ms [responses]
throttle_time_ms => INT32
responses => topic topic_id* [partition_responses]
topic => STRING
topic_id* => UUID (tagged field)
partition_responses => partition error_code timestamp offset leader_epoch
partition => INT32
error_code => INT16
timestamp => INT64
offset => INT64
leader_epoch => INT32
|
...
MetadataResponse must be modified so that describeTopics includes the topic id for each topic.
MetadataResponse
...
v9
Metadata Response (Version: 109) => throttle_time_ms [brokers] cluster_id controller_id [topics] cluster_authorized_operations throttle_time_ms => INT32 brokers => node_id host port rack node_id => INT32 host => STRING
port => INT32
rack => STRING cluster_id => STRING controller_id => INT32 topics => error_code name is_internal [partitions] topic_authorized_operations topic_id* error_code => INT16 name => STRING is_internal => BOOL partitions => error_code partition_index leader_id leader_epoch [replica_nodes] [isr_nodes] [offline_replicas] error_code => INT16 partition_index => INT32 leader_id => INT32 leader_epoch => INT32 replica_nodes => INT32 isr_nodes => INT32 offline_replicas => INT32 topic_authorized_operations => INT32 topic_id* => UUID (tagged field)
cluster_authorized_operations => INT32
|
...
UpdateMetadata should also include the topic ID.
UpdateMetadataRequest
...
v6
UpdateMetadata Request (Version: 76) => controller_id controller_epoch broker_epoch [ungrouped_partition_states] [topic_states] [live_brokers] controller_id => INT32 controller_epoch => INT32 broker_epoch => INT64 ungrouped_partition_states => UpdateMetadataPartitionState topic_states => topic_name topic_id* [partition_states] topic_name => STRING topic_id* => UUID (tagged field) partition_states => UpdateMetadataPartitionState live_brokers => id v0_host v0_port [endpoints] rack id => INT32 v0_host => STRING v0_port => INT32 endpoints => port host listener security_protocol port => INT32 host => STRING listener => STRING security_protocol => INT16 rack => STRING
|
...