
Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread.html/7efa8cd169cadc7dc9cf86a7c0dbbab1836ddb5024d310fcebacf80c@%3Cdev.kafka.apache.org%3E

...

Overall, topic IDs provide a safer way for brokers to replicate topics without any chance of incorrectly interacting with stale topics with the same name. By preventing such scenarios, we can simplify a number of other interactions such as topic deletes which are currently more complicated and problematic than necessary.

Public Interfaces

...


Uuid

A new Uuid class will be exposed under /org/apache/kafka/common

/*
 * This class defines an immutable universally unique identifier (Uuid). It represents a 128-bit value.
 * More specifically, the random Uuids in this class are variant 2 (Leach-Salz) version 4 Uuids.
 * This definition is very similar to java.util.UUID. The toString() method prints
 * using the base64 string encoding. Likewise, the fromString method expects a base64 string encoding.
 */
public class Uuid {

    /**
     * A Uuid where all bits are zero. It represents a null or empty Uuid.
     */
    public static final Uuid ZERO_UUID

    /**
     * Constructs a 128-bit type 4 Uuid where the first long represents the most significant 64 bits
     * and the second long represents the least significant 64 bits.
     */
    public Uuid(long mostSigBits, long leastSigBits)

    /**
     * Static factory to retrieve a type 4 (pseudo randomly generated) Uuid.
     */
    public static Uuid randomUuid()

    /**
     * Returns the most significant bits of the Uuid's 128 bit value.
     */
    public long getMostSignificantBits()

    /**
     * Returns the least significant bits of the Uuid's 128 bit value.
     */
    public long getLeastSignificantBits()

    /**
     * Returns true iff the obj is another Uuid with the same value.
     */
    public boolean equals(Object obj)

    /**
     * Returns a hash code for this Uuid
     */
    public int hashCode()

    /**
     * Returns a base64 string encoding of the Uuid.
     */
    public String toString()

    /**
     * Creates a Uuid based on a base64 string encoding used in the toString() method.
     */
    public static Uuid fromString(String str)
}


Some public APIs will be added to org.apache.kafka.common.Cluster

/**
 * An immutable representation of a subset of the nodes, topics, and partitions in the Kafka cluster.
 */
public class Cluster {

    /**
     * Get all topicIds in the cluster, similar to topics()
     */
    public Collection<Uuid> topicIds()

    /**
     * Get the topicId of a topic. Uuid.ZERO_UUID is returned if the topicId doesn't exist.
     */
    public Uuid topicId(String topic)

}


Additionally, it may be dangerous to use older versions of Kafka tools with new broker versions when using their --zookeeper flags. Use of older tools in this way is not supported today.

Proposed Changes

Topic IDs will be represented with 128 bit v4 UUIDs. A UUID with all bits as 0 will be reserved as a null UUID as the Kafka RPC protocol does not allow for nullable fields. When printed or stored as a string, topic IDs will be converted to base64 string representation.

On handling a CreateTopicRequest brokers will create the topic znode under /brokers/topics/[topic], as usual.

The znode value will now contain an additional topic ID field, represented as a base64 string in the "id" field, and the schema version will be bumped to version 3.

Schema:
{ "fields" :
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "id", "type": "string", "doc": option[Uuid]},
      {"name": "partitions",
       "type": {"type": "map",
                "values": {"type": "array", "items": "int", "doc": "a list of replica ids"},
                "doc": "a map from partition id to replica list"}},
      {"name": "adding_replicas",
       "type": {"type": "map",
                "values": {"type": "array", "items": "int", "doc": "a list of replica ids"},
                "doc": "a map from partition id to a list of replicas to add in a pending reassignment"}},
      {"name": "removing_replicas",
       "type": {"type": "map",
                "values": {"type": "array", "items": "int", "doc": "a list of replica ids"},
                "doc": "a map from partition id to a list of replicas to remove in a pending reassignment"}}
    ]
}
 
Example:
{
  "version": 3,
  "id": "b8tRS7h4TJ2Vt43Dp85v2A",
  "partitions": {"0": [0, 1, 3]},
  "adding_replicas": {},
  "removing_replicas": {}
}
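
To make the string form of the "id" field concrete, here is a minimal sketch of converting a 128-bit ID to and from a 22-character base64 string. It assumes the URL-safe base64 alphabet without padding, which is consistent with the example value above but is not spelled out in this document. The class name TopicIdBase64 is hypothetical, and java.util.UUID stands in for the proposed Uuid class.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

// Hypothetical helper, not part of the proposed public API.
class TopicIdBase64 {

    // Encodes the 128-bit value as base64 (assumed: URL-safe alphabet, no '=' padding).
    static String encode(UUID id) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(id.getMostSignificantBits());
        buf.putLong(id.getLeastSignificantBits());
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
    }

    // Decodes a base64 string back into the 128-bit value; rejects anything that is not 16 bytes.
    static UUID decode(String encoded) {
        byte[] bytes = Base64.getUrlDecoder().decode(encoded);
        if (bytes.length != 16) {
            throw new IllegalArgumentException("Expected 16 bytes but got " + bytes.length);
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long mostSigBits = buf.getLong();
        long leastSigBits = buf.getLong();
        return new UUID(mostSigBits, leastSigBits);
    }
}
```

Under these assumptions the all-zero ID encodes to a string of 22 'A' characters, so the reserved null ID stays easy to recognize in its string form.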

...

Reconciliation may also be necessary if type = INCREMENTAL and the topic ID set on a local partition does not match the topic ID contained in the request. A TopicPartition with the same name and a different topic ID implies that the local topic partition is stale, as the topic must have been deleted to create a new topic with a different topic ID. This is similar to the type 2 stale request above, and the topic will be staged for deletion.

In the case where the topic ID in the request does not match the topic ID in the log (in either FULL or INCREMENTAL requests), we will also return a new error, INCONSISTENT_TOPIC_ID. This error is used when the topic ID in memory does not match the topic ID in the request.

Deletion

Deletion of stale partitions triggered by LeaderAndIsrRequest(s) will take place by:

  1. Logging at WARN level all partitions that will be deleted and the time at which they will be deleted.
  2. Moving the partition's directory to log.dir/deleting/{topic_id}_{partition}.
  3. Scheduling deletion from disk with a delay of delete.stale.topic.delay.ms milliseconds. This will clear the deleting directory of the partition's contents.
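
The three steps above can be sketched as follows. This is an illustration under stated assumptions, not the broker's implementation: the class StalePartitionDeleter and its methods are hypothetical, stderr stands in for a real logger, and the delay is passed in rather than read from configuration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

// Hypothetical helper that stages a stale partition directory for delayed deletion.
class StalePartitionDeleter {
    private final Path logDir;
    private final long delayMs; // value of delete.stale.topic.delay.ms
    private final ScheduledExecutorService scheduler;

    StalePartitionDeleter(Path logDir, long delayMs, ScheduledExecutorService scheduler) {
        this.logDir = logDir;
        this.delayMs = delayMs;
        this.scheduler = scheduler;
    }

    // Returns the staged location under log.dir/deleting.
    Path stage(Path partitionDir, String topicId, int partition) {
        long deleteAtMs = System.currentTimeMillis() + delayMs;
        // Step 1: log what will be deleted and when.
        System.err.printf("WARN stale partition %s will be deleted at %d%n", partitionDir, deleteAtMs);
        try {
            // Step 2: move the directory to log.dir/deleting/{topic_id}_{partition}.
            Path deleting = Files.createDirectories(logDir.resolve("deleting"));
            Path staged = Files.move(partitionDir, deleting.resolve(topicId + "_" + partition));
            // Step 3: schedule removal from disk after the configured delay.
            scheduler.schedule(() -> deleteRecursively(staged), delayMs, TimeUnit.MILLISECONDS);
            return staged;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Deletes children before parents by walking the tree in reverse path order.
    static void deleteRecursively(Path root) {
        try (Stream<Path> paths = Files.walk(root)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the directory is moved before the timer starts, the stale data is out of the way immediately, while an operator still has until the delay expires to recover it from the deleting directory.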

...

StopReplica Response (Version: 4) => error_code [topics]
  error_code => INT16
  topics => topic topic_id* [partitions]
    topic_id* => UUID
    partitions => partition error_code
      partition => INT32
      error_code => INT16

...

  • Metadata schema version (schema_version: int32)
  • Topic ID (id: Uuid)

This file will be plain text (key/value pairs).
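
As an illustration of what such a key/value file could look like, the sketch below formats and parses the two fields listed above. The "key: value" line layout and the class name PartitionMetadataText are assumptions made for this example; the document only states that the file is plain text with key/value pairs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical formatter/parser for the partition metadata file contents.
class PartitionMetadataText {

    // Renders one "key: value" pair per line (assumed layout).
    static String format(int schemaVersion, String topicId) {
        return "schema_version: " + schemaVersion + "\n"
             + "id: " + topicId + "\n";
    }

    // Parses "key: value" lines into an ordered map, skipping blank lines.
    static Map<String, String> parse(String text) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String line : text.split("\n")) {
            if (line.trim().isEmpty()) {
                continue;
            }
            int sep = line.indexOf(':');
            if (sep < 0) {
                throw new IllegalArgumentException("Malformed line: " + line);
            }
            fields.put(line.substring(0, sep).trim(), line.substring(sep + 1).trim());
        }
        return fields;
    }
}
```

Keeping the schema version as the first field lets a reader decide how to interpret the remaining keys before parsing them.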

...

Option | Unit | Default | Description
delete.stale.topic.delay.ms | ms | 14400 (4 hours) | The minimum amount of time to wait before removing a deleted topic's data on every broker. When a FULL or INCREMENTAL LeaderAndIsrRequest is received and the request does not contain a partition that exists on a broker, or a broker's topic ID does not match the ID in the request, a deletion event will be staged for that partition, which will complete after delete.stale.topic.delay.ms milliseconds.

AdminClient Support

Access to topic IDs from the AdminClient will make it easier for users to obtain topics' topic IDs. It can also ensure correctness when deleting topics. This will require some changes to public APIs and protocols.

TopicCollection

One change to help with the transition from defining topics by names to defining them by IDs is a new class that can represent a collection of topics by name or ID. This class can be passed in to methods that support identifying topics by either identifier, such as describe and delete below. This will be found in the common package.

/**
 * A class used to represent a collection of topics. This collection may define topics by topic name
 * or topic ID. Subclassing this class beyond the classes provided here is not supported.
 */
public abstract class TopicCollection {

    private TopicCollection() {}

    /**
     * @return a collection of topics defined by topic ID
     */
    public static TopicIdCollection ofTopicIds(Collection<Uuid> topics);

    /**
     * @return a collection of topics defined by topic name
     */
    public static TopicNameCollection ofTopicNames(Collection<String> topics);

    /**
     * A class used to represent a collection of topics defined by their topic ID.
     * Subclassing this class beyond the classes provided here is not supported.
     */
    public static class TopicIdCollection extends TopicCollection {

        /**
         * @return A collection of topic IDs
         */
        public Collection<Uuid> topicIds();
    }

    /**
     * A class used to represent a collection of topics defined by their topic name.
     * Subclassing this class beyond the classes provided here is not supported.
     */
    public static class TopicNameCollection extends TopicCollection {

        /**
         * @return A collection of topic names
         */
        public Collection<String> topicNames();
    }
}
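
The following self-contained sketch shows one way the shape above could work in practice: the private constructor keeps the hierarchy closed to the two nested subclasses, and a caller branches on which one it received. The class name TopicCollectionSketch and the describe helper are hypothetical, the nested classes are made final for this example, and java.util.UUID stands in for Uuid.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;

// Hypothetical stand-in for TopicCollection with method bodies filled in.
abstract class TopicCollectionSketch {

    // Private constructor: only the nested classes below can extend this class.
    private TopicCollectionSketch() {}

    static TopicIdCollection ofTopicIds(Collection<UUID> topics) {
        return new TopicIdCollection(topics);
    }

    static TopicNameCollection ofTopicNames(Collection<String> topics) {
        return new TopicNameCollection(topics);
    }

    static final class TopicIdCollection extends TopicCollectionSketch {
        private final Collection<UUID> topicIds;

        private TopicIdCollection(Collection<UUID> topicIds) {
            this.topicIds = Collections.unmodifiableCollection(new ArrayList<>(topicIds));
        }

        Collection<UUID> topicIds() {
            return topicIds;
        }
    }

    static final class TopicNameCollection extends TopicCollectionSketch {
        private final Collection<String> topicNames;

        private TopicNameCollection(Collection<String> topicNames) {
            this.topicNames = Collections.unmodifiableCollection(new ArrayList<>(topicNames));
        }

        Collection<String> topicNames() {
            return topicNames;
        }
    }

    // Illustrates how a method accepting either identifier kind might dispatch.
    static String describe(TopicCollectionSketch topics) {
        if (topics instanceof TopicIdCollection) {
            return "by id: " + ((TopicIdCollection) topics).topicIds();
        } else if (topics instanceof TopicNameCollection) {
            return "by name: " + ((TopicNameCollection) topics).topicNames();
        }
        throw new IllegalArgumentException("Unsupported TopicCollection subclass");
    }
}
```

A single parameter type per method avoids the overload clash that two Collection-typed overloads (one of names, one of IDs) would cause after type erasure.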

CreateTopics

Upon creation of a topic, the topic ID will be included in the TopicMetadataAndConfig which is included in CreateTopicsResult. It can be accessed through a method in CreateTopicsResult or the TopicMetadataAndConfig object.

CreateTopicsResult

public class CreateTopicsResult {

  public KafkaFuture<Uuid> topicId(String topic)

...

  public static class TopicMetadataAndConfig {

    TopicMetadataAndConfig(Uuid topicId, int numPartitions, int replicationFactor, Config config)

    public Uuid topicId()

}

The protocol for CreateTopicsResponse will also need a slight modification.

CreateTopicsResponse v7

CreateTopics Response (Version: 7) => throttle_time_ms [topics]
  throttle_time_ms => INT32
  topics => name topic_id* error_code error_message topic_config_error_code num_partitions replication_factor [configs]
    name => STRING
    topic_id* => UUID
    error_code => INT16
    error_message => STRING
    topic_config_error_code => INT16
    num_partitions => INT32
    replication_factor => INT16
    configs => name value read_only config_source is_sensitive
      name => STRING
      value => STRING
      read_only => BOOL
      config_source => INT8
      is_sensitive => BOOL

Describe Topics

There are two use cases we want to support. 1) Obtaining topic IDs when asking to describe topics and 2) supplying topic IDs to get a description of the topics

For use case (1), we need to modify TopicDescription and MetadataResponse

TopicDescription

/**
* Create an instance with the specified parameters.
*
* @param name The topic name
* @param internal Whether the topic is internal to Kafka
* @param partitions A list of partitions where the index represents the partition id and the element contains
*                   leadership and replica information for that partition.
* @param authorizedOperations authorized operations for this topic, or null if this is not known.
* @param topicId Unique value that identifies the topic
*
*/
public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions,
    Set<AclOperation> authorizedOperations, Uuid topicId)


/**
* A unique identifier for the topic.
*/
public Uuid topicId()

MetadataResponse v10

Metadata Response (Version: 10) => throttle_time_ms [brokers] cluster_id controller_id [topics] cluster_authorized_operations
    throttle_time_ms => INT32
    brokers => node_id host port rack
        node_id => INT32
        host => STRING
        port => INT32
        rack => STRING
    cluster_id => STRING
    controller_id => INT32
    topics => error_code name topic_id* is_internal [partitions] topic_authorized_operations
        error_code => INT16
        name => STRING 
        topic_id* => UUID
        is_internal => BOOL
        partitions => error_code partition_index leader_id leader_epoch [replica_nodes] [isr_nodes] [offline_replicas]
            error_code => INT16
            partition_index => INT32
            leader_id => INT32
            leader_epoch => INT32
            replica_nodes => INT32
            isr_nodes => INT32
            offline_replicas => INT32
        topic_authorized_operations => INT32
    cluster_authorized_operations => INT32

When topic IDs are supported, the response will contain both the topic name and the topic ID.

For use case (2), new methods will need to be added to the Admin interface and KafkaAdminClient

Admin and KafkaAdminClient

default DescribeTopicsResult describeTopics(TopicCollection topics); 
DescribeTopicsResult describeTopics(TopicCollection topics, DescribeTopicsOptions options);

We also plan to deprecate the old methods in a future release. There are changes to DescribeTopicsResult and deprecation of some of its methods

public class DescribeTopicsResult {

    protected DescribeTopicsResult(Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures, Map<String, KafkaFuture<TopicDescription>> nameFutures);

    protected static DescribeTopicsResult ofTopicIds(Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures);

    protected static DescribeTopicsResult ofTopicNames(Map<String, KafkaFuture<TopicDescription>> nameFutures);

    /**
     * @return a map from topic IDs to futures which can be used to check the status of
     * individual topics if the describeTopics request used topic IDs. Otherwise return null.
     */
    public Map<Uuid, KafkaFuture<TopicDescription>> topicIdValues()

    /**
     * @return a map from topic names to futures which can be used to check the status of
     * individual topics if the describeTopics request used topic names. Otherwise return null.
     */
    public Map<String, KafkaFuture<TopicDescription>> topicNameValues()

    /**
     * @return a map from topic names to futures which can be used to check the status of
     * individual topics if the describeTopics request used topic names. Otherwise return null.
     */
    @Deprecated
    public Map<String, KafkaFuture<TopicDescription>> values()

    /**
     * @return a future which succeeds only if all the topic descriptions succeed and the describeTopics
     * request used topic IDs.
     */
    public KafkaFuture<Map<Uuid, TopicDescription>> allTopicIds()

    /**
     * @return a future which succeeds only if all the topic descriptions succeed and the describeTopics
     * request used topic names.
     */
    public KafkaFuture<Map<String, TopicDescription>> allTopicNames()

    /**
     * @return a future which succeeds only if all the topic descriptions succeed and the describeTopics
     * request used topic names.
     */
    @Deprecated
    public KafkaFuture<Map<String, TopicDescription>> all()
}


MetadataRequest must also be modified. Topic name will be left in to allow requests to be made either by topic name or topic ID. Requests should only use one or the other.

ID will be checked first, but if the value is the default zero UUID, topic name will be used instead.  If an ID is specified and the ID does not exist, the request will fail regardless of allow_auto_topic_creation. 
If the topic ID is not found, the request will return an UNKNOWN_TOPIC_ID error for the topic indicating the topic ID did not exist. The check for the topic ID will occur before checking authorization on the topic. Thus, topic IDs are not considered sensitive information.
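
The lookup order described above can be sketched as follows. This is a simplified model rather than broker code: the class MetadataTopicResolver is hypothetical, java.util.UUID stands in for Uuid, auto topic creation and authorization are left out, and the name-miss error code used here is an assumption for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical resolver modelling "ID first, then name" for one requested topic.
class MetadataTopicResolver {
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // UNKNOWN_TOPIC_OR_PARTITION is an assumed code for a name that is not found.
    enum Error { NONE, UNKNOWN_TOPIC_ID, UNKNOWN_TOPIC_OR_PARTITION }

    static final class Resolution {
        final Error error;
        final String name;
        final UUID topicId;

        Resolution(Error error, String name, UUID topicId) {
            this.error = error;
            this.name = name;
            this.topicId = topicId;
        }
    }

    private final Map<UUID, String> namesById = new HashMap<>();
    private final Map<String, UUID> idsByName = new HashMap<>();

    void addTopic(String name, UUID topicId) {
        namesById.put(topicId, name);
        idsByName.put(name, topicId);
    }

    Resolution resolve(String name, UUID topicId) {
        // A non-zero ID takes precedence over any name in the request.
        if (topicId != null && !ZERO_UUID.equals(topicId)) {
            String resolvedName = namesById.get(topicId);
            if (resolvedName == null) {
                // Unknown IDs fail outright; they never trigger topic creation.
                return new Resolution(Error.UNKNOWN_TOPIC_ID, null, topicId);
            }
            return new Resolution(Error.NONE, resolvedName, topicId);
        }
        // The zero ID means "no ID supplied", so fall back to the name.
        UUID resolvedId = idsByName.get(name);
        if (resolvedId == null) {
            return new Resolution(Error.UNKNOWN_TOPIC_OR_PARTITION, name, ZERO_UUID);
        }
        return new Resolution(Error.NONE, name, resolvedId);
    }
}
```

On success the result carries both the name and the ID, regardless of which one the request supplied.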

MetadataRequest v10

Metadata Request (Version: 10) => [topics] allow_auto_topic_creation include_cluster_authorized_operations include_topic_authorized_operations
    topics => name topic_id*
      name => STRING (nullable)*
      topic_id* => UUID
    allow_auto_topic_creation => BOOL
    include_cluster_authorized_operations => BOOL
    include_topic_authorized_operations => BOOL

DeleteTopics

It will be useful for the AdminClient to be able to specify a list of topic IDs to delete to ensure the correct topics are being deleted. New methods will need to be added to the Admin interface and KafkaAdminClient.

Admin and KafkaAdminClient

default DeleteTopicsResult deleteTopics(TopicCollection topics); 
DeleteTopicsResult deleteTopics(TopicCollection topics, DeleteTopicsOptions options);


We also plan to deprecate the old methods in a future release. There are changes to DeleteTopicsResult, including deprecation of some of its old methods.

public class DeleteTopicsResult {

    protected DeleteTopicsResult(Map<Uuid, KafkaFuture<Void>> topicIdFutures, Map<String, KafkaFuture<Void>> nameFutures);

    protected static DeleteTopicsResult ofTopicIds(Map<Uuid, KafkaFuture<Void>> topicIdFutures);

    protected static DeleteTopicsResult ofTopicNames(Map<String, KafkaFuture<Void>> nameFutures);

    /**
     * @return a map from topic IDs to futures which can be used to check the status of
     * individual deletions if the deleteTopics request used topic IDs. Otherwise return null.
     */
    public Map<Uuid, KafkaFuture<Void>> topicIdValues()

    /**
     * @return a map from topic names to futures which can be used to check the status of
     * individual deletions if the deleteTopics request used topic names. Otherwise return null.
     */
    public Map<String, KafkaFuture<Void>> topicNameValues()

    /**
     * @return a map from topic names to futures which can be used to check the status of
     * individual deletions if the deleteTopics request used topic names. Otherwise return null.
     */
    @Deprecated
    public Map<String, KafkaFuture<Void>> values()

    /**
     * @return a future which succeeds only if all the topic deletions succeed.
     */
    public KafkaFuture<Void> all()
}


DeleteTopics Request and Response should be modified.

DeleteTopicsRequest v6

DeleteTopics Request (Version: 6) => [topics] timeout_ms
    topics => name topic_id*
      name => STRING (nullable)*
      topic_id* => UUID
    timeout_ms => INT32

Like the MetadataRequest, ID will be checked first, but if the value is the default zero UUID, topic name will be used instead. If an ID is specified and the ID does not exist, the request will return an UNKNOWN_TOPIC_ID error for the topic indicating the topic ID did not exist. The check for the topic ID will occur before checking authorization on the topic. Thus, topic IDs are not considered sensitive information.

DeleteTopicsResponse v6

DeleteTopics Response (Version: 6) => throttle_time_ms [responses]
    throttle_time_ms => INT32
    responses => name topic_id* error_code error_message
      name => STRING (nullable)*
      topic_id* => UUID
      error_code => INT16
      error_message => STRING

Although only topic ID or only topic name is included in the request, if topic IDs are supported, the response will contain both the name and the ID.

...

Solution: Use the same sentinel ID reserved for the metadata topic before its ID is known. After controller election, upon receiving the result, assign the metadata topic its unique topic ID. The ID should be written to the metadata topic, as all IDs will now be written to this topic instead of ZooKeeper.

Using a topic ID will result in a slightly smaller fetch request and likely prevent further changes. Assigning a unique ID for the metadata topic leaves the possibility for the topic to be placed in tiered storage, or used in other scenarios where topics from multiple clusters may be in one place without appending the cluster ID.

...

The idea is that this will be a hard-coded UUID that no other topic can be assigned. Initially the all-zero UUID was considered, but was ultimately rejected since this is used as a null ID in some places and it is better to keep these usages separate. An example of a hard-coded UUID is 00000000000000000000000000000001

LeaderAndIsr, UpdateMetadata, and StopReplica 

...

It would be ideal if the log.dir layout could be restructured from {topic}_{partition} format to {topicIdPrefix}/{topicId}_{partition}, e.g. "mytopic_1" → "24/24cc4332-f7de-45a3-b24e-33d61aa0d16c_1". Note the hierarchical directory structure, which uses the first two characters of the topic ID to avoid having too many directories at the top level of the logdir. The exact formatting of the directory is not set in stone, but the idea is to replace topic names with topic IDs in the log directory. This is a significant change and will only be added upon a major release, most likely alongside KIP-500 changes that will also prevent downgrades.
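
A small sketch of the proposed mapping, following the example above. The class and method names are hypothetical, the ID is rendered in the hyphenated hex form used in the example rather than base64, and the exact directory format is, as noted, not final.

```java
import java.nio.file.Path;
import java.util.UUID;

// Hypothetical helper that builds the ID-based partition directory path.
class TopicIdLogDirs {

    // Returns logDir/{first two characters of the ID}/{topicId}_{partition}.
    static Path partitionDir(Path logDir, UUID topicId, int partition) {
        String id = topicId.toString();
        String prefix = id.substring(0, 2);
        return logDir.resolve(prefix).resolve(id + "_" + partition);
    }
}
```

With a two-character hex prefix, partition directories are spread across at most 256 top-level directories.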

Migration

Topic IDs will only be available to brokers with IBP version 2.8 or greater.

Upon a controller becoming active, the list of current topics is loaded from /brokers/topics/[topic]. When a topic without a topic ID is found, a UUID will be randomly generated and assigned, and the topic information at /brokers/topics/[topic] will be updated with the id filled in and the schema version bumped to version 3.
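
The migration step can be modelled as a pass over the loaded topics that fills in missing IDs. The sketch below is an in-memory illustration only: TopicIdMigration and TopicZNode are hypothetical names, no ZooKeeper access is shown, and java.util.UUID stands in for Uuid.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory model of assigning IDs to topics that lack one.
class TopicIdMigration {
    static final int SCHEMA_VERSION_WITH_ID = 3;

    // Minimal view of a topic znode: schema version plus an optional ID (null when absent).
    static final class TopicZNode {
        final int version;
        final UUID id;

        TopicZNode(int version, UUID id) {
            this.version = version;
            this.id = id;
        }
    }

    // Returns a copy in which every topic has an ID; topics that already had one are untouched.
    static Map<String, TopicZNode> assignMissingIds(Map<String, TopicZNode> topics) {
        Map<String, TopicZNode> migrated = new LinkedHashMap<>();
        for (Map.Entry<String, TopicZNode> entry : topics.entrySet()) {
            TopicZNode node = entry.getValue();
            if (node.id == null) {
                // Generate a random ID and bump the schema version.
                node = new TopicZNode(SCHEMA_VERSION_WITH_ID, UUID.randomUUID());
            }
            migrated.put(entry.getKey(), node);
        }
        return migrated;
    }
}
```

Leaving existing IDs untouched makes the pass safe to repeat, for example after a controller failover in the middle of migration.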

...

Eventually the TopicPartition class should include the topic ID. This may be difficult to enact until all APIs support topic IDs, and could come with a performance impact if implemented prior to this, as TopicPartitions are used for hashmap lookups throughout the broker.

Persisting Topic IDs

A few other alternatives to the partition metadata file were considered. One topic of discussion was whether it was necessary to include the file at all. While the topic name is used in the directory structure, the only way to persist the topic ID to disk is through a file. As discussed above, the directory changes will not be added until a major release.

The file can also be used for tooling purposes, and may contain mappings that will be useful in the future.

Another alternative is to have a single file mapping all topic names to ids. Although this could be useful for tooling, it would be harder to maintain this file and update on each new topic added. 

Future Work

Requests

The following requests could be improved by presence of topic IDs, but are out of scope for this KIP.

  • CreatePartitionsRequest
  • ElectPreferredLeadersRequest
  • AlterReplicaLogDirsRequest
  • AlterConfigsRequest
  • DescribeConfigsRequest
  • DescribeLogDirsRequest
  • DeleteRecordsRequest
  • AddPartitionsToTxnRequest
  • TxnOffsetCommitRequest
  • WriteTxnMarkerRequest

AdminClient

There are further changes to AdminClient made possible by adding topic ids. By adding topic ids to various request types (like those listed above) AdminClient can support identifying topics by ID. Some examples include but are not limited to:

  • Using topic ids to specify what topics should receive new partitions in createPartitions
  • Return ids in ListTopicsResult or TopicListing for listTopics
  • Adding id to a type like TopicPartition or TopicPartitionReplica (see TopicIdPartition below)
    • Using topic ids (currently TopicPartition) to specify topic of the partitions for deleteRecords 
    • Using topic ids (currently TopicPartitionReplica) to specify topic for alterReplicaLogDirs
    • Using topic ids (currently TopicPartition) to specify topic of the partitions for electLeaders

Clients

Some of the implemented request types are also relevant to clients. Adding full support for topic IDs in the clients would add an additional measure of safety when producing and consuming data. Fully supporting Topic IDs in clients is out of scope for this KIP due to the numerous public APIs that will need adjustments.

...