...
DeleteTopics
With the addition of topic IDs and the changes to LeaderAndIsrRequest described above, we can now make changes to topic deletion logic that will allow topics to be immediately considered deleted, regardless of whether all replicas have responded to a DeleteTopicsRequest.
When the controller receives a DeleteTopicsRequest, if the IBP is >= MIN_TOPIC_ID_VERSION it will delete the /brokers/topics/[topic] znode payload and immediately reply to the DeleteTopicsRequest with a successful response. At this point, the topic is considered deleted, and a topic with the same name can be created.
Although the topic is safely deleted at this point, it must still be garbage collected. To garbage collect, the controller will then send StopReplicaRequest(s) to all brokers assigned as replicas for the deleted topic. For the most part, deletion logic can be maintained between IBP versions, with some differences in responses and cleanup in ZooKeeper. Both formats must still be supported, as the IBP may not be bumped right away and deletes may have already been staged before the IBP bump occurs.
The updated controller's delete logic will:
1. Collect deleted topics:
   - Old format: topics listed under /admin/delete_topics, pulling the topic state from /brokers/topics/[topic].
   - New format: in-memory topic deletion states from received DeleteTopicsRequest(s).
2. Remove deleted topics from replicas by sending StopReplicaRequest V3 before the IBP bump (using the old logic), and V4 with the new topic-ID logic after the IBP bump.
3. Finalize successful deletes:
   - For /admin/delete_topics deletes, we may need to respond to the DeleteTopicsRequest. We can also delete the topic znodes at /admin/delete_topics/[topic] and /brokers/topics/[topic].
   - For deletes of topics with topic IDs, remove the topic from the in-memory topic deletion state on the controller.
Any unsuccessful StopReplicaRequest(s) will be maintained in memory and retried after retryMs, starting again from step 1.
This raises the question of what should be done if the controller never receives a successful response from a replica for a StopReplicaRequest. In that scenario it is still safe to stop retrying after a reasonable number of retries and amount of time. Because LeaderAndIsrRequest v5 includes a type flag that identifies FULL requests, any stale partitions will be reconciled and deleted by a broker on startup upon receiving the initial LeaderAndIsrRequest from a controller. This is also safe if the controller changes before the StopReplicaRequest(s) succeed, as the new controller will send a FULL LeaderAndIsrRequest on becoming leader, ensuring that any stale partitions are cleaned up.
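A minimal sketch of the in-memory deletion state this logic implies; TopicDeletionState and its methods are hypothetical names for illustration, not actual controller internals:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical sketch of the controller's in-memory deletion state.
class TopicDeletionState {
    // Topic ID -> brokers that have not yet acknowledged a StopReplicaRequest.
    private final Map<UUID, Set<Integer>> pendingStopReplicas = new HashMap<>();

    // Step 1 (new format): record a delete from a received DeleteTopicsRequest.
    void stageDeletion(UUID topicId, Set<Integer> replicaBrokers) {
        pendingStopReplicas.put(topicId, new HashSet<>(replicaBrokers));
    }

    // Step 3: on a successful StopReplicaResponse, shrink the pending set; once
    // every replica has acknowledged, the topic's deletion state is finalized.
    void onStopReplicaSuccess(UUID topicId, int brokerId) {
        Set<Integer> remaining = pendingStopReplicas.get(topicId);
        if (remaining != null) {
            remaining.remove(brokerId);
            if (remaining.isEmpty())
                pendingStopReplicas.remove(topicId); // garbage collection complete
        }
    }

    // Anything still pending after retryMs is retried, starting again from step 1.
    Set<UUID> topicsNeedingRetry() {
        return pendingStopReplicas.keySet();
    }
}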
Immediate delete scenarios
Stale reads
- Broker B1 is the leader for topic partition A_p0_id0.
- Topic A id0 is deleted.
- Topic A id1 is created.
- Broker B1 has not yet received a new LeaderAndIsrRequest or a StopReplicaRequest for topic partition A_p0_id0.
- Broker B2 has received a LeaderAndIsrRequest for topic partition A_p0_id0, and starts fetching from B1.
Including topic IDs in FetchRequest, ListOffsetRequest, and OffsetsForLeaderEpochRequest(s) ensures that this scenario is safe. Because these request types carry the topic ID, any request against a stale partition will fail rather than succeed against the wrong topic.
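A minimal sketch of the kind of broker-side check this enables; LocalPartition, ApiError, and the method name are hypothetical stand-ins, not actual Kafka internals:

import java.util.UUID;

// Hypothetical sketch: a request carrying a topic ID that does not match the
// broker's local partition is rejected rather than served.
class TopicIdValidation {
    static final class LocalPartition {
        final UUID topicId;
        LocalPartition(UUID topicId) { this.topicId = topicId; }
    }

    enum ApiError { NONE, UNKNOWN_TOPIC_ID }

    // A request naming id0 can never be served from the recreated topic id1
    // (or vice versa): the IDs differ, so the request fails cleanly.
    static ApiError validate(UUID requestedTopicId, LocalPartition local) {
        if (local == null || !local.topicId.equals(requestedTopicId))
            return ApiError.UNKNOWN_TOPIC_ID;
        return ApiError.NONE;
    }
}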
Stale state
- Broker B1 is a replica for A_p0_id0.
- Topic A id0 is deleted.
- B1 has not received a StopReplicaRequest for A_p0_id0.
- Topic A id1 is created.
- Broker B1 receives a LeaderAndIsrRequest containing partition A_p0_id1.
When this occurs, we will close the Log for A_p0_id0 and move A_p0_id0 to the deleting directory, as described in the LeaderAndIsrRequest section above.
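A minimal sketch of this staging step, assuming a hypothetical helper and an illustrative "-delete" directory suffix (the exact broker mechanism may differ):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch: the stale partition's directory is renamed so that a
// background task can remove it later, after the Log has been closed.
class StalePartitionCleanup {
    static Path stageForDeletion(Path partitionDir) throws IOException {
        Path target = partitionDir.resolveSibling(partitionDir.getFileName() + "-delete");
        return Files.move(partitionDir, target); // picked up later by async deletion
    }
}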
Storage
Partition Metadata file
To allow brokers to resolve the topic name under this structure, a metadata file will be created at logdir/partitiondir/partition.metadata.
This metadata file will be human readable, and will include:
- Metadata schema version (schema_version: int32)
- Topic ID (id: UUID)
This file will be plain text (key/value pairs).
version: 0
topic_id: 46bdb63f-9e8d-4a38-bf7b-ee4eb2a794e4
One important use for this file is on startup, perhaps after a failure: the directory structure alone does not allow the broker to reload its view of topic IDs, so this information must be persisted to disk so it can be reloaded.
During LeaderAndIsrRequests, this file may be used to disambiguate topics safely and delete topics if necessary. More details on this process are explained in the LeaderAndIsrRequest v5 section.
It will be easy to update the file to include more fields in the future.
In JBOD mode, a partition's data can be moved from one disk to another; the partition metadata file will be copied during this process.
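To make the recovery path concrete, here is a minimal sketch of how a broker might reload the topic ID from this file on startup, assuming the key/value format shown above; PartitionMetadataFile and its helpers are hypothetical names, not the actual broker implementation:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of parsing logdir/partitiondir/partition.metadata.
class PartitionMetadataFile {
    final int version;
    final UUID topicId;

    PartitionMetadataFile(int version, UUID topicId) {
        this.version = version;
        this.topicId = topicId;
    }

    static PartitionMetadataFile read(Path partitionDir) throws IOException {
        Map<String, String> kv = new HashMap<>();
        for (String line : Files.readAllLines(partitionDir.resolve("partition.metadata"))) {
            String[] parts = line.split(":", 2); // "key: value" pairs, one per line
            if (parts.length == 2)
                kv.put(parts[0].trim(), parts[1].trim());
        }
        return new PartitionMetadataFile(
            Integer.parseInt(kv.get("version")),
            UUID.fromString(kv.get("topic_id")));
    }
}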
Tooling
kafka-topics.sh --describe will be updated to include the topic ID in the output. A user can specify a topic name to describe with the --topic parameter, or alternatively supply a topic ID with the --topic_id parameter, as shown below.
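For example, describing a topic by name or by ID might look like the following; the connection flag and exact output format here are illustrative, not final:

# Hypothetical invocations
bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
bin/kafka-topics.sh --describe --topic_id 46bdb63f-9e8d-4a38-bf7b-ee4eb2a794e4 --bootstrap-server localhost:9092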
Migration
Upon a controller becoming active, the list of current topics is loaded from /brokers/topics/[topic]. When a topic without a topic ID is found, a UUID will be randomly generated and assigned, and the topic information at /brokers/topics/[topic] will be updated with the id field filled and the schema version bumped to version 3.
LeaderAndIsrRequest(s) will only be sent by the controller once a topic ID has been successfully assigned to the topic. Since the LeaderAndIsrRequest version was bumped, the IBP must also be bumped for migration.
When a replica receives a LeaderAndIsrRequest containing a topic ID for an existing partition that does not yet have an associated topic ID, it will create a partition metadata file for the topic partition locally. At this point the local partition has been migrated to support topic IDs.
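A minimal sketch of the controller's migration pass, assuming a hypothetical TopicState type standing in for the ZooKeeper-backed topic znode state:

import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: topics loaded from /brokers/topics/[topic] without an ID
// get one assigned and the znode schema version bumped.
class TopicIdMigration {
    static final class TopicState {
        UUID topicId;        // null when the topic predates topic IDs
        int schemaVersion;   // /brokers/topics/[topic] znode schema version
    }

    // For each topic without an ID, generate one and bump the znode schema to v3.
    static void assignMissingTopicIds(Map<String, TopicState> topics) {
        for (TopicState state : topics.values()) {
            if (state.topicId == null) {
                state.topicId = UUID.randomUUID();
                state.schemaVersion = 3;
            }
        }
    }
}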
Configuration
The following configuration options will be added:
Option | Unit | Default | Description |
---|---|---|---|
delete.stale.topic.delay.ms | ms | 14400000 (4 hours) | When a FULL or INCREMENTAL LeaderAndIsrRequest is received and the request either does not contain a partition that exists on a broker, or a broker's topic ID does not match the ID in the request, a deletion event will be staged for that partition, completing after delete.stale.topic.delay.ms milliseconds. |
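For reference, such a broker configuration entry might look like the following; the line is illustrative (14,400,000 ms = 4 hours):

# Hypothetical server.properties entry
delete.stale.topic.delay.ms=14400000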
Compatibility with KIP-500
KIP-500 and KIP-595 utilize a special metadata topic to store information that ZooKeeper has stored in the past. This topic must exist before the controller election, but in KIP-516, topic IDs are assigned by the controller. Here is an outline of how we can handle this.
Problem: KIP-595 describes a Vote Request which is used to elect the controller. Currently KIP-595 contains the topic name as part of the protocol.
Solution: Use a sentinel ID reserved only for this topic before its ID is known.
Switching over to topic IDs in this KIP will result in fewer changes later on.
Problem: Post-ZooKeeper, a Fetch request for the metadata topic will be used to obtain information that was once stored in ZooKeeper. KIP-516 stores topic IDs in ZooKeeper, and the controller pushes them to brokers using LeaderAndIsrRequests; this will change to the broker pulling topic IDs with a fetch of the metadata topic. KIP-516 is replacing the topic name field with a topic ID field. So how will the first Fetch request know the correct topic ID for the metadata topic?
Solution: Use the same sentinel ID reserved for the metadata topic before its ID is known. After controller election, upon receiving the result, assign the metadata topic its unique topic ID.
Using a topic ID will result in a slightly smaller fetch request and likely prevent further changes. Assigning a unique ID for the metadata topic leaves the possibility for the topic to be placed in tiered storage, or used in other scenarios where topics from multiple clusters may be in one place without appending the cluster ID.
Sentinel ID
The idea is that this will be a hard-coded UUID that no other topic can be assigned. Initially the all-zero UUID was considered, but it was ultimately rejected since this value is used as a null ID in some places, and it is better to keep these usages separate. An example of such a hard-coded UUID is 00000000-0000-0000-0000-000000000001.
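A minimal sketch of the two reserved values using java.util.UUID; the constant and class names are illustrative, not actual Kafka identifiers:

import java.util.UUID;

// Hypothetical sketch: the null/zero UUID stays reserved for "no ID", while a
// separate hard-coded value serves as the sentinel for the metadata topic
// before its real ID is assigned.
class SentinelId {
    static final UUID NULL_ID = new UUID(0L, 0L);     // 00000000-0000-0000-0000-000000000000
    static final UUID SENTINEL_ID = new UUID(0L, 1L); // 00000000-0000-0000-0000-000000000001

    // Topic ID assignment must never hand out either reserved value.
    static boolean isReserved(UUID id) {
        return NULL_ID.equals(id) || SENTINEL_ID.equals(id);
    }
}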
Vote
Vote will be changed to replace topic name with topic ID, and will use a sentinel topic ID if no topic ID has been assigned already. See above for more information on sentinel topic IDs.
VoteRequest v0
...
VoteRequest (Version 0) => cluster_id [topics]
  cluster_id => STRING
  topics => topic_id* [partitions]
    topic_id* => UUID
    partitions => partition_index candidate_epoch candidate_id last_offset_epoch last_offset
      partition_index => INT32
      candidate_epoch => INT32
      candidate_id => INT32
      last_offset_epoch => INT32
      last_offset => INT64
VoteResponse v0
...
VoteResponse (Version 0) => error_code [topics]
  error_code => INT16
  topics => topic_id* [partitions]
    topic_id* => UUID
    partitions => partition_index error_code leader_id leader_epoch vote_granted
      partition_index => INT32
      error_code => INT16
      leader_id => INT32
      leader_epoch => INT32
      vote_granted => BOOL
BeginQuorumEpoch
BeginQuorumEpoch will replace the topic name field with the topic ID field.
BeginQuorumEpochRequest v0
...
BeginQuorumEpochRequest (Version 0) => cluster_id [topics]
  cluster_id => STRING
  topics => topic_id* [partitions]
    topic_id* => UUID
    partitions => partition_index leader_id leader_epoch
      partition_index => INT32
      leader_id => INT32
      leader_epoch => INT32
BeginQuorumEpochResponse v0
...
BeginQuorumEpochResponse (Version 0) => error_code [topics]
  error_code => INT16
  topics => topic_id* [partitions]
    topic_id* => UUID
    partitions => partition_index error_code leader_id leader_epoch
      partition_index => INT32
      error_code => INT16
      leader_id => INT32
      leader_epoch => INT32
EndQuorumEpoch
EndQuorumEpoch will replace the topic name field with the topic ID field.
EndQuorumEpochRequest v0
...
EndQuorumEpochRequest (Version 0) => cluster_id [topics]
  cluster_id => STRING
  topics => topic_id* [partitions]
    topic_id* => UUID
    partitions => partition_index replica_id leader_id leader_epoch [preferred_successors]
      partition_index => INT32
      replica_id => INT32
      leader_id => INT32
      leader_epoch => INT32
      preferred_successors => INT32
EndQuorumEpochResponse v0
...
EndQuorumEpochResponse (Version 0) => error_code [topics]
  error_code => INT16
  topics => topic_id* [partitions]
    topic_id* => UUID
    partitions => partition_index error_code leader_id leader_epoch
      partition_index => INT32
      error_code => INT16
      leader_id => INT32
      leader_epoch => INT32
Compatibility, Deprecation, and Migration Plan
We will need to support all API calls which refer to a partition by either (topicId, partition) or (topicName, partition) until clients are updated to interact with topics by ID. No deprecations are currently planned.
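As an illustration of this dual addressing, a broker-side cache could resolve either form; TopicResolver and its maps are hypothetical stand-ins for the real metadata cache:

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: until clients are updated, a partition must be
// addressable by (topicId, partition) or (topicName, partition).
class TopicResolver {
    private final Map<UUID, String> nameById = new HashMap<>();
    private final Map<String, UUID> idByName = new HashMap<>();

    void register(UUID id, String name) {
        nameById.put(id, name);
        idByName.put(name, id);
    }

    // Older clients send a name; newer clients send an ID. Either resolves to
    // the same topic, so both request forms can be served during the transition.
    String nameFor(UUID topicId) { return nameById.get(topicId); }
    UUID idFor(String topicName) { return idByName.get(topicName); }
}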
...