...

It is unnecessary to include the name of the topic in the following Request/Response calls:
StopReplica
Fetch
ListOffsets
OffsetForLeaderEpoch
Vote
BeginQuorumEpoch
EndQuorumEpoch

Including the topic name in the request may make it easier to debug when issues arise, as it provides more information than the topic ID alone. However, it would also bloat the protocol (especially relevant for FetchRequest), and if the name were relied upon incorrectly it could prevent topic renames from being easily implemented in the future. For these reasons, the topic name field has been removed from these requests.

...

UpdateMetadata Request (Version: 7) => controller_id controller_epoch broker_epoch [ungrouped_partition_states] [topic_states] [live_brokers] 
    controller_id => INT32
    controller_epoch => INT32
    broker_epoch => INT64
    ungrouped_partition_states => UpdateMetadataPartitionState
    topic_states => topic_name topic_id* [partition_states]
        topic_name => STRING
        topic_id* => UUID
        partition_states => UpdateMetadataPartitionState
    live_brokers => id v0_host v0_port [endpoints] rack
        id => INT32
        v0_host => STRING
        v0_port => INT32
        endpoints => port host listener security_protocol
            port => INT32
            host => STRING
            listener => STRING
            security_protocol => INT16
        rack => STRING
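
For illustration only, the sketch below models the regrouped topic_states entries (topic name, the new topic ID, and the nested partition states) as plain Java records. The class names and the trimmed field set are assumptions for this sketch; they are not Kafka's generated protocol classes.

import java.util.List;
import java.util.UUID;

// Simplified partition state for illustration; the real UpdateMetadataPartitionState
// also carries the controller epoch, leader epoch history, ISR, replicas, and
// offline replicas.
record UpdateMetadataPartitionState(int partitionIndex, int leader, int leaderEpoch) { }

// Illustrative topic_states entry: partition states are grouped per topic, with the
// new topic_id field carried alongside the topic name.
record UpdateMetadataTopicState(String topicName,
                                UUID topicId,
                                List<UpdateMetadataPartitionState> partitionStates) { }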

...


Vote

Vote will be changed to replace the topic name with the topic ID, using a sentinel topic ID if no topic ID has yet been assigned. See Compatibility with KIP-500 for more information on sentinel topic IDs.

VoteRequest v0

VoteRequest (Version 0) => cluster_id [topics]
  cluster_id => STRING
  topics => topic_id* [partitions]
    topic_id* => UUID
    partitions => partition_index candidate_epoch candidate_id last_offset_epoch last_offset
      partition_index => INT32
      candidate_epoch => INT32
      candidate_id => INT32
      last_offset_epoch => INT32
      last_offset => INT64

VoteResponse v0

VoteResponse (Version 0) => error_code [topics]
  error_code => INT16
  topics => topic_id* [partitions]
    topic_id* => UUID
    partitions => partition_index error_code leader_id leader_epoch vote_granted
      partition_index => INT32
      error_code => INT16
      leader_id => INT32
      leader_epoch => INT32
      vote_granted => BOOL

BeginQuorumEpoch

BeginQuorumEpoch will replace the topic name field with the topic ID field.

BeginQuorumEpochRequest v0

BeginQuorumEpochRequest (Version 0) => cluster_id [topics]
  cluster_id => STRING
  topics => topic_id* [partitions]
    topic_id* => UUID
    partitions => partition_index leader_id leader_epoch
      partition_index => INT32
      leader_id => INT32
      leader_epoch => INT32

BeginQuorumEpochResponse v0

BeginQuorumEpochResponse (Version 0) => error_code [topics]
  error_code => INT16
  topics => topic_id* [partitions]
    topic_id* => UUID
    partitions => partition_index error_code leader_id leader_epoch
      partition_index => INT32
      error_code => INT16
      leader_id => INT32
      leader_epoch => INT32

EndQuorumEpoch

EndQuorumEpoch will replace the topic name field with the topic ID field.

EndQuorumEpochRequest v0

EndQuorumEpochRequest (Version 0) => cluster_id [topics]
  cluster_id => STRING
  topics => topic_id* [partitions]
    topic_id* => UUID
    partitions => partition_index replica_id leader_id leader_epoch [preferred_successors]
      partition_index => INT32
      replica_id => INT32
      leader_id => INT32
      leader_epoch => INT32
      preferred_successors => INT32

EndQuorumEpochResponse v0

EndQuorumEpochResponse (Version 0) => error_code [topics]
  error_code => INT16
  topics => topic_id* [partitions]
    topic_id* => UUID
    partitions => partition_index error_code leader_id leader_epoch
      partition_index => INT32
      error_code => INT16
      leader_id => INT32
      leader_epoch => INT32


DeleteTopics

With the addition of topic IDs and the changes to LeaderAndIsrRequest described above, we can now make changes to topic deletion logic that will allow topics to be immediately considered deleted, regardless of whether all replicas have responded to a DeleteTopicsRequest.

When the controller receives a DeleteTopicsRequest, if the IBP is >= MIN_TOPIC_ID_VERSION it will delete the /brokers/topics/[topic] znode payload and immediately reply to the DeleteTopicsRequest with a successful response. At this point, the topic is considered deleted, and a topic with the same name can be created.

Although the topic is safely deleted at this point, it must still be garbage collected. To garbage collect, the controller will then send StopReplicaRequest(s) to all brokers assigned as replicas for the deleted topic. For the most part, deletion logic can be maintained between IBP versions, with some differences in responses and cleanup in ZooKeeper. Both formats must still be supported, as the IBP may not be bumped right away and deletes may have already been staged before the IBP bump occurs.

The updated controller's delete logic will:

  1. Collect deleted topics:
    1. Old format: /admin/delete_topics pulling the topic state from /brokers/topics/[topic].
    2. New in-memory topic deletion states from received DeleteTopicsRequest(s) 
  2. Remove deleted topics from replicas by sending StopReplicaRequest(s): V3 with the old logic before the IBP bump, and V4 with the new topic-ID-based logic after the IBP bump.
  3. Finalize successful deletes:
    1. For /admin/delete_topics deletes, we may need to respond to the DeleteTopicsRequest. We can also delete the topic znodes at /admin/delete_topics/[topic] and /brokers/topics/[topic].
    2. For deletes for topics with topic IDs, remove the topic from the in memory topic deletion state on the controller.
  4. Any unsuccessful StopReplicaRequest(s) will be maintained in memory and retried after retryMs, starting again from step 1.

This leads to the question of what should be done if the controller never receives a successful response from a replica for a StopReplicaRequest. In such a scenario, it is still safe to stop retrying after a reasonable number of retries and a reasonable amount of time. Given that LeaderAndIsrRequest v5 includes a type flag that allows FULL requests to be identified, any stale partitions will be reconciled and deleted by a broker on startup upon receiving the initial LeaderAndIsrRequest from a controller. This is also safe if the controller changes before the StopReplicaRequest(s) succeed, as the new controller will send a FULL LeaderAndIsrRequest on becoming leader, ensuring that any stale partitions are cleaned up.
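
As a rough sketch only, the loop below illustrates how the controller-side garbage collection and retry described above could be structured. The class, method, and field names (pendingDeletes, retryPending, sendStopReplica) are hypothetical and do not correspond to the actual controller code.

import java.util.*;

// Illustrative controller-side garbage collection of deleted topics. All names
// here are hypothetical; the real controller logic differs in structure.
class DeletedTopicGarbageCollector {
    // In-memory deletion state: topic ID -> broker IDs that have not yet
    // acknowledged a StopReplicaRequest for the deleted topic.
    private final Map<UUID, Set<Integer>> pendingDeletes = new HashMap<>();
    private final long retryMs = 30_000L;   // assumed retry interval

    void onTopicDeleted(UUID topicId, Set<Integer> replicaBrokers) {
        // The topic is already considered deleted; only garbage collection remains.
        pendingDeletes.put(topicId, new HashSet<>(replicaBrokers));
    }

    void onStopReplicaSuccess(UUID topicId, int brokerId) {
        Set<Integer> remaining = pendingDeletes.get(topicId);
        if (remaining == null) return;
        remaining.remove(brokerId);
        if (remaining.isEmpty()) {
            // Finalize: drop the in-memory deletion state for this topic.
            pendingDeletes.remove(topicId);
        }
    }

    void retryPending() {
        // Invoked every retryMs: resend StopReplicaRequest(s) to brokers that
        // have not yet responded successfully.
        pendingDeletes.forEach((topicId, brokers) ->
                brokers.forEach(brokerId -> sendStopReplica(brokerId, topicId)));
    }

    private void sendStopReplica(int brokerId, UUID topicId) {
        // Placeholder: would send StopReplicaRequest v4 (with topic ID) on the
        // post-IBP-bump path, or v3 (by name) on the old path.
    }
}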

Immediate delete scenarios

Stale reads

  1. Broker B1 is a leader for topic partition A_p0_id0.
  2. Topic A id0 is deleted.
  3. Topic A id1 is created.
  4. Broker B1 has not yet received a new LeaderAndIsrRequest, nor a StopReplicaRequest, for topic partition A_p0_id0.
  5. Broker B2 has received a LeaderAndIsrRequest for topic partition A_p0_id0, and starts fetching from B1.

Inclusion of topic IDs in FetchRequest/ListOffsetRequest/OffsetsForLeaderEpochRequest(s) ensures that this scenario is safe. By adding the topic ID to these request types, any request against a stale partition will not be successful.
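
The check below is a minimal sketch of why this is safe, assuming a hypothetical leader-side validator; the actual broker code and error handling differ.

import java.util.Optional;
import java.util.UUID;

// Illustrative leader-side check for Fetch/ListOffsets/OffsetsForLeaderEpoch:
// a request carrying the ID of a deleted-and-recreated topic does not match
// the ID of the log the leader currently hosts, so it fails instead of
// silently reading the new topic's data.
class TopicIdValidator {
    static boolean matches(UUID requestedTopicId, Optional<UUID> hostedTopicId) {
        // If the hosted log records an ID and it differs from the requested one,
        // the fetcher is operating on a stale partition (e.g. A_p0_id0 after topic
        // A was recreated as id1) and should receive an error such as
        // UNKNOWN_TOPIC_ID rather than data. Logs without a recorded ID
        // (pre-upgrade) are accepted.
        return hostedTopicId.map(requestedTopicId::equals).orElse(true);
    }
}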

Stale state

  1. Broker B1 is a replica for A_p0_id0.
  2. Topic A id0 is deleted.
  3. B1 has not received a StopReplicaRequest for A_p0_id0.
  4. Topic A id1 is created.
  5. Broker B1 receives a LeaderAndIsrRequest containing partition A_p0_id1.

When this occurs, we will close the Log for A_p0_id0 and move A_p0_id0 to the deleting directory, as described in the LeaderAndIsrRequest section above.
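
A minimal sketch of that reconciliation step follows; the directory-renaming scheme and class name are assumptions for illustration, not the broker's exact implementation.

import java.io.IOException;
import java.nio.file.*;
import java.util.Optional;
import java.util.UUID;

// Illustrative handling of a stale local partition when a LeaderAndIsrRequest
// arrives with a different topic ID for the same topic name: stage the old
// partition directory for deletion before creating the log for the new topic.
class StalePartitionReconciler {
    static void reconcile(Path partitionDir,
                          Optional<UUID> onDiskTopicId,   // read from partition.metadata
                          UUID requestTopicId) throws IOException {
        if (onDiskTopicId.isPresent() && !onDiskTopicId.get().equals(requestTopicId)) {
            // The local data belongs to a deleted incarnation of the topic
            // (A_p0_id0); move it aside for asynchronous deletion before
            // creating the log for the new incarnation (A_p0_id1).
            Path staged = partitionDir.resolveSibling(
                    partitionDir.getFileName() + "." + onDiskTopicId.get() + "-delete");
            Files.move(partitionDir, staged, StandardCopyOption.ATOMIC_MOVE);
        }
    }
}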

Storage

Partition Metadata file

To allow brokers to resolve the topic ID for a partition under this directory structure, a metadata file will be created at logdir/partitiondir/partition.metadata.

This metadata file will be human readable, and will include:

  • Metadata schema version (version: int32)
  • Topic ID (topic_id: UUID)

This file will be plain text (key/value pairs).

version: 0
topic_id: 46bdb63f-9e8d-4a38-bf7b-ee4eb2a794e4


One important use for this file is on broker startup: the directory structure alone does not allow the broker to reload its view of topic IDs (for example, after a failure), so this information must be persisted to disk where it can be reloaded.

During LeaderAndIsrRequests, this file may be used to disambiguate topics safely and delete topics if necessary. More details on this process are explained in the LeaderAndIsrRequest v5 section.


It will be easy to update the file to include more fields in the future.
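
As an illustration of how easily this key/value format can be parsed (and later extended), here is a rough reader for the layout shown above. It is a sketch only, not the broker's actual implementation, and the class name is hypothetical.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

// Illustrative reader for the plain-text partition.metadata file
// (version + topic_id key/value pairs).
class PartitionMetadataFileContents {
    final int version;
    final UUID topicId;

    PartitionMetadataFileContents(int version, UUID topicId) {
        this.version = version;
        this.topicId = topicId;
    }

    static PartitionMetadataFileContents read(Path file) throws IOException {
        int version = -1;
        UUID topicId = null;
        for (String line : Files.readAllLines(file)) {
            String[] parts = line.split(":", 2);
            if (parts.length != 2) continue;          // skip blank lines
            String key = parts[0].trim();
            String value = parts[1].trim();
            if (key.equals("version")) version = Integer.parseInt(value);
            else if (key.equals("topic_id")) topicId = UUID.fromString(value);
            // Unknown keys are ignored so new fields can be added later.
        }
        if (version < 0 || topicId == null)
            throw new IOException("Incomplete partition.metadata file: " + file);
        return new PartitionMetadataFileContents(version, topicId);
    }
}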

In JBOD mode, a partition's data can be moved from one disk to another. The partition metadata file would be copied as part of this process.

Compatibility with KIP-500

KIP-500 and KIP-595 utilize a special metadata topic to store the information that ZooKeeper stored in the past. This topic must exist before controller election, but in KIP-516 topic IDs are assigned by the controller. Here is an outline of how this can be handled.

Problem: KIP-595 describes a Vote Request which is used to elect the controller. Currently KIP-595 contains the topic name as part of the protocol. 

Solution: Use a sentinel ID reserved only for this topic before its ID is known.

Switching over to topic IDs in this KIP will result in fewer changes later on.

Problem: Post-ZooKeeper, a Fetch request for the metadata topic will be used to obtain the information that was once stored in ZooKeeper. KIP-516 stores topic IDs in ZooKeeper, and the controller pushes them to brokers using LeaderAndIsrRequests; this will change to brokers pulling topic IDs by fetching the metadata topic. Since KIP-516 replaces the topic name field with a topic ID field, how will the first Fetch request know the correct topic ID for the metadata topic?

Solution: Use the same sentinel ID reserved for the metadata topic before its ID is known. After controller election, upon receiving the result, assign the metadata topic its unique topic ID.

Using a topic ID will result in a slightly smaller fetch request and will likely prevent further changes. Assigning a unique ID to the metadata topic leaves open the possibility of placing the topic in tiered storage, or using it in other scenarios where topics from multiple clusters may be stored in one place, without appending the cluster ID.

Sentinel ID

The idea is that this will be a hard-coded UUID that no other topic can be assigned. Initially the all-zero UUID was considered, but it was ultimately rejected since that value is used as a null ID in some places and it is better to keep these usages separate. An example of such a hard-coded UUID is 00000000-0000-0000-0000-000000000001.
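
For illustration, a sketch of how such a sentinel could be defined and used (for example, in the first Vote or Fetch for the metadata topic before the controller has assigned a real ID) is shown below. The constant and helper names are hypothetical.

import java.util.Optional;
import java.util.UUID;

// Illustrative use of a reserved sentinel topic ID for the metadata topic
// before the controller has assigned it a real ID.
final class MetadataTopicIds {
    // Hard-coded sentinel, distinct from the all-zero UUID used as "null".
    static final UUID SENTINEL_ID =
            UUID.fromString("00000000-0000-0000-0000-000000000001");

    // Before a real ID has been assigned, requests carry the sentinel;
    // afterwards they carry the controller-assigned ID.
    static UUID effectiveId(Optional<UUID> assignedId) {
        return assignedId.orElse(SENTINEL_ID);
    }
}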

Tooling

kafka-topics.sh --describe will be updated to include the topic ID in the output. A user can specify a topic name to describe with the --topic parameter, or alternatively supply a topic ID with the --topic_id parameter.

...