...

Today, Kafka uniquely identifies a topic by its name. This is generally sufficient, but there are flaws in this scheme if a topic is deleted and recreated with the same name. As a result, Kafka attempts to prevent issues resulting from stale topics by ensuring a topic is fully deleted from all replicas before completing a deletion. This solution is imperfect, as it is possible for partitions to be reassigned away from brokers while they are down, and there are no guarantees that this state will ever be cleaned up or that it will not cause issues in the future.

When a topic deletion is performed, the controller must wait for all brokers to delete their local replicas. This means that creation of a topic with the same name is blocked until all replicas have successfully deleted the topic's data. As a result, downtime for a single broker can effectively cause a complete outage for everyone producing or consuming to that topic name, as the topic cannot be recreated without manual intervention.

...

Overall, topic IDs provide a safer way for brokers to replicate topics without any chance of incorrectly interacting with stale topics of the same name. By preventing such scenarios, we can simplify a number of other interactions, such as topic deletes, which are currently more complicated and problematic than necessary.

...

Topic IDs will be represented using 128-bit v4 UUIDs. A UUID with all bits set to 0 will be reserved as a null UUID, as the Kafka RPC protocol does not allow for nullable fields. When printed or stored as a string, topic IDs will be converted to a base64 string representation.
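A minimal sketch of this representation (the class and method names below are hypothetical illustrations, not the actual Kafka implementation):

import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

// Hypothetical helper illustrating the reserved null ID and a base64
// string form for a 128-bit v4 UUID.
public class TopicIdExample {
    // All-zero UUID reserved as the null topic ID, since the RPC protocol
    // has no nullable fields.
    public static final UUID NULL_ID = new UUID(0L, 0L);

    // Generate a random v4 UUID, re-drawing in the (astronomically unlikely)
    // case it collides with the reserved null ID.
    public static UUID randomTopicId() {
        UUID id = UUID.randomUUID();
        while (id.equals(NULL_ID)) {
            id = UUID.randomUUID();
        }
        return id;
    }

    // Encode the 16 raw bytes as a compact base64 string for printing/storage.
    public static String toBase64(UUID id) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(id.getMostSignificantBits());
        buf.putLong(id.getLeastSignificantBits());
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
    }
}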

On handling a CreateTopicRequest, the broker will create the topic znode under /brokers/topics/[topic], as usual.

The znode value will now contain an additional topic ID, represented as a base64 string in the "id" field, and the schema version will be bumped to version 3.
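For illustration only, a version 3 payload could look like the following, with a made-up base64 ID and arbitrary replica assignments; any other fields carried over from the version 2 schema are omitted here:

{
  "version": 3,
  "id": "b8tRS7h4TJ2Vt43Dp85v2A",
  "partitions": {"0": [0, 1, 2], "1": [1, 2, 0]}
}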

...

The controller will maintain local in-memory state containing a mapping from topic name to topic ID. On controller startup, the topic ID will automatically be loaded into memory along with the topics and partitions. A random UUID will be generated on topic creation or on migration of an existing topic without topic IDs.
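A minimal sketch of that controller-side state, assuming a hypothetical wrapper class rather than the real controller code:

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the controller's topic name -> topic ID mapping.
public class ControllerTopicIdState {
    private final Map<String, UUID> topicIds = new ConcurrentHashMap<>();

    // On controller startup: load IDs read from existing topic znodes.
    public void load(String topic, UUID id) {
        topicIds.put(topic, id);
    }

    // On topic creation, or on migration of a topic that has no ID yet:
    // generate a random UUID exactly once per topic name.
    public UUID getOrGenerate(String topic) {
        return topicIds.computeIfAbsent(topic, t -> UUID.randomUUID());
    }

    // Once a delete is finalized, drop the mapping.
    public void forget(String topic) {
        topicIds.remove(topic);
    }
}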

The controller will supply topic IDs to brokers by sending LeaderAndIsrRequest(s) that contain the topic IDs for all partitions contained in the request.

Protocol Changes

LeaderAndIsr

...


LeaderAndIsrRequest v3 adds the topic ID to the topic_states field, and an enum to denote the type of LeaderAndIsrRequest. Currently, the first LeaderAndIsrRequest sent to a broker by a controller contains all TopicPartitions that the broker is a replica for. We will formalize this behavior by including a type enum that distinguishes such "full" requests from incremental ones.

value  enum         description
0      UNSPECIFIED  Unspecified type. Defaults to incremental behavior.
1      INCREMENTAL  A LeaderAndIsrRequest that is not guaranteed to contain all topic partitions assigned to a broker.
2      FULL         A full LeaderAndIsrRequest containing all partitions the broker is a replica for.

When type = FULL, the broker can reconcile its local state on disk with the request, and safely stage deletions for any partitions that are present on disk but not contained in the request. This may be because a TopicPartition is not present in the LeaderAndIsrRequest at all, or because a topic partition in the request carries a topic ID that does not match the topic ID stored locally on the broker.

Reconciliation may also be necessary if type = INCREMENTAL and the topic ID set on a local partition does not match the topic ID contained in the request.

When type = UNSPECIFIED, the request will be treated in a way that allows for backwards compatibility with older request types.
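For concreteness, a hedged sketch of this staleness decision; all names here are hypothetical, and the real logic lives in the broker's replica manager:

import java.util.Optional;
import java.util.UUID;

// Hypothetical sketch of the broker-side staleness check described above.
public class LeaderAndIsrReconciliation {
    enum Type { UNSPECIFIED, INCREMENTAL, FULL }

    /**
     * Returns true if a locally stored partition should be staged for deletion.
     *
     * @param type           the type carried by LeaderAndIsrRequest v3
     * @param inRequest      whether the local partition appears in the request
     * @param requestTopicId topic ID for that partition in the request, if present
     * @param localTopicId   topic ID stored with the local partition, if any
     */
    static boolean isStale(Type type,
                           boolean inRequest,
                           Optional<UUID> requestTopicId,
                           Optional<UUID> localTopicId) {
        boolean idMismatch = inRequest
                && requestTopicId.isPresent()
                && localTopicId.isPresent()
                && !requestTopicId.get().equals(localTopicId.get());
        if (type == Type.FULL) {
            // FULL requests list every partition the broker replicates, so a
            // partition absent from the request, or present under a different
            // topic ID, is stale.
            return !inRequest || idMismatch;
        }
        // INCREMENTAL (and UNSPECIFIED, which defaults to incremental
        // behavior): only a topic ID mismatch marks the local copy stale.
        return idMismatch;
    }
}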

Deletion of stale partitions triggered by LeaderAndIsrRequest(s) will take place by:

  1. Logging, at WARN level, all partitions that will be deleted and the time at which they will be deleted.
  2. Moving the partition's directory to log.dir/deleting/{topic_id}_{partition}.
  3. Scheduling a final deletion event for delete.stale.topic.delay.ms ms after the LeaderAndIsrRequest was first received. This will clear the deleting directory of the partition's contents.
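A minimal sketch of these three steps, assuming a plain Java scheduler in place of the broker's own scheduler and log manager (all names hypothetical):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of staging and delayed deletion of a stale partition.
public class StalePartitionDeleter {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    void stageForDeletion(Path logDir, Path partitionDir, UUID topicId,
                          int partition, long delayMs) throws IOException {
        // Step 1: log what will be deleted and when (stand-in for WARN logging).
        System.err.printf("WARN staging %s for deletion in %d ms%n", partitionDir, delayMs);

        // Step 2: move the partition's directory to log.dir/deleting/{topic_id}_{partition}.
        Path deleting = logDir.resolve("deleting");
        Files.createDirectories(deleting);
        Path staged = deleting.resolve(topicId + "_" + partition);
        Files.move(partitionDir, staged, StandardCopyOption.ATOMIC_MOVE);

        // Step 3: schedule the final delete for delayMs (delete.stale.topic.delay.ms)
        // after the LeaderAndIsrRequest was first received.
        scheduler.schedule(() -> deleteRecursively(staged), delayMs, TimeUnit.MILLISECONDS);
    }

    private void deleteRecursively(Path dir) {
        try (var paths = Files.walk(dir)) {
            // Delete children before parents.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try { Files.delete(p); } catch (IOException ignored) { }
            });
        } catch (IOException ignored) { }
    }
}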

LeaderAndIsrResponse v3

LeaderAndIsr Response (Version: 3) => error_code [partitions]
  error_code => INT16
  partitions => topic topic_id* partition error_code
    topic => STRING
    topic_id* => UUID
    partition => INT32
    error_code => INT16

...

To avoid issues where requests are made to stale partitions, a topic_id field will be added to fence reads from deleted topics. Note that the leader epoch is not sufficient for preventing these issues, as the leader epoch is reset when a topic is deleted and recreated.
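A sketch of how such a fence might be checked on the read path; the result naming below is illustrative, not the exact error code added by this change:

import java.util.Optional;
import java.util.UUID;

// Hypothetical sketch of the topic ID fence on reads.
public class TopicIdFencing {
    static final UUID NULL_ID = new UUID(0L, 0L);

    enum Result { OK, STALE_TOPIC }

    static Result checkFetch(UUID requestTopicId, Optional<UUID> localTopicId) {
        // A null (all-zero) ID means the sender predates topic IDs; skip the
        // check for compatibility.
        if (requestTopicId.equals(NULL_ID)) return Result.OK;
        // The leader epoch alone cannot fence this case, since it resets when
        // a topic is deleted and recreated; the topic ID does not.
        if (localTopicId.isEmpty() || !localTopicId.get().equals(requestTopicId))
            return Result.STALE_TOPIC;
        return Result.OK;
    }
}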

...

With the addition of topic IDs and the changes to LeaderAndIsrRequest described above, we can now change topic deletion logic to allow topics to be immediately considered deleted, regardless of whether all replicas have responded to a DeleteTopicsRequest.

When the controller receives a DeleteTopicsRequest, if the IBP is >= MIN_TOPIC_ID_VERSION, it will delete the /brokers/topics/[topic] znode payload and immediately reply to the DeleteTopicsRequest with a successful response. At this point, the topic is considered deleted, and a topic with the same name can be created.

Although the topic is considered safely deleted at this point, it must still be garbage collected. To do so, the controller will send StopReplicaRequest(s) to all brokers assigned as replicas for the deleted topic. For the most part, the deletion logic can be maintained between IBP versions, with some differences in responses and cleanup in ZooKeeper. Both formats must still be supported, as the IBP may not be bumped right away, and deletes may have already been staged before the IBP bump occurs.

The updated controller's delete logic will:

  1. Collect deleted topics:
    1. Old format: /admin/delete_topics, pulling the topic state from /brokers/topics/[topic].
    2. New format: in-memory topic deletion states from received DeleteTopicsRequest(s).
  2. Remove deleted topics from replicas by sending StopReplicaRequest V2 for any topics without a topic ID, and V3 for any topics with a topic ID.
  3. Finalize successful deletes:
    1. For /admin/delete_topics deletes, we may need to respond to the TopicDeleteRequest. We can also delete the topic znodes at /admin/delete_topics/[topic] and /brokers/topics/[topic].
    2. For deletes of topics with topic IDs where IBP >= MIN_TOPIC_ID_VERSION, remove the topic from the in-memory topic deletion state on the controller.
  4. Unsuccessful StopReplicaRequest(s) will be maintained in memory and retried after retryMs, starting from step 1.
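As a compact illustration of step 2's version selection (types and names below are hypothetical):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical sketch of choosing the StopReplicaRequest version per pending
// delete, as in step 2 of the controller's delete logic above.
public class StopReplicaDispatch {
    static final short V2_NO_TOPIC_ID = 2;
    static final short V3_WITH_TOPIC_ID = 3;

    record PendingDelete(String topic, Optional<UUID> topicId) {}

    static Map<Short, List<PendingDelete>> groupByVersion(List<PendingDelete> deletes) {
        Map<Short, List<PendingDelete>> byVersion = new HashMap<>();
        for (PendingDelete d : deletes) {
            // Topics collected from /admin/delete_topics may predate topic IDs
            // and must use the older request version.
            short version = d.topicId().isPresent() ? V3_WITH_TOPIC_ID : V2_NO_TOPIC_ID;
            byVersion.computeIfAbsent(version, v -> new ArrayList<>()).add(d);
        }
        return byVersion;
    }
}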

...