...
Overall, topic IDs provide a safer way for brokers to replicate topics without any chance of incorrectly interacting with stale topics with the same name. By preventing such scenarios, we can simplify a number of other interactions such as topic deletes which are currently more complicated and problematic than necessary.
Public Interfaces
TopicDescription
Minor changes to the TopicDescription interface will be made to allow clients to access the topic ID of topics found in metadata responses.
/**
* Create an instance with the specified parameters.
*
* @param name The topic name
* @param internal Whether the topic is internal to Kafka
* @param partitions A list of partitions where the index represents the partition id and the element contains
* leadership and replica information for that partition.
* @param authorizedOperations authorized operations for this topic, or null if this is not known.
* @param topicId Unique value that identifies the topic
*
*/
public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions,
Set<AclOperation> authorizedOperations, UUID topicId)
/**
* A unique identifier for the topic.
*/
public UUID topicId()
UUID
A new UUID class will be exposed under /org/apache/kafka/common
/**
 * This class defines an immutable universally unique identifier (UUID). It represents a 128-bit value.
 * More specifically, the random UUIDs in this class are variant 2 (Leach-Salz) version 4 UUIDs.
 * This definition is very similar to java.util.UUID. One notable difference is that the toString() method prints
 * using the base64 string encoding.
 */
public class UUID {
    /**
     * A UUID where all bits are zero. It represents a null or empty UUID.
     */
    public static final UUID ZERO_UUID

    /**
     * Constructs a 128-bit type 4 UUID where the first long represents the most significant 64 bits
     * and the second long represents the least significant 64 bits.
     */
    public UUID(long mostSigBits, long leastSigBits)

    /**
     * Static factory to retrieve a type 4 (pseudo randomly generated) UUID.
     */
    public static UUID randomUUID()

    /**
     * Returns the most significant bits of the UUID's 128 bit value.
     */
    public long getMostSignificantBits()

    /**
     * Returns the least significant bits of the UUID's 128 bit value.
     */
    public long getLeastSignificantBits()

    /**
     * Returns true iff the obj is another UUID with the same value.
     */
    public boolean equals(Object obj)

    /**
     * Returns a hash code for this UUID.
     */
    public int hashCode()

    /**
     * Returns a base64 string encoding of the UUID.
     */
    public String toString()

    /**
     * Creates a UUID based on a base64 string encoding used in the toString() method.
     */
    public static UUID fromString(String str)
}
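A small stand-alone class can illustrate the base64 round trip behind toString() and fromString(). This is a minimal sketch, not the proposed implementation. The class name Base64UuidSketch is hypothetical. The sketch assumes the URL-safe base64 alphabet without padding, because the text above only says "base64". It also borrows java.util.UUID.randomUUID() as its source of version 4, variant 2 bits.

```java
import java.nio.ByteBuffer;
import java.util.Base64;

// Hypothetical stand-in for the proposed UUID class, assuming unpadded URL-safe base64.
public final class Base64UuidSketch {
    /** All bits zero; used as the null/empty value. */
    public static final Base64UuidSketch ZERO_UUID = new Base64UuidSketch(0L, 0L);

    private final long mostSigBits;
    private final long leastSigBits;

    public Base64UuidSketch(long mostSigBits, long leastSigBits) {
        this.mostSigBits = mostSigBits;
        this.leastSigBits = leastSigBits;
    }

    /** Version 4, variant 2 random bits, borrowed from java.util.UUID for this sketch. */
    public static Base64UuidSketch randomUUID() {
        java.util.UUID u = java.util.UUID.randomUUID();
        return new Base64UuidSketch(u.getMostSignificantBits(), u.getLeastSignificantBits());
    }

    public long getMostSignificantBits() { return mostSigBits; }

    public long getLeastSignificantBits() { return leastSigBits; }

    /** Encodes the 128-bit value (most significant half first) as unpadded URL-safe base64. */
    @Override
    public String toString() {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(mostSigBits).putLong(leastSigBits);
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
    }

    /** Inverse of toString(); rejects input that does not decode to exactly 16 bytes. */
    public static Base64UuidSketch fromString(String str) {
        byte[] bytes = Base64.getUrlDecoder().decode(str);
        if (bytes.length != 16) {
            throw new IllegalArgumentException("Expected 16 bytes but decoded " + bytes.length);
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long most = buf.getLong();
        long least = buf.getLong();
        return new Base64UuidSketch(most, least);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (!(obj instanceof Base64UuidSketch)) return false;
        Base64UuidSketch other = (Base64UuidSketch) obj;
        return mostSigBits == other.mostSigBits && leastSigBits == other.leastSigBits;
    }

    @Override
    public int hashCode() {
        return 31 * Long.hashCode(mostSigBits) + Long.hashCode(leastSigBits);
    }
}
```

Under these assumptions every ID encodes to 22 characters, and ZERO_UUID encodes to 22 'A' characters.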
Additionally, it may be dangerous to use older versions of Kafka tools with new broker versions when using their --zookeeper flags. Use of older tools in this way is not supported today.
...
Metadata
MetadataResponse must be modified so that describeTopics includes the topic ID for each topic.
MetadataResponse v10
...
cluster_authorized_operations => INT32
UpdateMetadata
UpdateMetadata should also include the topic ID.
UpdateMetadataRequest v7
...
controller_id => INT32
controller_epoch => INT32
broker_epoch => INT64
ungrouped_partition_states => UpdateMetadataPartitionState
topic_states => topic_name topic_id* [partition_states]
...
rack => STRING
Produce
Swapping the topic name for the topic ID will typically reduce the size of the request, since a topic ID is a fixed 16 bytes while a topic name can be up to 249 characters long.
ProduceRequest v9
...
Produce Request (Version 9) => transactional_id acks timeout_ms [topics]
transactional_id => STRING
acks => INT16
timeout_ms => INT32
topics => topic_id* [partitions]
topic_id* => UUID
partitions => partition_index records
partition_index => INT32
records => BYTES
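A rough per-topic comparison makes the size claim concrete. This sketch assumes the classic STRING encoding, which is a 2-byte INT16 length prefix followed by UTF-8 bytes, and a fixed 16-byte UUID. Flexible request versions encode strings with a varint length prefix, so the exact break-even point differs slightly. Under these assumptions, the ID is smaller only for topic names longer than 14 bytes. The class TopicFieldSizeSketch is hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical size comparison for one topic entry in a request.
public final class TopicFieldSizeSketch {
    // Assumed STRING encoding: INT16 length prefix followed by the UTF-8 bytes.
    static int topicNameFieldSize(String topicName) {
        return Short.BYTES + topicName.getBytes(StandardCharsets.UTF_8).length;
    }

    // A UUID field is always two INT64 values.
    static int topicIdFieldSize() {
        return 2 * Long.BYTES;
    }

    // Bytes saved per topic entry when the name is replaced by the ID (negative: the ID is larger).
    static int bytesSavedPerTopic(String topicName) {
        return topicNameFieldSize(topicName) - topicIdFieldSize();
    }

    public static void main(String[] args) {
        for (String name : new String[] {"orders", "payments-eu-west-1-transactions"}) {
            System.out.printf("%-32s saved %d bytes per topic entry%n", name, bytesSavedPerTopic(name));
        }
    }
}
```

The topic field appears once per topic entry rather than once per record, so any savings scale with the number of topics in a request.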
ProduceResponse v9
...
Produce Response (Version 9) => [responses] throttle_time_ms
responses => topic_id* [partitions]
topic_id* => UUID
partitions => partition_index error_code base_offset log_append_time_ms log_start_offset [record_errors] error_message
partition_index => INT32
error_code => INT16
base_offset => INT64
log_append_time_ms => INT64
log_start_offset => INT64
record_errors => batch_index batch_index_error_message
batch_index => INT32
batch_index_error_message => STRING
error_message => STRING
throttle_time_ms => INT32
DeleteTopics
With the addition of topic IDs and the changes to LeaderAndIsrRequest described above, we can now make changes to topic deletion logic that will allow topics to be immediately considered deleted, regardless of whether all replicas have acknowledged the deletion.
When the controller receives a DeleteTopicsRequest, if the IBP is >= MIN_TOPIC_ID_VERSION it will delete the /brokers/topics/[topic] znode payload and immediately reply to the DeleteTopicsRequest with a successful response. At this point, the topic is considered deleted, and a topic with the same name can be created.
Although the topic is safely deleted at this point, it must still be garbage collected. To garbage collect, the controller will then send StopReplicaRequest(s) to all brokers assigned as replicas for the deleted topic. For the most part, deletion logic can be maintained between IBP versions, with some differences in responses and cleanup in ZooKeeper. Both formats must still be supported, as the IBP may not be bumped right away and deletes may have already been staged before the IBP bump occurs.
The updated controller's delete logic will:
1) Collect deleted topics:
  - Old format: /admin/delete_topics, pulling the topic state from /brokers/topics/[topic].
  - New format: in-memory topic deletion states from received DeleteTopicsRequest(s).
2) Remove deleted topics from replicas by sending StopReplicaRequest v3 (old logic) before the IBP bump, and StopReplicaRequest v4 with topic IDs (new logic) after the IBP bump.
3) Finalize successful deletes:
  - For /admin/delete_topics deletes, we may need to respond to the DeleteTopicsRequest. We can also delete the topic znodes at /admin/delete_topics/[topic] and /brokers/topics/[topic].
  - For deletes of topics with topic IDs, remove the topic from the in-memory topic deletion state on the controller.
4) Any unsuccessful StopReplicaRequest(s) are maintained in memory and retried after retryMs, starting again from step 1).
This leads to the question of what should be done if the controller never receives a successful response from a replica for a StopReplicaRequest. In that scenario it is still safe to stop retrying after a reasonable number of retries and amount of time. Because LeaderAndIsrRequest v5 includes a type flag that identifies FULL requests, a broker will reconcile and delete any stale partitions on startup when it receives its initial LeaderAndIsrRequest from the controller. This is also safe if the controller changes before the StopReplicaRequest(s) succeed, as the new controller will send a FULL LeaderAndIsrRequest on becoming the leader, ensuring that any stale partitions are cleaned up.
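The retry and give-up behavior can be sketched as an in-memory map from topic ID to the replicas that have not yet acknowledged a StopReplicaRequest. This is a minimal sketch rather than the controller's actual code. TopicDeletionTracker, its methods and the maxAttempts bound are hypothetical, and retryMs follows the text. java.util.UUID stands in for the proposed UUID class. Actually sending the requests is left to the caller.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Hypothetical sketch of the controller's in-memory deletion state for topics deleted by ID. */
public final class TopicDeletionTracker {
    private static final class Attempt {
        int attempts;       // StopReplicaRequests sent so far to this broker
        long nextRetryAtMs; // earliest time the next request may be sent
    }

    private final long retryMs;
    private final int maxAttempts;
    // topic ID -> (broker ID -> retry state) for replicas that have not acknowledged yet
    private final Map<UUID, Map<Integer, Attempt>> pending = new HashMap<>();

    public TopicDeletionTracker(long retryMs, int maxAttempts) {
        this.retryMs = retryMs;
        this.maxAttempts = maxAttempts;
    }

    /** Called once the DeleteTopicsRequest has been answered; the topic is already considered deleted. */
    public void stageDeletion(UUID topicId, Collection<Integer> replicaBrokerIds, long nowMs) {
        Map<Integer, Attempt> perBroker = new HashMap<>();
        for (int brokerId : replicaBrokerIds) {
            Attempt attempt = new Attempt();
            attempt.nextRetryAtMs = nowMs; // first request is due immediately
            perBroker.put(brokerId, attempt);
        }
        pending.put(topicId, perBroker);
    }

    /** Returns, per topic ID, the brokers that should be sent a StopReplicaRequest now. */
    public Map<UUID, List<Integer>> dueStopReplicas(long nowMs) {
        Map<UUID, List<Integer>> due = new HashMap<>();
        Iterator<Map.Entry<UUID, Map<Integer, Attempt>>> topics = pending.entrySet().iterator();
        while (topics.hasNext()) {
            Map.Entry<UUID, Map<Integer, Attempt>> topic = topics.next();
            Iterator<Map.Entry<Integer, Attempt>> brokers = topic.getValue().entrySet().iterator();
            while (brokers.hasNext()) {
                Map.Entry<Integer, Attempt> broker = brokers.next();
                Attempt attempt = broker.getValue();
                if (attempt.nextRetryAtMs > nowMs) continue; // not due yet
                if (attempt.attempts >= maxAttempts) {
                    // Give up: a later FULL LeaderAndIsrRequest cleans up any stale replica.
                    brokers.remove();
                    continue;
                }
                attempt.attempts++;
                attempt.nextRetryAtMs = nowMs + retryMs;
                due.computeIfAbsent(topic.getKey(), id -> new ArrayList<>()).add(broker.getKey());
            }
            if (topic.getValue().isEmpty()) topics.remove(); // nothing left to retry for this topic
        }
        return due;
    }

    /** Records a successful StopReplicaResponse; state is dropped once every replica has acknowledged. */
    public void onStopReplicaSuccess(UUID topicId, int brokerId) {
        Map<Integer, Attempt> perBroker = pending.get(topicId);
        if (perBroker == null) return;
        perBroker.remove(brokerId);
        if (perBroker.isEmpty()) pending.remove(topicId);
    }

    public boolean isPending(UUID topicId) {
        return pending.containsKey(topicId);
    }
}
```

Bounding the number of attempts is acceptable here only because, as argued above, a FULL LeaderAndIsrRequest eventually reconciles any replica that never acknowledged its StopReplicaRequest.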
Immediate delete scenarios
Stale reads
- Broker B1 is a leader for topic partition A_p0_id0.
- Topic A id0 is deleted.
- Topic A id1 is created.
- Broker B1 has not yet received a new LeaderAndIsrRequest, nor a StopReplicaRequest for topic partition A_p0_id0.
- Broker B2 has received a LeaderAndIsrRequest for topic partition A_p0_id1, and starts fetching from B1.
Including topic IDs in FetchRequest, ListOffsetRequest and OffsetsForLeaderEpochRequest ensures that this scenario is safe: because these requests carry the topic ID, any request to a stale partition will fail.
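A lookup keyed by topic ID rather than by name shows why stale reads fail. In this minimal sketch, a broker that still hosts A_p0 under id0 rejects any request that names the partition through a different topic ID. TopicIdRequestValidator and its Result values are hypothetical and are not protocol error codes. java.util.UUID stands in for the proposed UUID class.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

public final class TopicIdRequestValidator {
    // Hypothetical outcomes for this sketch only.
    enum Result { OK, NO_SUCH_TOPIC_ID, NO_SUCH_PARTITION }

    // Partitions this broker hosts, keyed by the topic ID of the log it holds.
    private final Map<UUID, Set<Integer>> hostedPartitions = new HashMap<>();

    void hostPartition(UUID topicId, int partition) {
        hostedPartitions.computeIfAbsent(topicId, id -> new HashSet<>()).add(partition);
    }

    /** Validates a fetch-style request that identifies its partition by topic ID. */
    Result validate(UUID requestTopicId, int partition) {
        Set<Integer> partitions = hostedPartitions.get(requestTopicId);
        if (partitions == null) {
            // The request refers to an incarnation of the topic this broker does not host.
            return Result.NO_SUCH_TOPIC_ID;
        }
        return partitions.contains(partition) ? Result.OK : Result.NO_SUCH_PARTITION;
    }
}
```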
Stale state
- Broker B1 is a replica for A_p0_id0.
- Topic A id0 is deleted.
- B1 does not receive a StopReplicaRequest for A_p0_id0.
- Topic A id1 is created.
- Broker B1 receives a LeaderAndIsrRequest containing partition A_p0_id1.
When this occurs, we will close the Log for A_p0_id0, and move A_p0_id0 to the deleting directory as described in the LeaderAndIsrRequest description above.
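The stale-state handling can be sketched as a comparison between two topic IDs. One is the ID recorded for a local log, for example from its partition metadata file. The other is the ID in an incoming LeaderAndIsrRequest. StaleLogReconciler and its Action values are hypothetical. The sketch only records which logs would be closed and moved to the deleting directory and does not touch the file system.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public final class StaleLogReconciler {
    enum Action { CREATE_NEW_LOG, KEEP_EXISTING_LOG, REPLACE_STALE_LOG }

    // Partition directory (e.g. "A-0") -> topic ID recorded for the log currently on disk.
    private final Map<String, UUID> localTopicIds = new HashMap<>();
    // Topic IDs of logs that would be closed and moved to the deleting directory.
    private final List<UUID> staleLogIds = new ArrayList<>();

    void loadLog(String topic, int partition, UUID topicId) {
        localTopicIds.put(partitionDir(topic, partition), topicId);
    }

    Action onLeaderAndIsrPartition(String topic, int partition, UUID requestTopicId) {
        String dir = partitionDir(topic, partition);
        UUID localId = localTopicIds.get(dir);
        if (localId == null) {
            localTopicIds.put(dir, requestTopicId);
            return Action.CREATE_NEW_LOG;
        }
        if (localId.equals(requestTopicId)) {
            return Action.KEEP_EXISTING_LOG;
        }
        // Same name, different ID: the local log belongs to a deleted incarnation of the topic.
        staleLogIds.add(localId);
        localTopicIds.put(dir, requestTopicId);
        return Action.REPLACE_STALE_LOG;
    }

    List<UUID> staleLogIds() {
        return staleLogIds;
    }

    private static String partitionDir(String topic, int partition) {
        return topic + "-" + partition;
    }
}
```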
Storage
Partition Metadata file
To allow brokers to resolve the topic name under this structure, a metadata file will be created at logdir/partitiondir/partition.metadata.
This metadata file will be human readable, and will include:
- Metadata schema version (version: int32)
- Topic ID (topic_id: UUID)
This file will be plain text (key/value pairs).
version: 0
topic_id: 46bdb63f9e8d4a38bf7bee4eb2a794e4
This file is needed because the current directory structure does not allow the broker to reload its view of a partition's topic ID on startup (for example, after a failure). Persisting the topic ID to disk allows this information to be reloaded.
It will be easy to update the file to include more fields in the future. This may help with tooling, such as mapping topic IDs to topic names.
In JBOD mode, a partition's data can be moved from one disk to another; the partition metadata file will be copied along with it.
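Reading and writing the key/value format above can be sketched with java.nio.file. This is a minimal sketch under stated assumptions. PartitionMetadataFileSketch is hypothetical. The topic ID is treated as an opaque string, so the sketch works with either hex or base64 text. Writes go to a temporary file that is then atomically renamed. Unknown keys are kept rather than rejected, so that new fields can be added later.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.LinkedHashMap;
import java.util.Map;

public final class PartitionMetadataFileSketch {
    static final String FILE_NAME = "partition.metadata";

    /** Writes "version" and "topic_id" as plain-text key/value lines. */
    static void write(Path partitionDir, int version, String topicId) {
        String text = "version: " + version + "\n" + "topic_id: " + topicId + "\n";
        try {
            Files.createDirectories(partitionDir);
            Path tmp = partitionDir.resolve(FILE_NAME + ".tmp");
            Files.write(tmp, text.getBytes(StandardCharsets.UTF_8));
            // Rename over the old file so readers never see a half-written file.
            // (A durable version would also force the temporary file to disk first.)
            Files.move(tmp, partitionDir.resolve(FILE_NAME), StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Parses every "key: value" line; unknown keys are kept for forward compatibility. */
    static Map<String, String> read(Path partitionDir) {
        Map<String, String> fields = new LinkedHashMap<>();
        try {
            for (String line : Files.readAllLines(partitionDir.resolve(FILE_NAME), StandardCharsets.UTF_8)) {
                int colon = line.indexOf(':');
                if (colon < 0) continue; // skip blank or malformed lines
                fields.put(line.substring(0, colon).trim(), line.substring(colon + 1).trim());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (!fields.containsKey("version") || !fields.containsKey("topic_id")) {
            throw new IllegalStateException(FILE_NAME + " is missing required keys: " + fields.keySet());
        }
        return fields;
    }
}
```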
Tooling
kafka-topics.sh --describe will be updated to include the topic ID in the output. A user can specify a topic name to describe with the --topic parameter, or alternatively the user can supply a topic ID with the --topic_id parameter.
Configuration
The following configuration options will be added:
Option | Unit | Default | Description |
---|---|---|---|
delete.stale.topic.delay.ms | ms | 14400000 (4 hours) | When a FULL or INCREMENTAL LeaderAndIsrRequest is received and the request does not contain a partition that exists on a broker, or the broker's topic ID for a partition does not match the ID in the request, a deletion event will be staged for that partition which will complete after delete.stale.topic.delay.ms milliseconds. |
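The staged deletion controlled by delete.stale.topic.delay.ms can be sketched as a map from partition to the time at which its deletion completes. StaleDeletionScheduler and its methods are hypothetical, and partitions are identified by plain strings such as "A-0". This sketch treats a topic ID mismatch as stale for either request type. It treats a partition missing from the request as stale only for a FULL request. That restriction is an assumption of the sketch, not part of the description above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public final class StaleDeletionScheduler {
    private final long deleteStaleTopicDelayMs; // value of delete.stale.topic.delay.ms
    // partition (e.g. "A-0") -> time at which its staged deletion completes
    private final Map<String, Long> staged = new HashMap<>();

    public StaleDeletionScheduler(long deleteStaleTopicDelayMs) {
        this.deleteStaleTopicDelayMs = deleteStaleTopicDelayMs;
    }

    /** Stages deletions for local partitions that the request shows to be stale. */
    public void onLeaderAndIsr(boolean full, Map<String, UUID> localTopicIds,
                               Map<String, UUID> requestTopicIds, long nowMs) {
        for (Map.Entry<String, UUID> local : localTopicIds.entrySet()) {
            UUID requested = requestTopicIds.get(local.getKey());
            boolean missing = requested == null;
            boolean mismatched = !missing && !requested.equals(local.getValue());
            // Assumption of this sketch: a missing partition only counts as stale in a FULL request.
            if (mismatched || (full && missing)) {
                staged.putIfAbsent(local.getKey(), nowMs + deleteStaleTopicDelayMs);
            }
        }
    }

    /** Removes and returns (sorted) the partitions whose delay has elapsed. */
    public List<String> completeDueDeletions(long nowMs) {
        List<String> due = new ArrayList<>();
        staged.entrySet().removeIf(e -> {
            if (e.getValue() <= nowMs) {
                due.add(e.getKey());
                return true;
            }
            return false;
        });
        Collections.sort(due);
        return due;
    }
}
```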
AdminClient Support
Access to topic IDs from the AdminClient will make it easier for users to obtain topics' topic IDs. It can also help ensure correctness when deleting topics. This will require some changes to public APIs and protocols.
CreateTopics
Upon creation of a topic, the topic ID will be included in the TopicMetadataAndConfig which is included in CreateTopicsResult. It can be accessed through a method in CreateTopicsResult or the TopicMetadataAndConfig object.
CreateTopicsResult
public class CreateTopicsResult {
    public KafkaFuture<UUID> topicId(String topic)
    ...
    public static class TopicMetadataAndConfig {
        TopicMetadataAndConfig(UUID topicId, int numPartitions, int replicationFactor, Config config)
        public UUID topicId()
    }
}
The protocol for CreateTopicsResponse will also need a slight modification.
CreateTopicsResponse v7
Describe Topics
There are two use cases we want to support: 1) obtaining topic IDs when describing topics, and 2) supplying topic IDs to get a description of the topics.
For the first use case, we need to modify TopicDescription and MetadataResponse.
MetadataResponse v10
Even when topic IDs are supported, the response will contain both the topic name and the topic ID.
Additionally, new methods will need to be added to the Admin interface and KafkaAdminClient:
Admin and KafkaAdminClient
default DescribeTopicsResult describeTopics(Collection<UUID> topicIds)
DescribeTopicsResult describeTopics(Collection<UUID> topicIds, DescribeTopicsOptions options)
MetadataRequest must also be modified. Topic name will be left in to allow requests to be made either by topic name or topic ID.
ID will be checked first, but if the value is the default zero UUID, topic name will be used instead. If an ID is specified and the ID does not exist, the request will fail regardless of allow_auto_topic_creation.
If name and ID are included, but the name does not match the ID, the request will also fail.
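The lookup order for MetadataRequest can be summarized as a small resolution function. MetadataTopicResolver and the Outcome values are hypothetical and are not protocol error codes. java.util.UUID stands in for the proposed UUID class. A null name means the request supplied only an ID.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public final class MetadataTopicResolver {
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Hypothetical outcomes for this sketch only.
    enum Outcome { RESOLVED_BY_ID, RESOLVED_BY_NAME, ID_NOT_FOUND, NAME_ID_MISMATCH, NAME_NOT_FOUND }

    private final Map<UUID, String> namesById = new HashMap<>();
    private final Map<String, UUID> idsByName = new HashMap<>();

    void addTopic(String name, UUID id) {
        namesById.put(id, name);
        idsByName.put(name, id);
    }

    Outcome resolve(String requestName, UUID requestId) {
        if (requestId != null && !requestId.equals(ZERO_UUID)) {
            // The ID is checked first; an unknown ID is never auto-created.
            String actualName = namesById.get(requestId);
            if (actualName == null) return Outcome.ID_NOT_FOUND;
            if (requestName != null && !requestName.equals(actualName)) return Outcome.NAME_ID_MISMATCH;
            return Outcome.RESOLVED_BY_ID;
        }
        // Zero (default) ID: fall back to the name; auto-creation, if allowed, would apply only here.
        return idsByName.containsKey(requestName) ? Outcome.RESOLVED_BY_NAME : Outcome.NAME_NOT_FOUND;
    }
}
```

Auto-creation is considered only on the name path, which matches the rule that an unknown ID fails regardless of allow_auto_topic_creation.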
MetadataRequest v10
DeleteTopics
It will be useful for the AdminClient to be able to specify a list of topic IDs to delete, to ensure the correct topics are being deleted. New methods will need to be added to the Admin interface and KafkaAdminClient:
Admin and KafkaAdminClient
default DeleteTopicsResult deleteTopics(Collection<UUID> topics)
DeleteTopicsResult deleteTopics(Collection<UUID> topics, DeleteTopicsOptions options)
DeleteTopicsRequest and DeleteTopicsResponse should be modified.
DeleteTopicsRequest v6
DeleteTopicsResponse v6
Even when only the topic ID or only the topic name is included in the request, if topic IDs are supported, the response will contain both the name and the ID.
Compatibility with KIP-500
...
- CreatePartitionsRequest
- ElectPreferredLeadersRequest
- AlterReplicaLogDirsRequest
- AlterConfigsRequest
- DeleteTopicsRequest
- DescribeConfigsRequest
- DescribeLogDirsRequest
- DeleteRecordsRequest
- AddPartitionsToTxnRequest
- TxnOffsetCommitRequest
- WriteTxnMarkerRequest
...