...
Requests to describe topics will return a result containing TopicDescriptions that include the topic ID of each topic.
Protocol Changes
The fields added to requests and responses below will be tagged fields to avoid changing the inter-broker protocol.
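As an illustration of why tagged fields avoid a protocol bump, the sketch below encodes a topic ID as a tagged field and decodes it with two readers: one that knows the tag and one that does not. It assumes the count / tag / size / data layout with unsigned varints used by Kafka's flexible message versions. The tag number and the helper names are hypothetical and chosen only for this example.

```python
import io
import uuid


def write_uvarint(out, value):
    """Append an unsigned varint (7 bits per byte, low bits first)."""
    while value >= 0x80:
        out.append((value & 0x7F) | 0x80)
        value >>= 7
    out.append(value)


def read_uvarint(buf):
    """Read an unsigned varint from a binary stream."""
    result, shift = 0, 0
    while True:
        byte = buf.read(1)[0]
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result
        shift += 7


def encode_tagged_fields(fields):
    """Encode {tag: bytes} as a field count, then (tag, size, data) per field."""
    out = bytearray()
    write_uvarint(out, len(fields))
    for tag in sorted(fields):
        write_uvarint(out, tag)
        write_uvarint(out, len(fields[tag]))
        out += fields[tag]
    return bytes(out)


def decode_tagged_fields(data, known_tags):
    """Decode tagged fields, keeping only the tags this reader understands."""
    buf = io.BytesIO(data)
    known, skipped = {}, []
    for _ in range(read_uvarint(buf)):
        tag = read_uvarint(buf)
        size = read_uvarint(buf)
        payload = buf.read(size)  # the size prefix lets any reader step over the data
        if tag in known_tags:
            known[tag] = payload
        else:
            skipped.append(tag)  # unknown tag: ignored rather than treated as an error
    return known, skipped


TOPIC_ID_TAG = 0  # hypothetical tag number, not taken from the proposal
topic_id = uuid.UUID("12345678-1234-5678-1234-567812345678")
wire = encode_tagged_fields({TOPIC_ID_TAG: topic_id.bytes})

# A reader that knows the tag recovers the 16-byte topic ID.
new_known, new_skipped = decode_tagged_fields(wire, known_tags={TOPIC_ID_TAG})
# A reader that predates the tag skips it without failing.
old_known, old_skipped = decode_tagged_fields(wire, known_tags=set())
```

Because every tagged field carries its own size, a broker that has not been upgraded can step over topic_id without understanding it, which is the property the proposal relies on.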
LeaderAndIsr
LeaderAndIsrRequest v3
LeaderAndIsr Request (Version: 3) => controller_id controller_epoch broker_epoch type* [topic_states] [live_leaders]
controller_id => INT32
controller_epoch => INT32
broker_epoch => INT64
type* => INT8 (tagged field)
topic_states => topic topic_id* [partition_states]
topic => STRING
topic_id* => UUID (tagged field)
partition_states => partition controller_epoch leader leader_epoch [isr] zk_version [replicas] is_new
partition => INT32
controller_epoch => INT32
leader => INT32
leader_epoch => INT32
isr => INT32
zk_version => INT32
replicas => INT32
is_new => BOOLEAN
live_leaders => id host port
id => INT32
host => STRING
port => INT32
...
LeaderAndIsr Response (Version: 3) => error_code [partitions]
error_code => INT16
partitions => topic topic_id* partition error_code
topic => STRING
topic_id* => UUID (tagged field)
partition => INT32
error_code => INT16
...
StopReplica Request (Version: 2) => controller_id controller_epoch broker_epoch delete_partitions [partitions]
controller_id => INT32
controller_epoch => INT32
broker_epoch => INT64
delete_partitions => BOOLEAN
partitions => topic topic_id* [partition_ids]
topic => STRING
topic_id* => UUID (tagged field)
partition_ids => INT32
...
StopReplica Response (Version: 2) => error_code [partitions]
error_code => INT16
partitions => topic topic_id* partition error_code
topic => STRING
topic_id* => UUID (tagged field)
partition => INT32
error_code => INT16
...
Fetch Request (Version: 12) => replica_id max_wait_time min_bytes max_bytes isolation_level session_id session_epoch [topics] [forgotten_topics_data] rack_id
replica_id => INT32
max_wait_time => INT32
min_bytes => INT32
max_bytes => INT32
isolation_level => INT8
session_id => INT32
session_epoch => INT32
topics => topic topic_id* [partitions]
topic => STRING
topic_id* => UUID (tagged field)
partitions => partition current_leader_epoch fetch_offset log_start_offset partition_max_bytes
partition => INT32
current_leader_epoch => INT32
fetch_offset => INT64
log_start_offset => INT64
partition_max_bytes => INT32
forgotten_topics_data => topic [partitions]
topic => STRING
partitions => INT32
rack_id => STRING
...
Fetch Response (Version: 12) => throttle_time_ms error_code session_id [responses]
throttle_time_ms => INT32
error_code => INT16
session_id => INT32
responses => topic topic_id* [partition_responses]
topic => STRING
topic_id* => UUID (tagged field)
partition_responses => partition_header record_set
partition_header => partition error_code high_watermark last_stable_offset log_start_offset [aborted_transactions] preferred_read_replica
partition => INT32
error_code => INT16
high_watermark => INT64
last_stable_offset => INT64
log_start_offset => INT64
aborted_transactions => producer_id first_offset
producer_id => INT64
first_offset => INT64
preferred_read_replica => INT32
record_set => RECORDS
...
ListOffsets Request (Version: 6) => replica_id isolation_level [topics]
replica_id => INT32
isolation_level => INT8
topics => topic topic_id* [partitions]
topic => STRING
topic_id* => UUID (tagged field)
partitions => partition current_leader_epoch timestamp
partition => INT32
current_leader_epoch => INT32
timestamp => INT64
...
ListOffsets Response (Version: 6) => throttle_time_ms [responses]
throttle_time_ms => INT32
responses => topic topic_id* [partition_responses]
topic => STRING
topic_id* => UUID (tagged field)
partition_responses => partition error_code timestamp offset leader_epoch
partition => INT32
error_code => INT16
timestamp => INT64
offset => INT64
leader_epoch => INT32
...
OffsetForLeaderEpoch Request (Version: 4) => replica_id [topics]
replica_id => INT32
topics => topic topic_id* [partitions]
topic => STRING
topic_id* => UUID (tagged field)
partitions => partition current_leader_epoch leader_epoch
partition => INT32
current_leader_epoch => INT32
leader_epoch => INT32
...
OffsetForLeaderEpoch Response (Version: 4) => throttle_time_ms [topics]
throttle_time_ms => INT32
topics => topic topic_id* [partitions]
topic => STRING
topic_id* => UUID (tagged field)
partitions => error_code partition leader_epoch end_offset
error_code => INT16
partition => INT32
leader_epoch => INT32
end_offset => INT64
...
MetadataResponse must be modified so that describeTopics can include the topic ID for each topic.
MetadataResponse v10
Metadata Response (Version: 10) => throttle_time_ms [brokers] cluster_id controller_id [topics] cluster_authorized_operations
throttle_time_ms => INT32
brokers => node_id host port rack
node_id => INT32
host => STRING
port => INT32
rack => STRING
cluster_id => STRING
controller_id => INT32
topics => error_code name is_internal [partitions] topic_authorized_operations topic_id*
error_code => INT16
name => STRING
is_internal => BOOL
partitions => error_code partition_index leader_id leader_epoch [replica_nodes] [isr_nodes] [offline_replicas]
error_code => INT16
partition_index => INT32
leader_id => INT32
leader_epoch => INT32
replica_nodes => INT32
isr_nodes => INT32
offline_replicas => INT32
topic_authorized_operations => INT32
topic_id* => UUID (tagged field)
cluster_authorized_operations => INT32
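To illustrate how a client could surface the new field, the sketch below builds per-topic descriptions from an already decoded Metadata response. The dictionary shape, the simplified TopicDescription class, and the describe_topics helper are assumptions made for this example and are not Kafka's client API. The field names follow the schema above.

```python
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass
class TopicDescription:
    """Simplified stand-in for a topic description (hypothetical shape)."""
    name: str
    is_internal: bool
    partition_count: int
    topic_id: Optional[uuid.UUID]  # None when the response carried no topic_id


def describe_topics(metadata_response):
    """Build one TopicDescription per successfully described topic."""
    descriptions = {}
    for topic in metadata_response["topics"]:
        if topic["error_code"] != 0:
            continue  # skip topics the broker reported an error for
        descriptions[topic["name"]] = TopicDescription(
            name=topic["name"],
            is_internal=topic["is_internal"],
            partition_count=len(topic["partitions"]),
            topic_id=topic.get("topic_id"),  # tagged field, so it may be absent
        )
    return descriptions


payments_id = uuid.UUID("0f8fad5b-d9cb-469f-a165-70867728950e")
response = {
    "topics": [
        {"error_code": 0, "name": "payments", "is_internal": False,
         "partitions": [{"partition_index": 0}, {"partition_index": 1}],
         "topic_id": payments_id},
        # A topic described by a broker that does not send topic_id.
        {"error_code": 0, "name": "legacy", "is_internal": False,
         "partitions": [{"partition_index": 0}]},
        # A topic the broker could not describe (any non-zero error code).
        {"error_code": 3, "name": "missing", "is_internal": False,
         "partitions": []},
    ]
}
result = describe_topics(response)
```

Treating a missing topic_id as None keeps the helper usable against brokers that have not been upgraded.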
UpdateMetadata
UpdateMetadata should also include the topic ID.
UpdateMetadataRequest v7
UpdateMetadata Request (Version: 7) => controller_id controller_epoch broker_epoch [ungrouped_partition_states] [topic_states] [live_brokers]
controller_id => INT32
controller_epoch => INT32
broker_epoch => INT64
ungrouped_partition_states => UpdateMetadataPartitionState
topic_states => topic_name topic_id* [partition_states]
topic_name => STRING
topic_id* => UUID (tagged field)
partition_states => UpdateMetadataPartitionState
live_brokers => id v0_host v0_port [endpoints] rack
id => INT32
v0_host => STRING
v0_port => INT32
endpoints => port host listener security_protocol
port => INT32
host => STRING
listener => STRING
security_protocol => INT16
rack => STRING
DeleteTopics
With the addition of topic IDs and the changes to LeaderAndIsrRequest described above, we can now make changes to topic deletion logic that will allow topics to be immediately considered deleted, regardless of whether all replicas have responded to a DeleteTopicsRequest.
...
LeaderAndIsrRequest(s) will only be sent by the controller once a topic ID has been successfully assigned to the topic. The migration process can take place without an inter-broker protocol bump, as the format stored in /brokers/topics/[topic] will be compatible with older broker versions. The topic IDs in requests and responses will be tagged fields to avoid changing the inter-broker protocol.
When a replica receives a LeaderAndIsrRequest containing a topic ID for an existing partition that does not yet have an associated topic ID, it will create a partition metadata file for the topic partition locally. At this point the local partition will have been migrated to support topic IDs.
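The replica-side migration step can be sketched as follows. This is a minimal illustration under stated assumptions: the file name partition.metadata, its key: value layout, and the function names are hypothetical, and the "mismatch" outcome is an assumption about a case this section does not describe.

```python
import tempfile
import uuid
from pathlib import Path

METADATA_FILE = "partition.metadata"  # assumed file name for this sketch


def read_topic_id(partition_dir):
    """Return the topic ID stored for a local partition, or None if absent."""
    path = Path(partition_dir) / METADATA_FILE
    if not path.exists():
        return None
    for line in path.read_text().splitlines():
        key, _, value = line.partition(":")
        if key.strip() == "topic_id":
            return uuid.UUID(value.strip())
    return None


def on_leader_and_isr(partition_dir, request_topic_id):
    """Apply the topic ID carried by a LeaderAndIsrRequest to a local partition.

    Returns "migrated" when the metadata file is created, "unchanged" when
    nothing needs to be done, and "mismatch" when a different ID is already
    stored (how to resolve that case is outside this sketch).
    """
    if request_topic_id is None:
        return "unchanged"  # the request carried no topic ID
    local_id = read_topic_id(partition_dir)
    if local_id is None:
        # Existing partition without a topic ID: create the metadata file.
        path = Path(partition_dir) / METADATA_FILE
        path.write_text(f"version: 0\ntopic_id: {request_topic_id}\n")
        return "migrated"
    return "unchanged" if local_id == request_topic_id else "mismatch"


with tempfile.TemporaryDirectory() as log_dir:
    partition_dir = Path(log_dir) / "payments-0"
    partition_dir.mkdir()
    topic_id = uuid.uuid4()
    first = on_leader_and_isr(partition_dir, topic_id)   # creates the file
    second = on_leader_and_isr(partition_dir, topic_id)  # already migrated
    stored = read_topic_id(partition_dir)
```

Making the handler idempotent means a repeated LeaderAndIsrRequest for the same topic leaves the stored ID untouched.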
...