...

kafka-topics.sh --describe will be updated to include the topic ID in the output. A user can specify a topic name to describe with the --topic parameter, or alternatively supply a topic ID with the --topic_id parameter.
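
For illustration, an invocation using the new parameter might look like the following; the --topic_id flag comes from this KIP, while the --bootstrap-server flag and the example ID are illustrative:

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic_id 24cc4332-f7de-45a3-b24e-33d61aa0d16c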

Migration

Upon a controller becoming active, the list of current topics is loaded from /brokers/topics/[topic]. When a topic without a topic ID is found, a UUID will be randomly generated and assigned, and the topic information at /brokers/topics/[topic] will be updated with the id field filled in and the schema version bumped to version 3.
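
As a sketch, the updated znode might look like the following. The version bump and the id field are taken from the paragraph above; the surrounding JSON layout, including the exact field name and the partition assignment shown, is illustrative:

{
  "version": 3,
  "id": "24cc4332-f7de-45a3-b24e-33d61aa0d16c",
  "partitions": {"0": [1, 2, 3]}
}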

LeaderAndIsrRequest(s) will only be sent by the controller once a topic ID has been successfully assigned to the topic. Since the LeaderAndIsrRequest version was bumped, the IBP must also be bumped for migration.

When a replica receives a LeaderAndIsrRequest containing a topic ID for an existing partition which does not yet have an associated topic ID, it will create a partition metadata file for the topic partition locally. At this point the local partition will have been migrated to support topic IDs.
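
A minimal sketch of what such a partition metadata file could contain, assuming a simple key-value text format; the exact format and field names are not specified in this section and are illustrative:

version: 0
topic_id: 24cc4332-f7de-45a3-b24e-33d61aa0d16c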

Configuration

The following configuration options will be added:


Option: delete.stale.topic.delay.ms
Unit: ms
Default: 14400000 (4 hours)
Description: When a FULL or INCREMENTAL LeaderAndIsrRequest is received and the request does not contain a partition that exists on a broker, or a broker's topic ID does not match the ID in the request, a deletion event will be staged for that partition which will complete after delete.stale.topic.delay.ms milliseconds.
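
As a sketch of the intended semantics, a staged deletion could simply be scheduled to fire after the configured delay; the class and method names below are hypothetical and not part of this KIP:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: stage a stale-partition deletion that completes after
// delete.stale.topic.delay.ms milliseconds, as described in the table above.
public class StaleTopicDeleter {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long deleteStaleTopicDelayMs;

    public StaleTopicDeleter(long deleteStaleTopicDelayMs) {
        this.deleteStaleTopicDelayMs = deleteStaleTopicDelayMs;
    }

    // Called when a LeaderAndIsrRequest shows that a local partition is stale.
    public void stageDeletion(String topicPartitionDir) {
        scheduler.schedule(
            () -> deleteLocalPartition(topicPartitionDir),
            deleteStaleTopicDelayMs, TimeUnit.MILLISECONDS);
    }

    private void deleteLocalPartition(String topicPartitionDir) {
        // Placeholder for the actual log cleanup.
        System.out.println("Deleting stale partition data in " + topicPartitionDir);
    }
}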

Compatibility with KIP-500

KIP-500 and KIP-595 utilize a special metadata topic to store information that ZooKeeper has stored in the past. This topic must exist before the controller election, but in KIP-516, topic IDs are assigned in the controller. Here is an outline of how we can handle this.

Problem: KIP-595 describes a Vote Request which is used to elect the controller. Currently KIP-595 contains the topic name as part of the protocol. 

Solution: Change Vote to use a topic ID field. Use a sentinel ID reserved only for this topic before its ID is known.

Switching over to topic IDs in this KIP will result in fewer changes later on.

Problem: Post-ZooKeeper, a Fetch request for the metadata topic will be used to obtain information that was once stored in ZooKeeper. KIP-516 stores topic IDs in ZooKeeper, and the controller pushes them to brokers using LeaderAndIsrRequests; this will change to pulling the topic IDs to the broker with a fetch of the metadata topic. KIP-516 is replacing the topic name field with a topic ID field. So how will the first Fetch request know the correct topic ID for the metadata topic?

Solution: Use the same sentinel ID reserved for the metadata topic before its ID is known. After controller election, upon receiving the result, assign the metadata topic its unique topic ID.

Using a topic ID will result in a slightly smaller fetch request and likely prevent further changes. Assigning a unique ID for the metadata topic leaves the possibility for the topic to be placed in tiered storage, or used in other scenarios where topics from multiple clusters may be in one place without appending the cluster ID.

Sentinel ID

The idea is that this will be a hard-coded UUID that no other topic can be assigned. Initially the all-zero UUID was considered, but was ultimately rejected since it is used as a null ID in some places and it is better to keep these usages separate. An example of a hard-coded UUID is 00000000-0000-0000-0000-000000000001.
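
A minimal Java sketch of how a reserved sentinel can coexist with random ID generation; the class and method names are hypothetical:

import java.util.UUID;

public final class TopicIds {
    // The all-zero UUID is used as a null ID in some places, so it is kept separate.
    public static final UUID NULL_ID = new UUID(0L, 0L);
    // Hard-coded sentinel reserved for the metadata topic before its real ID is known.
    public static final UUID SENTINEL_ID = UUID.fromString("00000000-0000-0000-0000-000000000001");

    // Random assignment must never hand out a reserved value.
    public static UUID randomTopicId() {
        UUID id;
        do {
            id = UUID.randomUUID();
        } while (id.equals(NULL_ID) || id.equals(SENTINEL_ID));
        return id;
    }
}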

Vote

Vote will be changed to replace topic name with topic ID, and will use a sentinel topic ID if no topic ID has been assigned already. See above for more information on sentinel topic IDs.

VoteRequest v0

VoteRequest (Version 0) => cluster_id [topics]
  cluster_id => STRING
  topics => topic_id* [partitions]
    topic_id* => UUID
    partitions => partition_index candidate_epoch candidate_id last_offset_epoch last_offset
      partition_index => INT32
      candidate_epoch => INT32
      candidate_id => INT32
      last_offset_epoch => INT32
      last_offset => INT64

VoteResponse v0

VoteResponse (Version 0) => error_code [topics]
  error_code => INT16
  topics => topic_id* [partitions]
    topic_id* => UUID
    partitions => partition_index error_code leader_id leader_epoch vote_granted
      partition_index => INT32
      error_code => INT16
      leader_id => INT32
      leader_epoch => INT32
      vote_granted => BOOL


BeginQuorumEpoch

BeginQuorumEpoch will replace the topic name field with the topic ID field.

BeginQuorumEpochRequest v0

BeginQuorumEpochRequest (Version 0) => cluster_id [topics]
  cluster_id => STRING
  topics => topic_id* [partitions]
    topic_id* => UUID
    partitions => partition_index leader_id leader_epoch
      partition_index => INT32
      leader_id => INT32
      leader_epoch => INT32


BeginQuorumEpochResponse v0

BeginQuorumEpochResponse (Version 0) => error_code [topics]
  error_code => INT16
  topics => topic_id* [partitions]
    topic_id* => UUID
    partitions => partition_index error_code leader_id leader_epoch
      partition_index => INT32
      error_code => INT16
      leader_id => INT32
      leader_epoch => INT32

...


EndQuorumEpoch

EndQuorumEpoch will replace the topic name field with the topic ID field.

EndQuorumEpochRequest v0

EndQuorumEpochRequest (Version 0) => cluster_id [topics]
  cluster_id => STRING
  topics => topic_id* [partitions]
    topic_id* => UUID
    partitions => partition_index replica_id leader_id leader_epoch [preferred_successors]
      partition_index => INT32
      replica_id => INT32
      leader_id => INT32
      leader_epoch => INT32
      preferred_successors => INT32


EndQuorumEpochResponse v0

EndQuorumEpochResponse (Version 0) => error_code [topics]
  error_code => INT16
  topics => topic_id* [partitions]
    topic_id* => UUID
    partitions => partition_index error_code leader_id leader_epoch
      partition_index => INT32
      error_code => INT16
      leader_id => INT32
      leader_epoch => INT32



...


Compatibility, Deprecation, and Migration Plan

We will need to support all API calls which refer to a partition by either (topicId, partition) or (topicName, partition) until clients are updated to interact with topics by ID. In the first stages, deprecations are not currently planned.

However, when the directory structure is changed, downgrades will no longer be possible and the old directory structure will be deprecated. Such a change should therefore coincide with a major release.


Rejected Alternatives

Sequence ID

...

If global uniqueness across clusters is required for topic IDs the first N bits of the ID could consist of a cluster ID prefix, followed by the sequence number. However, to achieve global uniqueness, this would require a large number of bits for the cluster ID prefix.
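
For illustration only, since this alternative was rejected: packing a cluster prefix and a sequence number into a 128-bit value could look like the sketch below, with the 64/64 bit split and the names chosen arbitrarily:

import java.util.UUID;

// Hypothetical sketch of the rejected cluster-prefixed sequence ID.
public final class PrefixedSequenceId {
    // High 64 bits identify the cluster; low 64 bits are the per-cluster sequence.
    public static UUID of(long clusterPrefix, long sequenceNumber) {
        return new UUID(clusterPrefix, sequenceNumber);
    }
}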

Use of a UUID has the benefit of being globally unique across clusters without partitioning the ID space by clusterID, and is conceptually simpler.

Topic Deletion

We considered and rejected two other strategies for performing topic deletes.

Best Effort Strategy

Under this strategy, the controller will attempt to send a StopReplicaRequest to all replicas. The controller will give up after a certain number of retries and will complete the delete. Although this will not simplify the topic deletion code, it will prevent delete topic requests from being blocked if one of the replicas is down. This would now be relatively safe, as stale topics will be deleted when a broker receives an initial LeaderAndIsrRequest; however, it could prevent space from being reclaimed from a broker that does not respond to StopReplicaRequest(s) before they time out, but is otherwise alive.

Send StopReplicaRequest(s) to online brokers only

In this approach, the controller will send StopReplicaRequests to only the brokers that are online, and will wait for a response from these brokers before marking the delete as successful. This will allow a topic delete to take place while some replicas are offline. If any replicas return to being online, they will receive an initial LeaderAndIsrRequest that will allow them to clear up any stale state. This is similar to the "best effort" strategy above.

org.apache.kafka.common.TopicPartition

Eventually the TopicPartition class should include the topic ID. This may be difficult to enact until all APIs support topic IDs, and could come with a performance impact if implemented prior to this, as TopicPartitions are used for hashmap lookups throughout the broker.
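
A hypothetical sketch of what an ID-aware partition key could look like; this is not the actual org.apache.kafka.common.TopicPartition class, and which fields participate in equals/hashCode (and therefore in the hashmap lookups mentioned above) would be a central design decision:

import java.util.Objects;
import java.util.UUID;

// Hypothetical ID-aware variant of TopicPartition.
public final class TopicPartitionWithId {
    private final UUID topicId;
    private final String topic;
    private final int partition;

    public TopicPartitionWithId(UUID topicId, String topic, int partition) {
        this.topicId = topicId;
        this.topic = topic;
        this.partition = partition;
    }

    public String topic() {
        return topic;
    }

    // Equality here is based on (topicId, partition); including the name as well
    // would change lookup behavior while a topic is still being migrated to IDs.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TopicPartitionWithId)) return false;
        TopicPartitionWithId that = (TopicPartitionWithId) o;
        return partition == that.partition && Objects.equals(topicId, that.topicId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topicId, partition);
    }
}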

Persisting Topic IDs

A few other alternatives to the partition metadata file were considered. One topic of discussion was whether such a file was necessary at all. With the current decision of keeping the topic name in the directory, the only way to persist the topic ID to disk is through a file. The decision against changing the directory is discussed below.

Another alternative is to have a single file mapping all topic names to IDs. Although this could be useful for tooling, it would be harder to maintain and update this file as each new topic is added.

Future Work

Requests

The following requests could be improved by the presence of topic IDs, but are out of scope for this KIP.

  • CreatePartitionsRequest
  • ElectPreferredLeadersRequest
  • AlterReplicaLogDirsRequest
  • AlterConfigsRequest
  • DeleteTopicsRequest
  • DescribeConfigsRequest
  • DescribeLogDirsRequest
  • DeleteRecordsRequest
  • AddPartitionsToTxnRequest
  • TxnOffsetCommitRequest
  • WriteTxnMarkerRequest

Clients

Some of the implemented request types are also relevant to clients. Adding full support for topic IDs in the clients would add an additional measure of safety when producing and consuming data.

__consumer_offsets topic

Ideally, consumer offsets stored in the __consumer_offsets topic would be associated with the topic ID for which they were read. However, given the way the __consumer_offsets topic is compacted, this may be difficult to achieve in a forwards compatible way. This change will be left until topic IDs are implemented in the clients. Another future improvement opportunity is to use topicId in GroupMetadataManager.offsetCommitKey in the offset_commit topic. This may save some space.

log.dir layout

It would be ideal if the log.dir layout could be restructured from the {topic}_{partition} format to {topicIdPrefix}/{topicId}_{partition}, e.g. "mytopic_1" → "24/24cc4332-f7de-45a3-b24e-33d61aa0d16c_1". Note the hierarchical directory structure, which uses the first two characters of the topic ID to avoid having too many directories at the top level of the log.dir. This change is not required for the topic deletion improvements above, and will be left for a future KIP where it may be required, e.g. for topic renames. Changing the directory structure in this way would also require more changes to tooling, and finding the correct log directory for a given topic would require more work from the user than under the current changes in the KIP. There are other considerations when it comes to changing the directory structure, so it is probably best to spend more time on it before we commit to a decision.
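
A small Java sketch of the proposed path computation, assuming the two-character prefix described above (names are illustrative):

import java.util.UUID;

// Hypothetical helper for the proposed hierarchical log.dir layout.
public final class LogDirLayout {
    public static String partitionDir(UUID topicId, int partition) {
        String id = topicId.toString();
        // The first two characters of the topic ID become the parent directory.
        return id.substring(0, 2) + "/" + id + "_" + partition;
    }

    public static void main(String[] args) {
        UUID id = UUID.fromString("24cc4332-f7de-45a3-b24e-33d61aa0d16c");
        System.out.println(partitionDir(id, 1)); // 24/24cc4332-f7de-45a3-b24e-33d61aa0d16c_1
    }
}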


Security/Authorization

One idea was to support authorizing a principal for a topic ID rather than a topic name. For now, this would be a breaking change, and it would be hard to support prefixed ACLs with topic IDs.