Overall, topic IDs provide a safer way for brokers to replicate topics without any chance of incorrectly interacting with stale topics with the same name. By preventing such scenarios, we can simplify a number of other interactions, such as topic deletion, which is currently more complicated and error-prone than necessary.

Public Interfaces


Minor changes to the TopicDescription interface will be made to allow clients to access the topic ID of topics found in metadata responses.

/**
 * Create an instance with the specified parameters.
 *
 * @param name The topic name
 * @param internal Whether the topic is internal to Kafka
 * @param partitions A list of partitions where the index represents the partition id and the element contains
 *                   leadership and replica information for that partition.
 * @param authorizedOperations authorized operations for this topic, or null if this is not known.
 * @param topicId Unique value that identifies the topic
 */
public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions,
    Set<AclOperation> authorizedOperations, UUID topicId)

/**
 * A unique identifier for the topic.
 */
public UUID topicId()


A new UUID class will be exposed under /org/apache/kafka/common.


/**
 * This class defines an immutable universally unique identifier (UUID). It represents a 128-bit value.
 * More specifically, the random UUIDs in this class are variant 2 (Leach-Salz) version 4 UUIDs.
 * This definition is very similar to java.util.UUID. One notable difference is that the toString() method prints
 * using the base64 string encoding.
 */
public class UUID {
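To make the class description above concrete, here is a minimal sketch of an immutable 128-bit identifier whose toString() prints base64. The class name TopicUuid, its method names, and the choice of the URL-safe, unpadded base64 alphabet are assumptions made for illustration; the proposal only states that toString() uses base64.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

// Hypothetical stand-in for the proposed class; names and encoding details are assumptions.
final class TopicUuid {
    // The all-zero value, usable as a "no ID supplied" default.
    static final TopicUuid ZERO = new TopicUuid(0L, 0L);

    private final long mostSignificantBits;
    private final long leastSignificantBits;

    TopicUuid(long mostSignificantBits, long leastSignificantBits) {
        this.mostSignificantBits = mostSignificantBits;
        this.leastSignificantBits = leastSignificantBits;
    }

    // Random variant 2, version 4 identifier, delegating to java.util.UUID.
    static TopicUuid randomUuid() {
        UUID u = UUID.randomUUID();
        return new TopicUuid(u.getMostSignificantBits(), u.getLeastSignificantBits());
    }

    // Parses the 22-character form produced by toString().
    static TopicUuid fromString(String encoded) {
        ByteBuffer buf = ByteBuffer.wrap(Base64.getUrlDecoder().decode(encoded));
        return new TopicUuid(buf.getLong(), buf.getLong());
    }

    // 16 bytes encode to 22 base64 characters once padding is dropped.
    @Override
    public String toString() {
        ByteBuffer buf = ByteBuffer.allocate(16)
            .putLong(mostSignificantBits)
            .putLong(leastSignificantBits);
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicUuid)) return false;
        TopicUuid other = (TopicUuid) o;
        return mostSignificantBits == other.mostSignificantBits
            && leastSignificantBits == other.leastSignificantBits;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(mostSignificantBits) * 31 + Long.hashCode(leastSignificantBits);
    }
}
```

The base64 form is shorter than the 36-character hexadecimal form printed by java.util.UUID, which is one plausible reason for the difference the class comment calls out.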

OffsetForLeaderEpoch Response (Version: 4) => throttle_time_ms [topics]
  throttle_time_ms => INT32
  topics => topic_id* [partitions]
    topic_id* => UUID 
    partitions => error_code partition leader_epoch end_offset
      error_code => INT16
      partition => INT32
      leader_epoch => INT32
      end_offset => INT64


UpdateMetadataRequest v7

UpdateMetadata Request (Version: 7) => controller_id controller_epoch broker_epoch [ungrouped_partition_states] [topic_states] [live_brokers]
    controller_id => INT32
    controller_epoch => INT32
    broker_epoch => INT64
    ungrouped_partition_states => UpdateMetadataPartitionState
    topic_states => topic_name topic_id* [partition_states]
        topic_name => STRING
        topic_id* => UUID
        partition_states => UpdateMetadataPartitionState
    live_brokers => id v0_host v0_port [endpoints] rack
        id => INT32
        v0_host => STRING
        v0_port => INT32
        endpoints => port host listener security_protocol
            port => INT32
            host => STRING
            listener => STRING
            security_protocol => INT16
        rack => STRING


Produce Request (Version 9) => transactional_id acks timeout_ms [topics]
    transactional_id => STRING
    acks => INT16
    timeout_ms => INT32
    topics => topic_id* [partitions]
        topic_id* => UUID
        partitions => partition_index records
            partition_index => INT32
            records => BYTES

ProduceResponse v9

Produce Response (Version 9) => [responses] throttle_time_ms
    responses => topic_id* [partitions]
        topic_id* => UUID
        partitions => partition_index error_code base_offset log_append_time_ms log_start_offset [record_errors] error_message
            partition_index => INT32
            error_code => INT16
            base_offset => INT64
            log_append_time_ms => INT64
            log_start_offset => INT64
            record_errors => batch_index batch_index_error_message
                batch_index => INT32
                batch_index_error_message => STRING
            error_message => STRING
    throttle_time_ms =>  INT32  

The following configuration options will be added:

Option | Unit | Default | Description
stale.partition.deletion.delay.ms | ms | 14400 (4 hours) | When a FULL or INCREMENTAL LeaderAndIsrRequest is received and the request does not contain a partition that exists on a broker or a broker's topic ID does not match the ID in the request, a deletion event will be staged for that partition which will complete after stale.partition.deletion.delay.ms milliseconds.
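The staging rule described for stale.partition.deletion.delay.ms can be sketched as follows. This is only an illustration under stated assumptions: the class StalePartitionTracker, its methods, the "topic-partition" string keys, and the use of java.util.UUID in place of the proposed UUID class are all hypothetical, and the delay is passed in rather than read from broker configuration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper; not part of the proposal.
final class StalePartitionTracker {
    private final long deletionDelayMs;
    // Partition key ("topic-partition") -> time at which the staged deletion completes.
    private final Map<String, Long> deadlines = new HashMap<>();

    StalePartitionTracker(long deletionDelayMs) {
        this.deletionDelayMs = deletionDelayMs;
    }

    // Stage a deletion for every local partition that the request omits
    // or that the request lists under a different topic ID.
    void onLeaderAndIsr(Map<String, UUID> localTopicIds, Map<String, UUID> requestTopicIds, long nowMs) {
        for (Map.Entry<String, UUID> local : localTopicIds.entrySet()) {
            UUID requested = requestTopicIds.get(local.getKey());
            boolean missing = requested == null;
            boolean idMismatch = !missing && !requested.equals(local.getValue());
            if (missing || idMismatch) {
                // Keep the earliest deadline if the partition is already staged.
                deadlines.putIfAbsent(local.getKey(), nowMs + deletionDelayMs);
            }
        }
    }

    // Partitions whose delay has elapsed, in sorted order.
    List<String> dueForDeletion(long nowMs) {
        List<String> due = new ArrayList<>();
        for (Map.Entry<String, Long> e : deadlines.entrySet()) {
            if (e.getValue() <= nowMs) {
                due.add(e.getKey());
            }
        }
        Collections.sort(due);
        return due;
    }
}
```

Whether a staged deletion is cancelled when a later request lists the partition again is not stated in the option's description, so the sketch leaves that case out.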

AdminClient Support

Exposing topic IDs through the AdminClient will make it easier for users to look up the IDs of their topics. It can also ensure correctness when deleting topics. This will require some changes to public APIs and protocols.


Upon creation of a topic, the topic ID will be included in the TopicMetadataAndConfig which is included in CreateTopicsResult. It can be accessed through a method in CreateTopicsResult or the TopicMetadataAndConfig object.


public class CreateTopicsResult {

  public KafkaFuture<UUID> topicId(String topic)

  public static class TopicMetadataAndConfig {

    TopicMetadataAndConfig(UUID topicId, int numPartitions, int replicationFactor, Config config)

    public UUID topicId()
  }
}


The protocol for CreateTopicsResponse will also need a slight modification.

CreateTopicsResponse v7

CreateTopics Response (Version: 7) => throttle_time_ms [topics]
  throttle_time_ms => INT32
  topics => name topic_id* error_code error_message topic_config_error_code num_partitions replication_factor [configs]
    name => STRING
    topic_id* => UUID
    error_code => INT16
    error_message => STRING
    topic_config_error_code => INT16
    num_partitions => INT32
    replication_factor => INT16
    configs => name value read_only config_source is_sensitive
      name => STRING
      value => STRING
      read_only => BOOL
      config_source => INT8
      is_sensitive => BOOL

There are two use cases we want to support: 1) obtaining topic IDs when asking to describe topics, and 2) supplying topic IDs to get a description of the topics.

For the first use case, we need to modify TopicDescription and MetadataResponse.


/**
 * Create an instance with the specified parameters.
 *
 * @param name The topic name
 * @param internal Whether the topic is internal to Kafka
 * @param partitions A list of partitions where the index represents the partition id and the element contains
 *                   leadership and replica information for that partition.
 * @param authorizedOperations authorized operations for this topic, or null if this is not known.
 * @param topicId Unique value that identifies the topic
 */
public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions,
    Set<AclOperation> authorizedOperations, UUID topicId)

/**
 * A unique identifier for the topic.
 */
public UUID topicId()

Metadata Response (Version: 10) => throttle_time_ms [brokers] cluster_id controller_id [topics] cluster_authorized_operations
    throttle_time_ms => INT32
    brokers => node_id host port rack
        node_id => INT32
        host => STRING
        port => INT32
        rack => STRING
    cluster_id => STRING
    controller_id => INT32
    topics => error_code name topic_id* is_internal [partitions] topic_authorized_operations
        error_code => INT16
        name => STRING
        topic_id* => UUID
        is_internal => BOOL
        partitions => error_code partition_index leader_id leader_epoch [replica_nodes] [isr_nodes] [offline_replicas]
            error_code => INT16
            partition_index => INT32
            leader_id => INT32
            leader_epoch => INT32
            replica_nodes => INT32
            isr_nodes => INT32
            offline_replicas => INT32
        topic_authorized_operations => INT32
    cluster_authorized_operations => INT32

Even when topic IDs are supported, the response will contain both the topic name and the topic ID.

Additionally, new methods will need to be added to the Admin interface and KafkaAdminClient.

Admin and KafkaAdminClient

default DescribeTopicsResult describeTopics(Collection<UUID> topicIds)

DescribeTopicsResult describeTopics(Collection<UUID> topicIds, DescribeTopicsOptions options)

MetadataRequest must also be modified. Topic name will be left in to allow requests to be made either by topic name or topic ID.

The ID will be checked first, but if the value is the default zero UUID, the topic name will be used instead. If an ID is specified and the ID does not exist, the request will fail regardless of allow_auto_topic_creation.
If both name and ID are included but the name does not match the ID, the request will also fail.
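The lookup rules for a MetadataRequest topic entry can be sketched as below. This is an illustration only: MetadataTopicResolver and its methods are hypothetical names, java.util.UUID stands in for the proposed UUID class, and IllegalArgumentException stands in for whatever error the broker would actually return, which is not specified here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper; not part of the proposal.
final class MetadataTopicResolver {
    // The default zero UUID means "no ID supplied".
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    private final Map<UUID, String> nameById = new HashMap<>();

    void addTopic(String name, UUID topicId) {
        nameById.put(topicId, name);
    }

    // Returns the topic name the request refers to, or throws if the request must fail.
    String resolve(String name, UUID topicId) {
        if (topicId == null || ZERO_UUID.equals(topicId)) {
            // No ID given: fall back to the name. Whether the name exists, and whether
            // allow_auto_topic_creation applies, is decided elsewhere.
            if (name == null) {
                throw new IllegalArgumentException("neither topic name nor topic ID supplied");
            }
            return name;
        }
        String known = nameById.get(topicId);
        if (known == null) {
            // An unknown ID fails regardless of allow_auto_topic_creation.
            throw new IllegalArgumentException("unknown topic ID " + topicId);
        }
        if (name != null && !name.equals(known)) {
            throw new IllegalArgumentException("topic name does not match topic ID");
        }
        return known;
    }
}
```

Checking the ID before the name means a client that remembers an ID from an earlier incarnation of a topic gets an error instead of silently reading metadata for a newer topic that reuses the name.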

MetadataRequest v10

Metadata Request (Version: 10) => [topics] allow_auto_topic_creation include_cluster_authorized_operations include_topic_authorized_operations
    topics => name topic_id*
      name => STRING
      topic_id* => UUID
    allow_auto_topic_creation => BOOL
    include_cluster_authorized_operations => BOOL
    include_topic_authorized_operations => BOOL


It will be useful for the AdminClient to be able to specify a list of topic IDs to delete, to ensure the correct topics are being deleted. New methods will need to be added to the Admin interface and KafkaAdminClient.

Admin and KafkaAdminClient

default DeleteTopicsResult deleteTopics(Collection<UUID> topics)

DeleteTopicsResult deleteTopics(Collection<UUID> topics, DeleteTopicsOptions options);

DeleteTopics Request and Response should be modified.

DeleteTopicsRequest v6

DeleteTopics Request (Version: 6) => [topic_names] timeout_ms
    topic_names => STRING
    timeout_ms => INT32

DeleteTopicsResponse v6

DeleteTopics Response (Version: 6) => throttle_time_ms [responses]
    throttle_time_ms => INT32
    responses => name topic_id* error_code error_message
      name => STRING
      topic_id* => UUID
      error_code => INT16
      error_message => STRING

Even when only the topic ID or only the topic name is included in the request, the response will contain both the name and the ID if topic IDs are supported.
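Why deleting by ID is safer than deleting by name can be shown with a small in-memory model. The TopicRegistry class and its methods are hypothetical and exist only for this sketch, and java.util.UUID stands in for the proposed UUID class.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical in-memory model of topics; not part of the proposal.
final class TopicRegistry {
    private final Map<String, UUID> idByName = new HashMap<>();

    // Each creation assigns a fresh ID, even when the name was used before.
    UUID create(String name) {
        UUID id = UUID.randomUUID();
        idByName.put(name, id);
        return id;
    }

    boolean exists(String name) {
        return idByName.containsKey(name);
    }

    // Deletes whichever topic currently holds this name.
    boolean deleteByName(String name) {
        return idByName.remove(name) != null;
    }

    // Deletes only the topic with this exact ID and reports its name,
    // mirroring a response that carries both name and ID.
    Optional<String> deleteById(UUID topicId) {
        Iterator<Map.Entry<String, UUID>> it = idByName.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, UUID> entry = it.next();
            if (entry.getValue().equals(topicId)) {
                String name = entry.getKey();
                it.remove();
                return Optional.of(name);
            }
        }
        return Optional.empty();
    }
}
```

If a topic is deleted and recreated under the same name, a client still holding the old ID gets an empty result from deleteById and the new topic survives, whereas deleteByName would have removed the new topic.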



Compatibility with KIP-500


  • CreatePartitionsRequest
  • ElectPreferredLeadersRequest
  • AlterReplicaLogDirsRequest
  • AlterConfigsRequest
  • DeleteTopicsRequest
  • DescribeConfigsRequest
  • DescribeLogDirsRequest
  • DeleteRecordsRequest
  • AddPartitionsToTxnRequest
  • TxnOffsetCommitRequest
  • WriteTxnMarkerRequest
