Table of Contents |
---|
Status
Current state: Under Discussion
...
Overall, topic IDs provide a safer way for brokers to replicate topics without any chance of incorrectly interacting with stale topics with the same name. By preventing such scenarios, we can simplify a number of other interactions such as topic deletes which are currently more complicated and problematic than necessary.
Public Interfaces
No Minor changes to public interfaces the TopicDescription interface will be made in this release. However, it may be dangerous to use older versions of Kafka tools with new broker versions when using their --zookeeper
flags. Use of older tools in this way is not supported today.
Proposed Changes
Topic IDs will be represented with 64 bit v4 UUIDs. A UUID with all bits as 0 will be reserved as a null UUID as the Kafka RPC protocol does not allow for nullable fields. When printed or stored as a string, topic IDs will be converted to base64 string representation.
to allow clients to access the topic ID of topics found in metadata responses.
/**
* Create an instance with the specified parameters.
*
* @param name The topic name
* @param internal Whether the topic is internal to Kafka
* @param partitions A list of partitions where the index represents the partition id and the element contains
* leadership and replica information for that partition.
* @param authorizedOperations authorized operations for this topic, or null if this is not known.
* @param topicId Unique value that identifies the topic
*
*/
public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions,
Set<AclOperation> authorizedOperations, UUID topicId)
/**
* A unique identifier for the topic.
*/
public UUID topicId()
Additionally, it may be dangerous to use older versions of Kafka tools with new broker versions when using their --zookeeper
flags. Use of older tools in this way is not supported today.
Proposed Changes
Topic IDs will be represented with 64 bit v4 UUIDs. A UUID with all bits as 0 will be reserved as a null UUID as the Kafka RPC protocol does not allow for nullable fields. When printed or stored as a string, topic IDs will be converted to base64 string representation.
On On handling a CreateTopicRequest brokers will create the topic znode under /brokers/topics/[topic], as usual.
...
The controller will supply topic IDs for all topic partitions to brokers by sending LeaderAndIsrRequest(s) that contain the topic IDs for all partitions contained in the request.
Requests to describe topics will return a result containing TopicDescriptions with topic IDs for each topic
Protocol Changes
LeaderAndIsr
...
|
...
|
LeaderAndIsrRequest v3 adds the topic ID to the topic_states field, and an enum type to denote the type of LeaderAndIsrRequest. Currently, the first LeaderAndIsrRequest sent to a broker by a controller contains all TopicPartitions that a broker is a replica for. We will formalize this behavior by also including a type enum to denote the type of LeaderAndIsrRequest.
value | enum | description |
---|---|---|
0 | UNSPECIFIED | Unspecified type. Defaults to incremental / previous behavior. |
1 | INCREMENTAL | A LeaderAndIsrRequest that is not guaranteed to contain all topic partitions assigned to a broker. |
2 | FULL | A full LeaderAndIsrRequest containing all partitions the broker is a replica for. |
When type = FULL, the broker is able to reconcile its local state on disk with the request. Any partition not contained in this request and present on local disk can be staged for deletion. There are two such types of stale request.
1. The TopicPartition is not present in the LeaderAndIsrRequest.
2. The TopicPartition is contained in the request, but the topic ID that does not match the local topic partition stored on the broker.
Reconciliation may also be necessary if type = INCREMENTAL and the topic ID set on a local partition does not match the topic ID contained in the request. A TopicPartition with the same name and a different topic ID by implies that the local topic partition is stale, as the topic must have been deleted to create a new topic with a different topic ID.
When type = UNSPECIFIED, the request will be treated in a way that allows for backwards compatibility with older request types.
Deletion
Deletion of stale partitions triggered by LeaderAndIsrRequest(s) will take place by:
- Logging at WARN level all partitions that will be deleted and the time that they will be be deleted at.
- Move the partition's directory to log.dir/deleting/{topic_id}_{partition}
- Schedule deletion from disk with a delay of delete.stale.topic.delay.ms ms. This will clear the deleting directory of the partition's contents.
LeaderAndIsrResponse v3
|
StopReplica
StopReplicaRequest v2
|
StopReplicaResponse v2
|
...
2
...
When type = FULL, the broker is able to reconcile its local state on disk with the request. Any partition not contained in this request and present on local disk can be staged for deletion. There are two such types of stale request.
1. The TopicPartition is not present in the LeaderAndIsrRequest.
2. The TopicPartition is contained in the request, but the topic ID that does not match the local topic partition stored on the broker.
Reconciliation may also be necessary if type = INCREMENTAL and the topic ID set on a local partition does not match the topic ID contained in the request. A TopicPartition with the same name and a different topic ID by implies that the local topic partition is stale, as the topic must have been deleted to create a new topic with a different topic ID.
When type = UNSPECIFIED, the request will be treated in a way that allows for backwards compatibility with older request types.
Deletion
Deletion of stale partitions triggered by LeaderAndIsrRequest(s) will take place by:
- Logging at WARN level all partitions that will be deleted and the time that they will be be deleted at.
- Move the partition's directory to log.dir/deleting/{topic_id}_{partition}
- Schedule deletion from disk with a delay of delete.stale.topic.delay.ms ms. This will clear the deleting directory of the partition's contents.
LeaderAndIsrResponse v3
|
StopReplica
StopReplicaRequest v2
|
Fetch
To avoid issues where requests are made to stale partitions, a topic_id field will be added to fence reads from deleted topics. Note that the leader epoch is not sufficient for preventing these issues, as the partition leader epoch is reset when a topic is deleted and recreated.
FetchRequest v12
|
StopReplicaResponse v2
...
StopReplica Response (Version: 2) => error_code [partitions]
error_code => INT16
partitions => topic topic_id* partition error_code
topic => STRING
topic_id* => UUID
partition => INT32
error_code => INT16
|
FetchResponse v12
|
Fetch
To avoid issues where requests are made to stale partitions, a topic_id field will be added to fence reads from deleted topics. Note that the leader epoch is not sufficient for preventing these issues, as the partition leader epoch is reset when a topic is deleted and recreated.
FetchRequest v12
|
ListOffsets
To avoid issues where requests are made to stale partitions, a topic_id field will be added to fence reads from deleted topics.
ListOffsetsRequest v6
|
FetchResponse v12
|
ListOffsetsResponse v6
|
ListOffsets
To avoid issues where requests are made to stale partitions, a topic_id field will be added to fence reads from deleted topics.
ListOffsetsRequest v6
|
OffsetForLeader
To avoid issues where requests are made to stale partitions, a topic_id field will be added to fence reads from deleted topics.
OffsetForLeaderRequest v4
|
OffsetForLeaderResponse v4
|
ListOffsetsResponse v6
Metadata
MetadataResponse must be modified so that describeTopics includes the topic id for each topic.
MetadataResponse v10
|
|
|
|
responses => topic topic_id* [partition_responses]
topic => STRING
topic_id* => UUID
partition_responses => partition error_code timestamp offset leader_epoch
partition => INT32
error_code => INT16
timestamp => INT64
offset => INT64
leader_epoch => INT32
OffsetForLeader
To avoid issues where requests are made to stale partitions, a topic_id field will be added to fence reads from deleted topics.
OffsetForLeaderRequest v4
...
OffsetForLeaderEpoch Request (Version: 4) => replica_id [topics]
replica_id => INT32
topics => topic topic_id* [partitions]
topic => STRING
topic_id* => UUID
partitions => partition current_leader_epoch leader_epoch
partition => INT32
current_leader_epoch => INT32
leader_epoch => INT32
OffsetForLeaderResponse v4
OffsetForLeaderEpoch Response (Version: 4) => throttle_time_ms [topics]
throttle_time_ms => INT32
topics => topic topic_id* [partitions]
topic => STRING
brokers => node_id host port rack |
partitions => error_code partition leader_epoch end_offset
error_code => INT16
partition => INT32
leader_epoch => INT32
end_offset => INT64
|
DeleteTopics
With the addition of topic IDs and the changes to LeaderAndIsrRequest described above, we can now make changes to topic deletion logic that will allow topics to be immediately considered deleted, regardless of whether all replicas have responded to a DeleteTopicsRequest.
...