...
Requests to describe topics will return a result containing TopicDescriptions, each of which includes the topic ID for its topic.
Protocol Changes
The fields added to requests and responses below will be tagged fields to avoid changing the inter-broker protocol.
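As a rough illustration of why tagged fields let a new value travel without disturbing readers that do not know about it, the sketch below encodes a tagged-field section as a count followed by (tag, size, data) entries, each length written as an unsigned varint. The tag number `TOPIC_ID_TAG` and the helper names are assumptions made for this example, not values taken from the proposal.

```python
import uuid
from typing import Dict, Set, Tuple


def encode_uvarint(value: int) -> bytes:
    """Encode a non-negative integer as an unsigned varint (7 bits per byte)."""
    if value < 0:
        raise ValueError("unsigned varint cannot be negative")
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def decode_uvarint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode an unsigned varint at pos; return (value, next position)."""
    result, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def encode_tagged_fields(fields: Dict[int, bytes]) -> bytes:
    """Write the field count, then each field as tag, size, data."""
    out = bytearray(encode_uvarint(len(fields)))
    for tag in sorted(fields):
        data = fields[tag]
        out += encode_uvarint(tag) + encode_uvarint(len(data)) + data
    return bytes(out)


def decode_tagged_fields(buf: bytes, known_tags: Set[int]) -> Dict[int, bytes]:
    """Keep fields whose tag is known; skip the rest using their size."""
    count, pos = decode_uvarint(buf, 0)
    known = {}
    for _ in range(count):
        tag, pos = decode_uvarint(buf, pos)
        size, pos = decode_uvarint(buf, pos)
        if tag in known_tags:
            known[tag] = buf[pos:pos + size]
        pos += size  # unknown fields are skipped, not rejected
    return known


TOPIC_ID_TAG = 0  # hypothetical tag number, chosen only for this example
topic_id = uuid.UUID("0f1e2d3c-4b5a-6978-8796-a5b4c3d2e1f0")
encoded = encode_tagged_fields({TOPIC_ID_TAG: topic_id.bytes})

new_reader = decode_tagged_fields(encoded, {TOPIC_ID_TAG})  # understands topic IDs
old_reader = decode_tagged_fields(encoded, set())           # does not
```

A reader that does not recognise the tag still consumes the whole section, because each entry carries its own size.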
LeaderAndIsr
LeaderAndIsrRequest v5
LeaderAndIsr Request (Version: 5) => controller_id controller_epoch broker_epoch type* [topic_states] [live_leaders]
controller_id => INT32
controller_epoch => INT32
broker_epoch => INT64
type* => INT8 (tagged field)
topic_states => topic topic_id* [partition_states]
topic => STRING
topic_id* => UUID (tagged field)
partition_states => partition controller_epoch leader leader_epoch [isr] zk_version [replicas] is_new
partition => INT32
controller_epoch => INT32
leader => INT32
leader_epoch => INT32
isr => INT32
zk_version => INT32
replicas => INT32
is_new => BOOLEAN
live_leaders => id host port
id => INT32
host => STRING
port => INT32
LeaderAndIsrRequest v5 adds the topic ID to the topic_states field, and an enum type to denote the type of LeaderAndIsrRequest. Currently, the first LeaderAndIsrRequest sent to a broker by a controller contains all TopicPartitions that the broker is a replica for. We will formalize this behavior by including the type enum in the request.
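A minimal sketch of how a controller might set the type when building a request is shown below. Only the FULL type is named in this document; the other enum members, the numeric values, and the class and function names are assumptions for illustration.

```python
import uuid
from dataclasses import dataclass, field
from enum import IntEnum
from typing import List


class LeaderAndIsrRequestType(IntEnum):
    # Hypothetical members and values; the document only names FULL.
    UNSPECIFIED = 0
    INCREMENTAL = 1
    FULL = 2


@dataclass
class TopicState:
    topic: str
    topic_id: uuid.UUID
    partitions: List[int]


@dataclass
class LeaderAndIsrRequest:
    controller_id: int
    controller_epoch: int
    broker_epoch: int
    type: LeaderAndIsrRequestType
    topic_states: List[TopicState] = field(default_factory=list)


def build_request(controller_id: int, controller_epoch: int, broker_epoch: int,
                  topic_states: List[TopicState],
                  first_request_to_broker: bool) -> LeaderAndIsrRequest:
    """Mark the first request to a broker as FULL, later ones as INCREMENTAL."""
    request_type = (LeaderAndIsrRequestType.FULL if first_request_to_broker
                    else LeaderAndIsrRequestType.INCREMENTAL)
    return LeaderAndIsrRequest(controller_id, controller_epoch, broker_epoch,
                               request_type, topic_states)


states = [TopicState("orders", uuid.uuid4(), [0, 1, 2])]
initial = build_request(1, 7, 42, states, first_request_to_broker=True)
update = build_request(1, 7, 42, states[:1], first_request_to_broker=False)
```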
...
LeaderAndIsr Response (Version: 5) => error_code [partitions]
error_code => INT16
partitions => topic_id* partition error_code
topic_id* => UUID (tagged field)
partition => INT32
error_code => INT16
The topic name field has been removed.
StopReplica
StopReplicaRequest
...
v4
StopReplica Request (Version: 4) => controller_id controller_epoch broker_epoch delete_partitions [partitions]
controller_id => INT32
controller_epoch => INT32
broker_epoch => INT64
delete_partitions => BOOLEAN
partitions => topic topic_id* [partition_ids]
topic => STRING
topic_id* => UUID (tagged field)
partition_ids => INT32
StopReplicaResponse
...
v4
StopReplica Response (Version: 4) => error_code [partitions]
error_code => INT16
partitions => topic topic_id* partition error_code
topic => STRING
topic_id* => UUID (tagged field)
partition => INT32
error_code => INT16
...
Fetch Request (Version: 12) => replica_id max_wait_time min_bytes max_bytes isolation_level session_id session_epoch [topics] [forgotten_topics_data] rack_id
replica_id => INT32
max_wait_time => INT32
min_bytes => INT32
max_bytes => INT32
isolation_level => INT8
session_id => INT32
session_epoch => INT32
topics => topic topic_id* [partitions]
topic => STRING
topic_id* => UUID (tagged field)
partitions => partition current_leader_epoch fetch_offset log_start_offset partition_max_bytes
partition => INT32
current_leader_epoch => INT32
fetch_offset => INT64
log_start_offset => INT64
partition_max_bytes => INT32
forgotten_topics_data => topic [partitions]
topic => STRING
partitions => INT32
rack_id => STRING
...
Fetch Response (Version: 12) => throttle_time_ms error_code session_id [responses]
throttle_time_ms => INT32
error_code => INT16
session_id => INT32
responses => topic topic_id* [partition_responses]
topic => STRING
topic_id* => UUID (tagged field)
partition_responses => partition_header record_set
partition_header => partition error_code high_watermark last_stable_offset log_start_offset [aborted_transactions] preferred_read_replica
partition => INT32
error_code => INT16
high_watermark => INT64
last_stable_offset => INT64
log_start_offset => INT64
aborted_transactions => producer_id first_offset
producer_id => INT64
first_offset => INT64
preferred_read_replica => INT32
record_set => RECORDS
...
ListOffset Request (Version: 6) => replica_id isolation_level [topics]
replica_id => INT32
isolation_level => INT8
topics => topic topic_id* [partitions]
topic => STRING
topic_id* => UUID (tagged field)
partitions => partition current_leader_epoch timestamp
partition => INT32
current_leader_epoch => INT32
timestamp => INT64
...
ListOffset Response (Version: 6) => throttle_time_ms [responses]
throttle_time_ms => INT32
responses => topic topic_id* [partition_responses]
topic => STRING
topic_id* => UUID (tagged field)
partition_responses => partition error_code timestamp offset leader_epoch
partition => INT32
error_code => INT16
timestamp => INT64
offset => INT64
leader_epoch => INT32
...
OffsetForLeaderEpoch Request (Version: 4) => replica_id [topics]
replica_id => INT32
topics => topic topic_id* [partitions]
topic => STRING
topic_id* => UUID (tagged field)
partitions => partition current_leader_epoch leader_epoch
partition => INT32
current_leader_epoch => INT32
leader_epoch => INT32
...
OffsetForLeaderEpoch Response (Version: 4) => throttle_time_ms [topics]
throttle_time_ms => INT32
topics => topic topic_id* [partitions]
topic => STRING
topic_id* => UUID (tagged field)
partitions => error_code partition leader_epoch end_offset
error_code => INT16
partition => INT32
leader_epoch => INT32
end_offset => INT64
...
MetadataResponse must be modified so that describeTopics includes the topic ID for each topic.
MetadataResponse
...
v10
Metadata Response (Version: 10) => throttle_time_ms [brokers] cluster_id controller_id [topics] cluster_authorized_operations
throttle_time_ms => INT32
brokers => node_id host port rack
node_id => INT32
host => STRING
port => INT32
rack => STRING
cluster_id => STRING
controller_id => INT32
topics => error_code name is_internal [partitions] topic_authorized_operations topic_id*
error_code => INT16
name => STRING
is_internal => BOOL
partitions => error_code partition_index leader_id leader_epoch [replica_nodes] [isr_nodes] [offline_replicas]
error_code => INT16
partition_index => INT32
leader_id => INT32
leader_epoch => INT32
replica_nodes => INT32
isr_nodes => INT32
offline_replicas => INT32
topic_authorized_operations => INT32
topic_id* => UUID (tagged field)
cluster_authorized_operations => INT32
...
UpdateMetadata should also include the topic ID.
UpdateMetadataRequest
...
v7
UpdateMetadata Request (Version: 7) => controller_id controller_epoch broker_epoch [ungrouped_partition_states] [topic_states] [live_brokers]
controller_id => INT32
controller_epoch => INT32
broker_epoch => INT64
ungrouped_partition_states => UpdateMetadataPartitionState
topic_states => topic_name topic_id* [partition_states]
topic_name => STRING
topic_id* => UUID (tagged field)
partition_states => UpdateMetadataPartitionState
live_brokers => id v0_host v0_port [endpoints] rack
id => INT32
v0_host => STRING
v0_port => INT32
endpoints => port host listener security_protocol
port => INT32
host => STRING
listener => STRING
security_protocol => INT16
rack => STRING
...
1. Collect deleted topics:
   - Old format: /admin/delete_topics, pulling the topic state from /brokers/topics/[topic].
   - New in-memory topic deletion states from received DeleteTopicsRequest(s).
2. Remove deleted topics from replicas by sending StopReplicaRequest V3 for any topics which do not contain a topic ID, and V4 for any topics which do contain a topic ID.
3. Finalize successful deletes:
   - For /admin/delete_topics deletes, we may need to respond to the TopicDeleteRequest. We can also delete the topic znodes at /admin/delete_topics/[topic] and /brokers/topics/[topic].
   - For deletes of topics with topic IDs, remove the topic from the in-memory topic deletion state on the controller.
4. Any unsuccessful StopReplicaRequest(s) will be retried after retryMs, starting from step 1, and will be maintained in memory.
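The deletion steps above can be sketched roughly as a single pass that the controller repeats until nothing is pending. The function names and the `send_stop_replica` callback are hypothetical stand-ins for the controller's real machinery, and the wait of retryMs between passes is left out to keep the example short.

```python
import uuid
from typing import Callable, Dict, Optional

STOP_REPLICA_WITHOUT_TOPIC_ID = 3  # request version for topics lacking a topic ID
STOP_REPLICA_WITH_TOPIC_ID = 4     # request version for topics that have one


def stop_replica_version(topic_id: Optional[uuid.UUID]) -> int:
    """Pick the StopReplicaRequest version from the presence of a topic ID."""
    if topic_id is None:
        return STOP_REPLICA_WITHOUT_TOPIC_ID
    return STOP_REPLICA_WITH_TOPIC_ID


def run_deletion_pass(
    pending: Dict[str, Optional[uuid.UUID]],
    send_stop_replica: Callable[[str, int], bool],
) -> Dict[str, Optional[uuid.UUID]]:
    """Run one pass over pending deletes; return the topics to retry later."""
    still_pending = {}
    for topic, topic_id in pending.items():
        version = stop_replica_version(topic_id)
        if not send_stop_replica(topic, version):
            still_pending[topic] = topic_id  # kept in memory for the next pass
    return still_pending


# Hypothetical transport: the first attempt for "legacy-topic" fails.
attempts: Dict[str, int] = {}
sent_versions: Dict[str, int] = {}


def fake_send(topic: str, version: int) -> bool:
    attempts[topic] = attempts.get(topic, 0) + 1
    sent_versions[topic] = version
    return not (topic == "legacy-topic" and attempts[topic] == 1)


pending = {"legacy-topic": None, "new-topic": uuid.uuid4()}
after_first = run_deletion_pass(pending, fake_send)
after_second = run_deletion_pass(after_first, fake_send)
```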
This leads to the question of what should be done if the controller never receives a successful response from a replica for a StopReplicaRequest. In that scenario it is still safe to stop retrying after a reasonable number of retries and amount of time. Because LeaderAndIsrRequest v5 includes a type flag that allows FULL requests to be identified, any stale partitions will be reconciled and deleted by a broker on startup when it receives the initial LeaderAndIsrRequest from the controller. This is also safe if the controller changes before the StopReplicaRequest(s) succeed, as the new controller will send a FULL LeaderAndIsrRequest on becoming the leader, ensuring that any stale partitions are cleaned up.
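To make the reconciliation concrete, here is a small sketch of how a broker could work out which local partitions are stale when a FULL request arrives. It assumes a FULL request lists every partition the broker should host; the function name and the tuple representation of a partition are illustrative only.

```python
from typing import Set, Tuple

TopicPartition = Tuple[str, int]  # (topic name, partition number)


def stale_partitions(local: Set[TopicPartition],
                     requested: Set[TopicPartition],
                     is_full_request: bool) -> Set[TopicPartition]:
    """Return local partitions that should be deleted.

    Only a FULL request is treated as the complete assignment, so only a
    FULL request can show that a local partition is no longer wanted.
    """
    if not is_full_request:
        return set()
    return local - requested


on_disk = {("orders", 0), ("orders", 1), ("deleted-topic", 0)}
assigned = {("orders", 0), ("orders", 1)}

to_delete_full = stale_partitions(on_disk, assigned, is_full_request=True)
to_delete_incremental = stale_partitions(on_disk, assigned, is_full_request=False)
```

An incremental request yields no deletions here because the absence of a partition from it says nothing about whether the partition still exists.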
...
LeaderAndIsrRequest(s) will only be sent by the controller once a topic ID has been successfully assigned to the topic. The migration process can take place without an inter-broker protocol bump, as the format stored in /brokers/topics/[topic] will be compatible with older broker versions. The topic IDs in requests and responses will be tagged fields to avoid changing the inter-broker protocol.
When a replica receives a LeaderAndIsrRequest containing a topic ID for an existing partition which does not have a topic ID associated, it will create a partition metadata file for the topic partition locally. At this point the local partition will have been migrated to support topic IDs.
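The replica-side migration step might look like the sketch below. The file name `partition.metadata`, its `version` and `topic_id` lines, and the helper names are assumptions for illustration. The case where a stored ID differs from the one in the request is not covered here.

```python
import tempfile
import uuid
from pathlib import Path
from typing import Optional

METADATA_FILE = "partition.metadata"  # assumed file name


def read_topic_id(partition_dir: Path) -> Optional[uuid.UUID]:
    """Return the topic ID stored for this partition, or None if not migrated."""
    path = partition_dir / METADATA_FILE
    if not path.exists():
        return None
    for line in path.read_text().splitlines():
        key, _, value = line.partition(":")
        if key.strip() == "topic_id":
            return uuid.UUID(value.strip())
    return None


def maybe_migrate(partition_dir: Path, topic_id: uuid.UUID) -> bool:
    """Create the metadata file if the partition has no topic ID yet.

    Returns True when a file was written, False when one already existed.
    """
    if read_topic_id(partition_dir) is not None:
        return False
    # Assumed layout: a version line followed by the topic ID.
    (partition_dir / METADATA_FILE).write_text(
        f"version: 0\ntopic_id: {topic_id}\n"
    )
    return True


request_topic_id = uuid.uuid4()
with tempfile.TemporaryDirectory() as tmp:
    partition_dir = Path(tmp) / "orders-0"
    partition_dir.mkdir()
    before = read_topic_id(partition_dir)                     # not migrated yet
    first = maybe_migrate(partition_dir, request_topic_id)    # writes the file
    second = maybe_migrate(partition_dir, request_topic_id)   # no-op
    stored = read_topic_id(partition_dir)
```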
...