...
Access to topic IDs from the AdminClient will make it easier for users to obtain topics' topic IDs. It can also ensure correctness when deleting topics. This will require some changes to public APIs and protocols
CreateTopics
Upon creation of a topic, the topic ID will be included in the TopicMetadataAndConfig which is included in CreateTopicsResult. It can be accessed through a method in CreateTopicsResult or the TopicMetadataAndConfig object.
CreateTopicsResult
public class CreateTopicsResult {
public KafkaFuture<Uuid> topicId(String topic)
...
public static class TopicMetadataAndConfig {
TopicMetadataAndConfig(Uuid topicId, int numPartitions, int replicationFactor, Config config)
public Uuid topicId()
}
The protocol for CreateTopicsResponse will also need a slight modification.
CreateTopicsResponse v7
...
CreateTopics Response (Version: 7) => throttle_time_ms [topics]
throttle_time_ms => INT32
topics => name topic_id* error_code error_message topic_config_error_code num_partitions replication_factor [configs]
name => STRING
topic_id* => UUID
error_code => INT16
error_message => STRING topic_config_error_code => INT16
num_partitions => INT32
replication_factor => INT16 configs
=> name value read_only config_source is_sensitive
name => STRING
value => STRING
read_only => BOOL
config_source => INT8
is_sensitive => BOOL
Describe Topics
There are two use cases we want to support. 1) Obtaining topic IDs when asking to describe topics and 2) supplying topic IDs to get a description of the topics
For use case (1), we need to modify TopicDescription and MetadataResponse
TopicDescription
/**
* Create an instance with the specified parameters.
*
* @param name The topic name
* @param internal Whether the topic is internal to Kafka
* @param partitions A list of partitions where the index represents the partition id and the element contains
* leadership and replica information for that partition.
* @param authorizedOperations authorized operations for this topic, or null if this is not known.
* @param topicId Unique value that identifies the topic
*
*/
public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions,
Set<AclOperation> authorizedOperations, Uuid topicId)
/**
* A unique identifier for the topic.
*/
public Uuid topicId()
MetadataResponse v10
...
Metadata Response (Version: 10) => throttle_time_ms [brokers] cluster_id controller_id [topics] cluster_authorized_operations
throttle_time_ms => INT32
brokers => node_id host port rack
node_id => INT32
host => STRING
port => INT32
rack => STRING
cluster_id => STRING
controller_id => INT32
topics => error_code name topic_id* is_internal [partitions] topic_authorized_operations
error_code => INT16
name => STRING
topic_id* => UUID
is_internal => BOOL
partitions => error_code partition_index leader_id leader_epoch [replica_nodes] [isr_nodes] [offline_replicas]
error_code => INT16
partition_index => INT32
leader_id => INT32
leader_epoch => INT32
replica_nodes => INT32
isr_nodes => INT32
offline_replicas => INT32
topic_authorized_operations => INT32 cluster_authorized_operations => INT32
TopicCollection
One change to help with the transition from defining topics by names to defining them by IDs is a new class that can represent a collection of topics by name or ID. This class can be passed in to methods that support identifying topics by either identifier–like describe and delete below. This will be found in the common package.
/**
* A class used to represent a collection of topics. This collection may define topics by topic name
* or topic ID. Subclassing this class beyond the classes provided here is not supported.
*/
public abstract class TopicCollection {
private TopicCollection() {}
/**
* @return a collection of topics defined by topic ID
*/
public static TopicIdCollection ofTopicIds(Collection<Uuid> topics);
/**
* @return a collection of topics defined by topic name
*/
public static TopicNameCollection ofTopicNames(Collection<String> topics);
/**
* A class used to represent a collection of topics defined by their topic ID.
* Subclassing this class beyond the classes provided here is not supported.
*/
public static class TopicIdCollection extends TopicCollection {
/**
* @return A collection of topic IDs
*/
public Collection<Uuid> topicIds();
}
/**
* A class used to represent a collection of topics defined by their topic name.
* Subclassing this class beyond the classes provided here is not supported.
*/
public static class TopicNameCollection extends TopicCollection {
/**
* @return A collection of topic names
*/
public Collection<String> topicNames();
}
}
CreateTopics
Upon creation of a topic, the topic ID will be included in the TopicMetadataAndConfig which is included in CreateTopicsResult. It can be accessed through a method in CreateTopicsResult or the TopicMetadataAndConfig object.
CreateTopicsResult
public class CreateTopicsResult {
public KafkaFuture<Uuid> topicId(String topic)
...
public static class TopicMetadataAndConfig {
TopicMetadataAndConfig(Uuid topicId, int numPartitions, int replicationFactor, Config config)
public Uuid topicId()
}
The protocol for CreateTopicsResponse will also need a slight modification.
CreateTopicsResponse v7
|
Describe Topics
There are two use cases we want to support. 1) Obtaining topic IDs when asking to describe topics and 2) supplying topic IDs to get a description of the topics
For use case (1), we need to modify TopicDescription and MetadataResponse
TopicDescription
/**
* Create an instance with the specified parameters.
*
* @param name The topic name
* @param internal Whether the topic is internal to Kafka
* @param partitions A list of partitions where the index represents the partition id and the element contains
* leadership and replica information for that partition.
* @param authorizedOperations authorized operations for this topic, or null if this is not known.
* @param topicId Unique value that identifies the topic
*
*/
public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions,
Set<AclOperation> authorizedOperations, Uuid topicId)
/**
* A unique identifier for the topic.
*/
public Uuid topicId()
MetadataResponse v10
|
When topic IDs are supported, the response will contain both the topic name and the topic ID.
For use case (2), new methods will need to be added to the Admin interface and KafkaAdminClient
Admin and KafkaAdminClient
default DescribeTopicsResult describeTopics(TopicCollection topics);
DescribeTopicsResult describeTopics(TopicCollection topics, DescribeTopicsOptions options);
We also plan to deprecate the old methods in a future release. There are changes to DescribeTopicsResult and deprecation of some of its methods
public class DescribeTopicsResult {
protected DescribeTopicsResult(Map<Uuid, KafkaFuture<Void>> topicIdFutures, Map<String, KafkaFuture<Void>> nameFutures);
protected static DescribeTopicsResult ofTopicIds(Map<Uuid, KafkaFuture<Void>> topicIdFutures);
protected static DescribeTopicsResult ofTopicNames(Map<String, KafkaFuture<Void>> nameFutures);
/**
* @return a map from topic IDs to futures which can be used to check the status of
* individual topics if the describeTopics request used topic IDs. Otherwise return null.
*/
public Map<Uuid, KafkaFuture<Void>> topicIdValues()
/**
* @return a map from topic names to futures which can be used to check the status of
* individual topics if the describeTopics request used topic names. Otherwise return null.
*/
public Map<String, KafkaFuture<Void>> topicNameValues()
@Deprecated
/**
* @return a map from topic names to futures which can be used to check the status of
* individual topics if the describeTopics request used topic names. Otherwise return null.
*/
public Map<String, KafkaFuture<Void>> values()
/**
* @return a future which succeeds only if all the topic descriptions succeed and the describeTopics
* request used topic IDs.
*/
public KafkaFuture<Map<Uuid, TopicDescription>> allTopicIds()
/**
* @return a future which succeeds only if all the topic descriptions succeed and the describeTopics
* request used topic names.
*/
public KafkaFuture<Map<String, TopicDescription>> allTopicNames()
@Deprecated
/**
* Return a future which succeeds only if all the topic descriptions succeed and the describeTopics
* request used topic names.
*/
public KafkaFuture<Void> all()
}
MetadataRequest must also be modified. Topic name will be left in to allow requests to be made either by topic name or topic ID. Requests should only use one or the other.
ID will be checked first, but if the value is the default zero UUID, topic name will be used instead. If an ID is specified and the ID does not exist, the request will fail regardless of allow_auto_topic_creation.
If the topic ID is not found, the request will return an UNKNOWN_TOPIC_ID
error for the topic indicating the topic ID did not exist. The check for the topic ID will occur before checking authorization on the topic. Thus, topic IDs are not considered sensitive information.
MetadataRequest v10
|
DeleteTopics
It will be useful for the AdminClient to be able to specify a list of topic Ids to delete to ensure the correct topics are being deleted. New
When topic IDs are supported, the response will contain both the topic name and the topic ID.
For use case (2), new methods will need to be added to the Admin interface and KafkaAdminClient
Admin
...
default DescribeTopicsWithIdsResult describeTopicsWithIds(Collection<Uuid> topicIds)
DescribeTopicsWithIdsResult describeTopicsWithIds(Collection<Uuid> topicIds, DescribeTopicsOptions options)
...
MetadataRequest must also be modified. Topic name will be left in to allow requests to be made either by topic name or topic ID. Requests should only use one or the other.
ID will be checked first, but if the value is the default zero UUID, topic name will be used instead. If an ID is specified and the ID does not exist, the request will fail regardless of allow_auto_topic_creation.
If the interbroker protocol version is less than 2.8 and does not support topic IDs, an UNSUPPORTED_VERSION
error will be returned on requests that specify topic ID. When interbroker protocol version is at least 2.8, if the topic ID is not found, the request will return an UNKNOWN_TOPIC_ID
error for the topic indicating the topic ID did not exist. The check for the topic ID will occur before checking authorization on the topic. Thus, topic IDs are not considered sensitive information.
MetadataRequest v10
...
Metadata Request (Version: 10) => [topics] allow_auto_topic_creation include_cluster_authorized_operations include_topic_authorized_operations
topics => name topic_id*
name => STRING (nullable)*
topic_id* => UUID
allow_auto_topic_creation => BOOL
include_cluster_authorized_operations => BOOL
include_topic_authorized_operations => BOOL
and KafkaAdminClient
default DeleteTopicsResult deleteTopics(TopicCollection topics);
DeleteTopicsResult deleteTopics(TopicCollection topics, DeleteTopicsOptions options);
We also plan to deprecate the old methods in a future release. There are changes to DeleteTopicResult including deprecation of some of its old methods.
public class DeleteTopicsResult {
protected DeleteTopicsResult(Map<Uuid, KafkaFuture<Void>> topicIdFutures, Map<String, KafkaFuture<Void>> nameFutures);
protected static DeleteTopicsResult ofTopicIds(Map<Uuid, KafkaFuture<Void>> topicIdFutures);
protected static DeleteTopicsResult ofTopicNames(Map<String, KafkaFuture<Void>> nameFutures);
/**
* @return a map from topic IDs to futures which can be used to check the status of
* individual deletions if the deleteTopics request used topic IDs. Otherwise return null.
*/
public Map<Uuid, KafkaFuture<Void>> topicIdValues()
/**
* @return a map from topic names to futures which can be used to check the status of
* individual deletions if the deleteTopics request used topic names. Otherwise return null.
*/
public Map<String, KafkaFuture<Void>> topicNameValues()
DeleteTopics
It will be useful for the AdminClient to be able to specify a list of topic Ids to delete to ensure the correct topics are being deleted. New methods will need to be added to the Admin interface and KafkaAdminClient
Admin and KafkaAdminClient
default DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection<Uuid> topics)
DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection<Uuid> topics, DeleteTopicsOptions options);
public class DeleteTopicsWithIdsResult {
final Map<Uuid, KafkaFuture<Void>> futures;
DeleteTopicsWithIdsResult(Map<Uuid, KafkaFuture<Void>> futures)@Deprecated
/**
* Return@return a map from topic IDsnames to futures which can be used to check the status of
* individual deletions if the deleteTopics request used topic names. Otherwise return null.
*/
public Map<UuidMap<String, KafkaFuture<Void>> values()
/**
* Return@return a future which succeeds only if all the topic deletions succeed.
*/
public KafkaFuture<Void> all()
}
DeleteTopics Request and Response should be modified.
...
Like the MetadataRequst, ID will be checked first, but if the value is the default zero UUID, topic name will be used instead. If the interbroker protocol version is less than 2.8 and does not support topic IDs, an UNSUPPORTED_VERSION
error will be returned on requests that specify topic ID. When interbroker protocol version is at least 2.8, if If an ID is specified and the ID does not exist, the request will return UNKNOWN_TOPIC_ID
error for the topic indicating the topic ID did not exist. The check for the topic ID will occur before checking authorization on the topic. Thus, topic IDs are not considered sensitive information.
...