...

Overall, topic IDs provide a safer way for brokers to replicate topics without any chance of incorrectly interacting with stale topics with the same name. By preventing such scenarios, we can simplify a number of other interactions such as topic deletes which are currently more complicated and problematic than necessary.

Public Interfaces

TopicDescription

Minor changes to the TopicDescription interface will be made to allow clients to access the topic ID of topics found in metadata responses.

/**
* Create an instance with the specified parameters.
*
* @param name The topic name
* @param internal Whether the topic is internal to Kafka
* @param partitions A list of partitions where the index represents the partition id and the element contains
*                   leadership and replica information for that partition.
* @param authorizedOperations authorized operations for this topic, or null if this is not known.
* @param topicId Unique value that identifies the topic
*
*/
public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions,
    Set<AclOperation> authorizedOperations, UUID topicId)

/**
* A unique identifier for the topic.
*/
public UUID topicId()

UUID

A new UUID class will be exposed under /org/apache/kafka/common

/*
 * This class defines an immutable universally unique identifier (UUID). It represents a 128-bit value.
 * More specifically, the random UUIDs in this class are variant 2 (Leach-Salz) version 4 UUIDs.
 * This definition is very similar to java.util.UUID. One notable difference is that the toString() method prints
 * using the base64 string encoding.
 */
public class UUID {

  /**
   * A UUID where all bits are zero. It represents a null or empty UUID.
   */
  public static final UUID ZERO_UUID

  /**
   * Constructs a 128-bit type 4 UUID where the first long represents the most significant 64 bits
   * and the second long represents the least significant 64 bits.
   */
  public UUID(long mostSigBits, long leastSigBits)

  /**
   * Static factory to retrieve a type 4 (pseudo randomly generated) UUID.
   */
  public static UUID randomUUID()

  /**
   * Returns the most significant bits of the UUID's 128 bit value.
   */
  public long getMostSignificantBits()

  /**
   * Returns the least significant bits of the UUID's 128 bit value.
   */
  public long getLeastSignificantBits()

  /**
   * Returns true iff the obj is another UUID with the same value.
   */
  public boolean equals(Object obj)

  /**
   * Returns a hash code for this UUID.
   */
  public int hashCode()

  /**
   * Returns a base64 string encoding of the UUID.
   */
  public String toString()

  /**
   * Creates a UUID based on a base64 string encoding used in the toString() method.
   */
  public static UUID fromString(String str)
}
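
Since the only notable difference from java.util.UUID is the base64 toString() encoding, the following is a minimal sketch of such an encoding, assuming java.util's Base64 and UUID as stand-ins. It is illustrative only and is not the proposed class itself; the URL-safe, unpadded variant is an assumption.

import java.nio.ByteBuffer;
import java.util.Base64;

public class UuidBase64Example {

    // Encode the 128-bit value as a URL-safe base64 string without padding (22 characters).
    static String encode(long mostSigBits, long leastSigBits) {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.putLong(mostSigBits);
        buffer.putLong(leastSigBits);
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buffer.array());
    }

    // Decode the base64 string back into the two 64-bit halves.
    static long[] decode(String str) {
        ByteBuffer buffer = ByteBuffer.wrap(Base64.getUrlDecoder().decode(str));
        return new long[] { buffer.getLong(), buffer.getLong() };
    }

    public static void main(String[] args) {
        java.util.UUID random = java.util.UUID.randomUUID();
        String encoded = encode(random.getMostSignificantBits(), random.getLeastSignificantBits());
        long[] decoded = decode(encoded);
        System.out.println(encoded + " round-trips: "
                + (decoded[0] == random.getMostSignificantBits()
                   && decoded[1] == random.getLeastSignificantBits()));
    }
}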

Additionally, it may be dangerous to use older versions of Kafka tools with new broker versions when using their --zookeeper flags. Use of older tools in this way is not supported today.

...

OffsetForLeaderEpoch Response (Version: 4) => throttle_time_ms [topics]
  throttle_time_ms => INT32
  topics => topic_id* [partitions]
    topic_id* => UUID 
    partitions => error_code partition leader_epoch end_offset
      error_code => INT16
      partition => INT32
      leader_epoch => INT32
      end_offset => INT64

Metadata

MetadataResponse must be modified so that describeTopics includes the topic id for each topic.

MetadataResponse v10

Metadata Response (Version: 10) => throttle_time_ms [brokers] cluster_id controller_id [topics] cluster_authorized_operations
    throttle_time_ms => INT32
    brokers => node_id host port rack
        node_id => INT32
        host => STRING
        port => INT32
        rack => STRING
    cluster_id => STRING
    controller_id => INT32
    topics => error_code name topic_id* is_internal [partitions] topic_authorized_operations
        error_code => INT16
        name => STRING
        topic_id* => UUID
        is_internal => BOOL
        partitions => error_code partition_index leader_id leader_epoch [replica_nodes] [isr_nodes] [offline_replicas]
            error_code => INT16
            partition_index => INT32
            leader_id => INT32
            leader_epoch => INT32
            replica_nodes => INT32
            isr_nodes => INT32
            offline_replicas => INT32
        topic_authorized_operations => INT32
    cluster_authorized_operations => INT32

UpdateMetadata

UpdateMetadata should also include the topic ID.

UpdateMetadataRequest v7

UpdateMetadata Request (Version: 7) => controller_id controller_epoch broker_epoch [ungrouped_partition_states] [topic_states] [live_brokers]
    controller_id => INT32
    controller_epoch => INT32
    broker_epoch => INT64
    ungrouped_partition_states => UpdateMetadataPartitionState
    topic_states => topic_name topic_id* [partition_states]
        topic_name => STRING
        topic_id* => UUID
        partition_states => UpdateMetadataPartitionState
    live_brokers => id v0_host v0_port [endpoints] rack
        id => INT32
        v0_host => STRING
        v0_port => INT32
        endpoints => port host listener security_protocol
            port => INT32
            host => STRING
            listener => STRING
            security_protocol => INT16
        rack => STRING

Produce

Swapping the topic name for the topic ID will cut down on the size of the request.

ProduceRequest v9

...

Produce Request (Version 9) => transactional_id acks timeout_ms [topics]
    transactional_id => STRING
    acks => INT16
    timeout_ms => INT32
    topics => topic_id* [partitions]
        topic_id* => UUID
        partitions => partition_index records
            partition_index => INT32
            records => BYTES

ProduceResponse v9

...

Produce Response (Version 9) => [responses] throttle_time_ms
    responses => topic_id* [partitions]
        topic_id* => UUID
        partitions => partition_index error_code base_offset log_append_time_ms log_start_offset [record_errors] error_message
            partition_index => INT32
            error_code => INT16
            base_offset => INT64
            log_append_time_ms => INT64
            log_start_offset => INT64
            record_errors => batch_index batch_index_error_message
                batch_index => INT32
                batch_index_error_message => STRING
            error_message => STRING
    throttle_time_ms =>  INT32  


DeleteTopics

With the addition of topic IDs and the changes to LeaderAndIsrRequest described above, we can now make changes to topic deletion logic that will allow topics to be immediately considered deleted, regardless of whether all replicas have responded to a DeleteTopicsRequest.

When the controller receives a DeleteTopicsRequest, if the IBP is >= MIN_TOPIC_ID_VERSION it will delete the /brokers/topics/[topic] znode payload and immediately reply to the DeleteTopicsRequest with a successful response. At this point, the topic is considered deleted, and a topic with the same name can be created.

Although the topic is safely deleted at this point, it must still be garbage collected. To garbage collect, the controller will then send StopReplicaRequest(s) to all brokers assigned as replicas for the deleted topic. For the most part, deletion logic can be maintained between IBP versions, with some differences in responses and cleanup in ZooKeeper. Both formats must still be supported, as the IBP may not be bumped right away and deletes may have already been staged before the IBP bump occurs.

The updated controller's delete logic will:

  1. Collect deleted topics:
    1. Old format: /admin/delete_topics pulling the topic state from /brokers/topics/[topic].
    2. New in-memory topic deletion states from received DeleteTopicsRequest(s) 
  2. Remove deleted topics from replicas by sending StopReplicaRequest V3 before the IBP bump using the old logic, and using V4 and the new logic with topic IDs after the IBP bump.
  3. Finalize successful deletes:
    1. For /admin/delete_topics deletes, we may need to respond to the TopicDeleteRequest. We can also delete the topic znode at /admin/delete_topics/[topic] and /brokers/topics/[topic].
    2. For deletes for topics with topic IDs, remove the topic from the in memory topic deletion state on the controller.
  4. Any unsuccessful StopReplicaRequest(s) will be retried after retryMs, starting from 1) and will be maintained in memory.

This leads to the question of what should be done if the controller never receives a successful response from a replica for a StopReplicaRequest. Under such a scenario it is still safe to stop retrying after a reasonable number of retries and amount of time. Given that LeaderAndIsrRequest v5 includes a type flag, allowing FULL requests to be identified, any stale partitions will be reconciled and deleted by a broker on startup upon receiving the initial LeaderAndIsrRequest from the controller. This condition is also safe if the controller changes before the StopReplicaRequest(s) succeed, as the new controller will send a FULL LeaderAndIsrRequest on becoming the leader, ensuring that any stale partitions are cleaned up.
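
To make the retry bookkeeping concrete, here is a highly simplified sketch of the in-memory deletion state and retry pass described above. The class and method names are illustrative assumptions, not the controller's actual implementation.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

public class TopicDeletionRetrySketch {

    // Topic ID -> replicas that have not yet acknowledged a StopReplicaRequest.
    private final Map<UUID, Set<Integer>> pendingDeletes = new HashMap<>();

    // Called when the controller stages a delete for a topic (step 1 above).
    void stageDelete(UUID topicId, Set<Integer> replicas) {
        pendingDeletes.put(topicId, new HashSet<>(replicas));
    }

    // Called when a replica acknowledges the StopReplicaRequest (steps 2-3 above).
    void onStopReplicaSuccess(UUID topicId, int brokerId) {
        Set<Integer> remaining = pendingDeletes.get(topicId);
        if (remaining != null) {
            remaining.remove(brokerId);
            if (remaining.isEmpty()) {
                pendingDeletes.remove(topicId); // garbage collection finished for this topic
            }
        }
    }

    // Invoked every retryMs (step 4 above): resend to any replica still pending.
    // A replica that never responds is eventually reconciled by a FULL LeaderAndIsrRequest.
    void retryPending() {
        pendingDeletes.forEach((topicId, replicas) ->
                replicas.forEach(brokerId -> sendStopReplica(brokerId, topicId)));
    }

    private void sendStopReplica(int brokerId, UUID topicId) {
        // Placeholder for the controller-to-broker StopReplicaRequest send.
        System.out.println("StopReplica(" + topicId + ") -> broker " + brokerId);
    }
}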

Immediate delete scenarios

Stale reads

  1. Broker B1 is the leader for topic partition A_p0_id0.
  2. Topic A id0 is deleted.
  3. Topic A id1 is created.
  4. Broker B1 has not yet received a new LeaderAndIsrRequest, nor a StopReplicaRequest, for topic partition A_p0_id0.
  5. Broker B2 has received a LeaderAndIsrRequest for topic partition A_p0_id0, and starts fetching from B1.

Inclusion of topic IDs in FetchRequest/ListOffsetRequest/OffsetsForLeaderEpochRequest(s) ensures that this scenario is safe. By adding the topic ID to these request types, any request to stale partitions will not be successful.
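
As a minimal illustration of the check this enables, a broker only has to compare the topic ID carried in a request with the topic ID of the log it actually hosts; the accessor and class names below are assumptions, not broker code.

import java.util.UUID;

public class TopicIdCheckExample {

    // A request carrying a topic ID that does not match the hosted log's ID is rejected,
    // even though the topic name may still match after a delete and re-create.
    static boolean sameTopicInstance(UUID requestedTopicId, UUID hostedTopicId) {
        return requestedTopicId != null && requestedTopicId.equals(hostedTopicId);
    }

    public static void main(String[] args) {
        UUID id0 = UUID.randomUUID(); // ID of the deleted topic A
        UUID id1 = UUID.randomUUID(); // ID of the re-created topic A
        System.out.println(sameTopicInstance(id0, id1)); // false: the stale request fails
    }
}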

Stale state

  1. Broker B1 is a replica for A_p0_id0.
  2. Topic A id0 is deleted.
  3. B1 does not receive a StopReplicaRequest for A_p0_id0.
  4. Topic A id1 is created.
  5. Broker B1 receives a LeaderAndIsrRequest containing partition A_p0_id1.

When this occurs, we will close the Log for A_p0_id0, and move A_p0_id0 to the deleting directory as described in the LeaderAndIsrRequest description above.

Storage

Partition Metadata file

To allow brokers to resolve the topic name under this structure, a metadata file will be created at logdir/partitiondir/partition.metadata.

This metadata file will be human readable, and will include:

  • Metadata schema version (schema_version: int32)
  • Topic ID (id: UUID)

This file will be plain text (key/value pairs).

version: 0

topic_id: 46bdb63f9e8d4a38bf7bee4eb2a794e4


One important use for this file is that the current directory structure does not allow us to reload the broker's view of topic IDs on startup (for example, after a failure). It is necessary to persist this file to disk so this information can be reloaded.


It will be easy to update the file to include more fields in the future. This may assist with tooling, such as mapping topic IDs to topic names.

In JBOD mode, a partition's data can be moved from one disk to another. The partition metadata file would be copied during this process.
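
As a rough illustration of the format, a broker-side helper for writing and reloading this key/value file might look like the following sketch; the class and method names are hypothetical, and the keys follow the example above.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class PartitionMetadataFileSketch {

    // Write the key/value pairs shown in the example above (version, topic_id).
    static void write(Path partitionDir, int version, String topicId) throws IOException {
        String content = "version: " + version + "\n" + "topic_id: " + topicId + "\n";
        Files.writeString(partitionDir.resolve("partition.metadata"), content);
    }

    // Reload the topic ID on startup by parsing the file back into key/value pairs.
    static String readTopicId(Path partitionDir) throws IOException {
        for (String line : Files.readAllLines(partitionDir.resolve("partition.metadata"))) {
            String[] parts = line.split(":", 2);
            if (parts.length == 2 && parts[0].trim().equals("topic_id")) {
                return parts[1].trim();
            }
        }
        throw new IOException("partition.metadata is missing a topic_id entry");
    }
}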

Tooling

kafka-topics.sh --describe will be updated to include the topic ID in the output. A user can specify a topic name to describe with the --topic parameter, or alternatively the user can supply a topic ID with the --topic_id parameter.

Configuration

The following configuration options will be added:

Option: delete.stale.topic.delay.ms
Unit: ms
Default: 14400 (4 hours)
Description: When a FULL or INCREMENTAL LeaderAndIsrRequest is received and the request does not contain a partition that exists on a broker, or a broker's topic ID does not match the ID in the request, a deletion event will be staged for that partition. The deletion will complete after delete.stale.topic.delay.ms milliseconds.

AdminClient Support

Access to topic IDs through the AdminClient will make it easier for users to obtain a topic's ID. It can also ensure correctness when deleting topics. This will require some changes to public APIs and protocols.

CreateTopics

Upon creation of a topic, the topic ID will be included in the TopicMetadataAndConfig which is included in CreateTopicsResult. It can be accessed through a method in CreateTopicsResult or the TopicMetadataAndConfig object.

CreateTopicsResult

public class CreateTopicsResult {

  public KafkaFuture<UUID> topicId(String topic)

...

  public static class TopicMetadataAndConfig {

    TopicMetadataAndConfig(UUID topicId, int numPartitions, int replicationFactor, Config config)

    public UUID topicId()

  }
}
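
As a usage sketch, a client could read the new topic's ID from the result roughly as follows. topicId() is the accessor proposed above and does not exist in released clients, so this only illustrates the intended call pattern; the bootstrap address is a placeholder.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicWithIdExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Create the topic and wait for the result.
            CreateTopicsResult result = admin.createTopics(
                    Collections.singleton(new NewTopic("my-topic", 3, (short) 2)));
            // topicId() is the accessor proposed by this KIP; it returns the new UUID type.
            System.out.println("Created my-topic with topic ID " + result.topicId("my-topic").get());
        }
    }
}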

The protocol for CreateTopicsResponse will also need a slight modification.

CreateTopicsResponse v7

CreateTopics Response (Version: 7) => throttle_time_ms [topics]
  throttle_time_ms => INT32
  topics => name topic_id* error_code error_message topic_config_error_code num_partitions replication_factor [configs]
    name => STRING
    topic_id* => UUID
    error_code => INT16
    error_message => STRING
    topic_config_error_code => INT16
    num_partitions => INT32
    replication_factor => INT16 

    configs => name value read_only config_source is_sensitive
      name => STRING
      value => STRING
      read_only => BOOL
      config_source => INT8
      is_sensitive => BOOL

Describe Topics

There are two use cases we want to support: 1) obtaining topic IDs when asking to describe topics, and 2) supplying topic IDs to get a description of those topics.

For the first use case, we need to modify TopicDescription and MetadataResponse.

TopicDescription

/**
* Create an instance with the specified parameters.
*
* @param name The topic name
* @param internal Whether the topic is internal to Kafka
* @param partitions A list of partitions where the index represents the partition id and the element contains
*                   leadership and replica information for that partition.
* @param authorizedOperations authorized operations for this topic, or null if this is not known.
* @param topicId Unique value that identifies the topic
*
*/
public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions,
    Set<AclOperation> authorizedOperations, UUID topicId)


/**
* A unique identifier for the topic.
*/
public UUID topicId()

MetadataResponse v10

Metadata Response (Version: 10) => throttle_time_ms [brokers] cluster_id controller_id [topics] cluster_authorized_operations
    throttle_time_ms => INT32
    brokers => node_id host port rack
        node_id => INT32
        host => STRING
        port => INT32
        rack => STRING
    cluster_id => STRING
    controller_id => INT32
    topics => error_code name topic_id* is_internal [partitions] topic_authorized_operations
        error_code => INT16
        name => STRING
        topic_id* => UUID
        is_internal => BOOL
        partitions => error_code partition_index leader_id leader_epoch [replica_nodes] [isr_nodes] [offline_replicas]
            error_code => INT16
            partition_index => INT32
            leader_id => INT32
            leader_epoch => INT32
            replica_nodes => INT32
            isr_nodes => INT32
            offline_replicas => INT32
        topic_authorized_operations => INT32
    cluster_authorized_operations => INT32

Even when topic IDs are supported, the response will contain both the topic name and the topic ID.

Additionally, new methods will need to be added to the Admin interface and KafkaAdminClient.

Admin and KafkaAdminClient

default DescribeTopicsResult describeTopics(Collection<UUID> topicIds)

DescribeTopicsResult describeTopics(Collection<UUID> topicIds, DescribeTopicsOptions options)


MetadataRequest must also be modified. Topic name will be left in to allow requests to be made either by topic name or topic ID.

ID will be checked first, but if the value is the default zero UUID, topic name will be used instead. If an ID is specified and the ID does not exist, the request will fail regardless of allow_auto_topic_creation.
If name and ID are included, but the name does not match the ID, the request will also fail.
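
The rules above can be summarized with a small sketch. The Map stands in for the broker's metadata cache, java.util.UUID stands in for the new UUID type, and the exception usage is an assumption, not the actual broker code.

import java.util.Map;
import java.util.UUID;

public class MetadataTopicLookupSketch {

    // Stand-in for the zero/default UUID sent when no topic ID is specified.
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    static String resolveTopicName(UUID requestedId, String requestedName, Map<UUID, String> idToName) {
        if (requestedId == null || ZERO_UUID.equals(requestedId)) {
            // No ID supplied: fall back to the topic name (auto-creation rules still apply).
            return requestedName;
        }
        String name = idToName.get(requestedId);
        if (name == null) {
            // An unknown ID fails regardless of allow_auto_topic_creation.
            throw new IllegalArgumentException("unknown topic ID: " + requestedId);
        }
        if (requestedName != null && !requestedName.isEmpty() && !requestedName.equals(name)) {
            // Both name and ID supplied, but they do not refer to the same topic: fail.
            throw new IllegalArgumentException("topic name does not match topic ID");
        }
        return name;
    }
}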

MetadataRequest v10

Metadata Request (Version: 10) => [topics] allow_auto_topic_creation include_cluster_authorized_operations include_topic_authorized_operations
    topics => name topic_id*
      name => STRING
      topic_id* => UUID
    allow_auto_topic_creation => BOOL
    include_cluster_authorized_operations => BOOL
    include_topic_authorized_operations => BOOL

DeleteTopics

It will be useful for the AdminClient to be able to specify a list of topic IDs to delete, to ensure the correct topics are being deleted. New methods will need to be added to the Admin interface and KafkaAdminClient.

Admin and KafkaAdminClient

default DeleteTopicsResult deleteTopics(Collection<UUID> topics)

DeleteTopicsResult deleteTopics(Collection<UUID> topics, DeleteTopicsOptions options);
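
A usage sketch of deleting by ID, chaining the proposed topicId() accessor with the proposed deleteTopics(Collection<UUID>) overload; neither exists in released clients, and the bootstrap address is a placeholder.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class DeleteTopicByIdExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Look up the ID first so we are certain which topic instance gets deleted.
            // topicId() and deleteTopics(Collection<UUID>) are the additions proposed by this KIP.
            var topicId = admin.describeTopics(Collections.singleton("my-topic"))
                    .all().get().get("my-topic").topicId();
            admin.deleteTopics(Collections.singleton(topicId)).all().get();
        }
    }
}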


DeleteTopics Request and Response should be modified.

DeleteTopicsRequest v6

DeleteTopics Request (Version: 6) => [topic_names] timeout_ms
    topic_names => STRING
    timeout_ms => INT32

DeleteTopicsResponse v6

DeleteTopics Response (Version: 6) => throttle_time_ms [responses]
    throttle_time_ms => INT32
    responses => name topic_id* error_code error_message
      name => STRING
      topic_id* => UUID
      error_code => INT16
      error_message => STRING

Even in cases where only the topic ID or only the topic name is included in the request, if topic IDs are supported, the response will contain both the name and the ID.


Compatibility with KIP-500

...

  • CreatePartitionsRequest
  • ElectPreferredLeadersRequest
  • AlterReplicaLogDirsRequest
  • AlterConfigsRequest
  • DeleteTopicsRequest
  • DescribeConfigsRequest
  • DescribeLogDirsRequest
  • DeleteRecordsRequest
  • AddPartitionsToTxnRequest
  • TxnOffsetCommitRequest
  • WriteTxnMarkerRequest

...