Status
Current state: Under Discussion
Discussion thread: https://lists.apache.org/thread.html/7efa8cd169cadc7dc9cf86a7c0dbbab1836ddb5024d310fcebacf80c@%3Cdev.kafka.apache.org%3E
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Today, Kafka uniquely identifies a topic by its name. This is generally sufficient, but there are flaws in this scheme if a topic is deleted and recreated with the same name. Kafka attempts to prevent issues resulting from stale topics by ensuring a topic is fully deleted from all replicas before completing a deletion. This solution is imperfect as it is possible for partitions to be reassigned away from brokers while they are down, and there are no guarantees that this state will ever be cleaned up.
When a topic deletion is performed, the controller must wait for all brokers to delete their local replicas. This blocks creation of a topic with the same name as a deleted topic until all replicas have successfully deleted the topic's data. As a result, downtime for a single broker can effectively cause a complete outage for everyone producing to or consuming from that topic name, as the topic cannot be recreated without manual intervention.
Topic IDs aim to address this issue by associating a truly unique ID with each topic, ensuring a newly created topic with a previously used name cannot be confused with a previous topic with that name.
Topic IDs solve several additional problems:
- Renaming topics becomes feasible (although there may still be some complexity with the need to support the old name for a while as part of migration, etc.). Renaming topics may seem minor, but it will be difficult to have hierarchical topics without some kind of renaming support.
- We can eventually get rid of the "deleting" state for topics. If a broker is down while it still holds data for a topic that no longer exists, that data won't cause problems later on: it can be deleted when the broker rejoins the cluster and realizes that the relevant topic ID is no longer present. This also adds safety in cases where stale or deleted replicas can currently interact with live ones.
- Sending 16-byte UUIDs instead of strings in Kafka RPCs can reduce request size. A string is serialized as a 2-byte length followed by the data, whereas a UUID is a fixed 16 bytes. For any topic name with more than 14 single-byte characters (16 bytes serialized), UUIDs will be smaller. UUIDs will also be faster to compare and friendlier to the garbage collector.
- They will provide a true measure of topic uniqueness across clusters. This may be important in multi-cluster Kafka deployments where additional safety and debuggability are desired.
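As a quick check of the size arithmetic above, the Python sketch below compares the serialized size of a topic name with a fixed 16-byte UUID. It assumes the classic STRING encoding described in the text (an int16 length prefix followed by the UTF-8 bytes); flexible request versions use a varint-prefixed compact string instead, which moves the break-even point by one character. The helper names are illustrative only.

```python
UUID_SIZE_BYTES = 16  # a UUID is always 128 bits on the wire


def string_wire_size(name: str) -> int:
    """Bytes used by a Kafka STRING field: int16 length prefix + UTF-8 payload."""
    return 2 + len(name.encode("utf-8"))


def uuid_is_smaller(name: str) -> bool:
    """True if sending a topic ID instead of this topic name saves bytes."""
    return UUID_SIZE_BYTES < string_wire_size(name)


if __name__ == "__main__":
    for name in ["orders", "a" * 14, "a" * 15, "clickstream.events.v2"]:
        print(f"{name!r}: name={string_wire_size(name)} B, "
              f"uuid={UUID_SIZE_BYTES} B, uuid smaller: {uuid_is_smaller(name)}")
```

A 14-character name ties with the UUID at 16 bytes; from 15 characters onward the UUID wins.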
Overall, topic IDs provide a safer way for brokers to replicate topics without any chance of incorrectly interacting with stale topics with the same name. By preventing such scenarios, we can simplify a number of other interactions such as topic deletes which are currently more complicated and problematic than necessary.
Public Interfaces
No changes to public interfaces will be made in this release. However, it may be dangerous to use older versions of Kafka tools with new broker versions when using their --zookeeper flag.
Proposed Changes
Topic IDs will be represented using 128-bit v4 UUIDs. A UUID with all bits set to 0 will be reserved as a null UUID, as the Kafka RPC protocol does not allow for nullable UUID fields. When printed or stored as a string, topic IDs will be converted to a base64 string representation.
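A minimal sketch of the ID representation described above, using Python's standard uuid and base64 modules. The text only says "base64"; the URL-safe alphabet without padding used here is an assumption (it keeps the string free of '+' and '/' characters). Note that a version 4 UUID always has its version and variant bits set, so a randomly generated ID can never collide with the reserved all-zero null UUID.

```python
import base64
import uuid

# All 128 bits zero: reserved to mean "no topic ID", since the field cannot be null.
NULL_TOPIC_ID = uuid.UUID(int=0)


def new_topic_id() -> uuid.UUID:
    """Generate a random (version 4) topic ID."""
    # uuid4() always sets the version (4) and variant bits, so the result
    # can never equal NULL_TOPIC_ID.
    return uuid.uuid4()


def topic_id_to_str(topic_id: uuid.UUID) -> str:
    """Encode the 16 raw bytes as URL-safe base64 without padding (22 characters)."""
    return base64.urlsafe_b64encode(topic_id.bytes).decode("ascii").rstrip("=")


def topic_id_from_str(text: str) -> uuid.UUID:
    """Inverse of topic_id_to_str: restore the padding, then decode the bytes."""
    padded = text + "=" * (-len(text) % 4)
    return uuid.UUID(bytes=base64.urlsafe_b64decode(padded))
```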
On handling a CreateTopicsRequest, brokers will create the topic znode under /brokers/topics/[topic], as usual.
The znode value will now contain an additional topic ID field, represented as a base64 string in the "id" field, and the schema version will be bumped to version 3.
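The sketch below shows what a version 3 payload for /brokers/topics/[topic] might look like. Only the "id" field and the version bump come from this proposal; the remaining fields ("partitions", "adding_replicas", "removing_replicas") are assumed to carry over unchanged from the existing version 2 format, and topic_znode_payload is a hypothetical helper.

```python
import json
from typing import Dict, List


def topic_znode_payload(assignment: Dict[int, List[int]], topic_id: str) -> str:
    """Build the JSON stored at /brokers/topics/[topic] for schema version 3.

    assignment maps partition number -> replica broker IDs;
    topic_id is the base64 string form of the topic's UUID.
    """
    return json.dumps({
        "version": 3,          # bumped from 2 to signal the new "id" field
        "id": topic_id,        # new in version 3
        # Assumed unchanged from version 2; JSON object keys must be strings.
        "partitions": {str(p): replicas for p, replicas in sorted(assignment.items())},
        "adding_replicas": {},
        "removing_replicas": {},
    })
```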
The controller will maintain local in-memory state containing a mapping from topic name to topic ID. On controller startup, topic IDs will be loaded into memory along with the topics and partitions. A random UUID will be generated on topic creation, or on migration of an existing topic without a topic ID.
The controller will supply topic IDs for all topic partitions to brokers by sending LeaderAndIsrRequest(s) that contain the topic IDs for all partitions contained in the request.
Protocol Changes
LeaderAndIsr
LeaderAndIsrRequest v3
LeaderAndIsrRequest v3 adds the topic ID to the topic_states field, along with an enum field denoting the type of LeaderAndIsrRequest. Currently, the first LeaderAndIsrRequest a controller sends to a broker contains all TopicPartitions for which that broker is a replica; the type enum formalizes this behavior.
Value | Enum | Description |
---|---|---|
0 | UNSPECIFIED | Unspecified type. Default to incremental. |
1 | INCREMENTAL | A LeaderAndIsrRequest that is not guaranteed to contain all topic partitions assigned to a broker. |
2 | FULL | A full LeaderAndIsrRequest containing all partitions the broker is a replica for. |
When type = FULL, the broker can reconcile its local state on disk with the request. Any partition present on local disk that is stale with respect to the request can be staged for deletion. A partition is stale in either of two cases:
1. The TopicPartition is not present in the LeaderAndIsrRequest.
2. The TopicPartition is contained in the request, but its topic ID does not match the topic ID of the local partition stored on the broker.
Reconciliation may also be necessary if type = INCREMENTAL and the topic ID set on a local partition does not match the topic ID contained in the request.
When type = UNSPECIFIED, the request will be treated in a way that allows for backwards compatibility with older request types.
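The reconciliation rules above can be sketched as a pure function over two maps keyed by (topic name, partition). This is a hypothetical illustration, not broker code: it treats UNSPECIFIED as INCREMENTAL, as the table specifies, and it assumes that a local partition with no topic ID yet (one that has not been migrated) is never considered stale, since it will be migrated instead.

```python
from enum import IntEnum
from typing import Dict, Optional, Set, Tuple
from uuid import UUID

TopicPartition = Tuple[str, int]  # (topic name, partition)


class LeaderAndIsrType(IntEnum):
    UNSPECIFIED = 0  # sent by older controllers; handled like INCREMENTAL
    INCREMENTAL = 1
    FULL = 2


def stale_partitions(
    request_type: LeaderAndIsrType,
    requested: Dict[TopicPartition, Optional[UUID]],
    local: Dict[TopicPartition, Optional[UUID]],
) -> Set[TopicPartition]:
    """Return the local partitions that should be staged for deletion."""
    if request_type == LeaderAndIsrType.UNSPECIFIED:
        request_type = LeaderAndIsrType.INCREMENTAL
    stale = set()
    for tp, local_id in local.items():
        if tp in requested:
            request_id = requested[tp]
            # Case 2: same name and partition, different topic ID (any request type).
            if local_id is not None and request_id is not None and local_id != request_id:
                stale.add(tp)
        elif request_type == LeaderAndIsrType.FULL:
            # Case 1: a FULL request lists every replica the broker hosts,
            # so a partition missing from it is stale.
            stale.add(tp)
    return stale
```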
Deletion of stale partitions triggered by LeaderAndIsrRequest(s) will take place by:
- Logging at WARN level all partitions that will be deleted and the time at which they will be deleted.
- Moving the partition's directory to log.dir/deleting/{topic_id}_{partition}.
- Scheduling deletion from disk with a delay of delete.stale.topic.delay.ms ms, which will clear the partition's contents from the deleting directory.
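A minimal sketch of the three deletion steps, assuming a single log directory and a caller that periodically invokes purge_due (in a broker this would be a scheduled task). The function names and the list-of-tuples bookkeeping are hypothetical. With a delete.stale.topic.delay.ms of 4 hours, delay_ms would be 4 * 60 * 60 * 1000 = 14,400,000.

```python
import logging
import os
import shutil
import time
from typing import List, Optional, Tuple

log = logging.getLogger("stale-partitions")

StagedDelete = Tuple[str, int]  # (path under log.dir/deleting, delete-at time in epoch ms)


def stage_stale_partition(log_dir: str, partition_dir: str, topic_id: str,
                          partition: int, delay_ms: int,
                          now_ms: Optional[int] = None) -> StagedDelete:
    """Log, move the partition directory aside, and compute when to purge it."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    delete_at_ms = now_ms + delay_ms
    target = os.path.join(log_dir, "deleting", f"{topic_id}_{partition}")
    log.warning("Stale partition %s will be deleted at %d (epoch ms)", partition_dir, delete_at_ms)
    os.makedirs(os.path.dirname(target), exist_ok=True)
    shutil.move(partition_dir, target)
    return target, delete_at_ms


def purge_due(staged: List[StagedDelete], now_ms: int) -> List[StagedDelete]:
    """Delete staged directories whose delay has elapsed; return the rest."""
    remaining = []
    for path, delete_at_ms in staged:
        if delete_at_ms <= now_ms:
            shutil.rmtree(path, ignore_errors=True)
        else:
            remaining.append((path, delete_at_ms))
    return remaining
```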
LeaderAndIsrResponse v3
StopReplica
StopReplicaRequest v2
StopReplicaResponse v2
Fetch
To avoid issues where requests are made to stale partitions, a topic_id field will be added to fence reads from deleted topics. Note that the leader epoch is not sufficient for preventing these issues, as the leader epoch is reset when a topic is deleted and recreated.
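The fencing check itself is a simple comparison, sketched below under two assumptions not spelled out in the text: a request carrying the null (all-zero) topic ID comes from a sender that does not know topic IDs and falls back to name-only behavior, and a local partition without a topic ID cannot be proven to match, so it is refused. The leader epoch plays no part, because it is reset when a topic is recreated and the old and new topic can present the same epoch.

```python
from typing import Optional
from uuid import UUID

NULL_TOPIC_ID = UUID(int=0)


def topic_id_matches(local_topic_id: Optional[UUID], request_topic_id: UUID) -> bool:
    """Decide whether a Fetch/ListOffsets/OffsetForLeader request may be served.

    local_topic_id is the ID recorded for the partition on this broker
    (None if the partition has not been migrated yet).
    """
    if request_topic_id == NULL_TOPIC_ID:
        # Assumption: the sender does not know topic IDs; use name-only behavior.
        return True
    if local_topic_id is None:
        # Assumption: we cannot prove this is the same topic, so refuse.
        return False
    # Different IDs mean the request targets another incarnation of the topic name.
    return local_topic_id == request_topic_id
```

For example, a leader still holding partition 0 of a deleted topic A (id0) would refuse a fetch for A partition 0 carrying the recreated topic's id1.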
FetchRequest v12
FetchResponse v12
ListOffsets
To avoid issues where requests are made to stale partitions, a topic_id field will be added to fence reads from deleted topics.
ListOffsetsRequest v6
ListOffsetsResponse v6
OffsetForLeader
To avoid issues where requests are made to stale partitions, a topic_id field will be added to fence reads from deleted topics.
OffsetForLeaderRequest v4
OffsetForLeaderResponse v4
DeleteTopics
With the addition of topic IDs and the changes to LeaderAndIsrRequest described above, we can now make changes to topic deletion logic that will allow topics to be immediately considered deleted, regardless of whether all replicas have responded to a DeleteTopicsRequest.
When the controller receives a DeleteTopicsRequest, if the IBP is >= MIN_TOPIC_ID_VERSION, it will delete the /brokers/topics/[topic] znode payload and immediately reply to the DeleteTopicsRequest with a successful response. At this point, the topic is considered deleted, and a topic with the same name can be created.
Although the topic is safely deleted at this point, its data must still be garbage collected. To do so, the controller will send StopReplicaRequest(s) to all brokers assigned as replicas for the deleted topic. For the most part, the deletion logic can be shared across IBP versions, with some differences in responses and in ZooKeeper cleanup. Both formats must still be supported, as the IBP may not be bumped right away and deletes may already have been staged before the IBP bump occurs.
The updated controller's delete logic will:
- Collect deleted topics:
  - Old format: topics listed under /admin/delete_topics, pulling the topic state from /brokers/topics/[topic].
  - New format: in-memory topic deletion states from received DeleteTopicsRequest(s).
- Remove deleted topics from replicas by sending StopReplicaRequest V2 for any topics which do not contain a topic ID, and V3 for any topics which do contain a topic ID.
- Finalize successful deletes:
  - For /admin/delete_topics deletes, we may need to respond to the DeleteTopicsRequest. We can also delete the topic znodes at /admin/delete_topics/[topic] and /brokers/topics/[topic].
  - For deletes of topics with topic IDs, remove the topic from the in-memory topic deletion state on the controller.
- Any unsuccessful StopReplicaRequest(s) will be retried after retryMs, starting again from the first step (collecting deleted topics), and will be maintained in memory in the meantime.
This leads to the question of what should be done if the controller never receives a successful response from a replica for a StopReplicaRequest. In that scenario it is still safe to stop retrying after a reasonable number of retries and amount of time. Because LeaderAndIsrRequest v3 includes a type flag that identifies FULL requests, any stale partitions will be reconciled and deleted by a broker on startup when it receives its initial LeaderAndIsrRequest from a controller. This also holds if the controller changes before the StopReplicaRequest(s) succeed, as the new controller will send a FULL LeaderAndIsrRequest on becoming active, ensuring that any stale partitions are cleaned up.
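The retry bookkeeping described above can be sketched as follows. PendingDelete, max_attempts and the helper names are hypothetical; the StopReplicaRequest version choice follows the controller delete logic listed earlier (V2 without a topic ID, V3 with one), and giving up after max_attempts relies on the FULL LeaderAndIsrRequest to clean up any replica that never responded.

```python
from dataclasses import dataclass
from typing import Optional, Set
from uuid import UUID


@dataclass
class PendingDelete:
    topic: str
    topic_id: Optional[UUID]  # None for deletes staged via /admin/delete_topics
    replicas: Set[int]        # brokers that have not yet acknowledged StopReplica
    attempts: int = 0


def stop_replica_version(pending: PendingDelete) -> int:
    """V2 for topics without a topic ID, V3 for topics with one."""
    return 3 if pending.topic_id is not None else 2


def record_attempt(pending: PendingDelete, acked: Set[int], max_attempts: int) -> bool:
    """Record one round of StopReplica responses.

    Returns True once the controller can drop the delete from memory: either
    every replica acknowledged, or max_attempts rounds were spent and the
    remaining replicas are left to FULL LeaderAndIsrRequest reconciliation.
    """
    pending.replicas -= acked
    pending.attempts += 1
    return not pending.replicas or pending.attempts >= max_attempts
```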
Immediate delete scenarios
Scenario 1:
- Broker B1 is a leader for topic partition A_p0_id0
- Topic A id0 is deleted.
- Topic A id1 is created.
- Broker B1 has not yet received a new LeaderAndIsrRequest, nor a StopReplicaRequest for topic partition A_p0_id0
- Broker B2 has received a LeaderAndIsrRequest for topic partition A_p0_id1, and starts fetching from B1.
We need the changes to FetchRequest/ListOffsetRequest/OffsetsForLeaderEpochRequest described above to make the above scenario safe. By including the topic ID in these requests, we can prevent a broker from accidentally replicating from a deleted topic with the same name.
Scenario 2:
- Broker B1 is a replica for A_p0_id0.
- Topic A id0 is deleted.
- Broker B1 does not receive a StopReplicaRequest for A_p0_id0.
- Topic A id1 is created.
- Broker B1 receives a LeaderAndIsrRequest containing partition A_p0_id1.
When this occurs, we will close the Log for A_p0_id0, and move A_p0_id0 to the deleting directory as described in the LeaderAndIsrRequest description above.
Storage
Partition Metadata file
To allow brokers to resolve the topic ID and topic name of a local partition, a metadata file will be created at logdir/partitiondir/partition.metadata.
This metadata file will be human readable, and will include:
- Metadata schema version (schema_version: int32)
- Topic ID (id: UUID)
- Topic name (name: String)
- Partition (partition: int32)
This file can either be plain text (key/value pairs) or JSON.
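Since the file may be plain key/value text, a minimal reader and writer could look like the sketch below. The "key: value" line syntax, the function names, and storing the topic ID as its string form are assumptions; splitting on the first ": " is safe here because Kafka topic names may only contain ASCII letters, digits, '.', '_' and '-'.

```python
from typing import Dict, Union

FIELDS = ("schema_version", "id", "name", "partition")


def write_partition_metadata(path: str, schema_version: int, topic_id: str,
                             name: str, partition: int) -> None:
    """Write logdir/partitiondir/partition.metadata as human-readable key/value lines."""
    values = (schema_version, topic_id, name, partition)
    with open(path, "w", encoding="utf-8") as f:
        for key, value in zip(FIELDS, values):
            f.write(f"{key}: {value}\n")


def read_partition_metadata(path: str) -> Dict[str, Union[int, str]]:
    """Parse the file back, converting the int32 fields to int."""
    fields: Dict[str, str] = {}
    with open(path, encoding="utf-8") as f:
        for line in f:
            key, sep, value = line.rstrip("\n").partition(": ")
            if sep:
                fields[key] = value
    return {
        "schema_version": int(fields["schema_version"]),
        "id": fields["id"],
        "name": fields["name"],
        "partition": int(fields["partition"]),
    }
```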
Migration
Upon a controller becoming active, the list of current topics is loaded from /brokers/topics/[topic]. When a topic without a topic ID is found, a UUID will be randomly generated and assigned to it, and the topic information at /brokers/topics/[topic] will be updated with the id field filled in and the schema version bumped to version 3.
LeaderAndIsrRequest(s) will only be sent by the controller once a topic ID has been successfully assigned to the topic. The migration process can take place without an inter-broker protocol bump, as the format stored in /brokers/topics/[topic] will be compatible with older broker versions.
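The controller-side migration can be sketched as a function over the payloads read from /brokers/topics/[topic]. It builds the in-memory name-to-ID map and returns rewritten payloads for the topics that lacked an ID; the caller is assumed to write those back to ZooKeeper and only then send LeaderAndIsrRequest(s), matching the ordering above. The function name is hypothetical, and the unpadded URL-safe base64 form is an assumption (the text only says base64).

```python
import base64
import json
import uuid
from typing import Dict, Tuple


def migrate_topic_znodes(znodes: Dict[str, str]) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Assign topic IDs to topics loaded from /brokers/topics/[topic].

    znodes maps topic name -> JSON payload. Returns (name -> topic ID,
    name -> updated payload), where the second map only contains topics that
    had no ID and therefore must be rewritten before any LeaderAndIsrRequest.
    """
    topic_ids: Dict[str, str] = {}
    rewrites: Dict[str, str] = {}
    for name, raw in znodes.items():
        payload = json.loads(raw)
        if "id" not in payload:
            # Pre-topic-ID topic: generate a random v4 UUID and bump the schema.
            new_id = uuid.uuid4().bytes
            payload["id"] = base64.urlsafe_b64encode(new_id).decode("ascii").rstrip("=")
            payload["version"] = 3
            rewrites[name] = json.dumps(payload)
        topic_ids[name] = payload["id"]
    return topic_ids, rewrites
```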
When a replica receives a LeaderAndIsrRequest containing a topic ID for an existing partition that has no topic ID associated with it, it will create a partition metadata file for the topic partition locally. At this point the local partition will have been migrated to support topic IDs.
Configuration
The following configuration options will be added:
Option | Unit | Default | Description |
---|---|---|---|
delete.stale.topic.delay.ms | ms | 14400000 (4 hours) | When a FULL LeaderAndIsrRequest is received and the request does not contain a partition that exists on a broker, a deletion event will be staged for that partition which will complete after delete.stale.topic.delay.ms milliseconds. |
org.apache.kafka.common.TopicPartition
At some point it would be useful to modify TopicPartition to include the topic ID. This may be tricky until all APIs support topic IDs.
Compatibility, Deprecation, and Migration Plan
We will need to support all API calls which refer to a partition by either (topicId, partition) or (topicName, partition) until clients are updated to interact with topics by ID. No deprecations are currently planned.
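During the transition, request handling has to accept either form of partition reference. One way to picture this is a small registry that resolves both to a canonical (name, ID, partition) triple; TopicRegistry and its methods are hypothetical and only illustrate the dual lookup, including dropping the old ID when a topic name is recreated.

```python
from typing import Dict, Optional, Tuple, Union
from uuid import UUID


class TopicRegistry:
    """Bidirectional topic name <-> topic ID lookup."""

    def __init__(self) -> None:
        self._id_by_name: Dict[str, UUID] = {}
        self._name_by_id: Dict[UUID, str] = {}

    def add(self, name: str, topic_id: UUID) -> None:
        old_id = self._id_by_name.get(name)
        if old_id is not None:
            # A recreated topic replaces the old incarnation's ID.
            del self._name_by_id[old_id]
        self._id_by_name[name] = topic_id
        self._name_by_id[topic_id] = name

    def resolve(self, topic: Union[str, UUID],
                partition: int) -> Optional[Tuple[str, UUID, int]]:
        """Accept (topicName, partition) or (topicId, partition); None if unknown."""
        if isinstance(topic, UUID):
            name = self._name_by_id.get(topic)
            return None if name is None else (name, topic, partition)
        topic_id = self._id_by_name.get(topic)
        return None if topic_id is None else (topic, topic_id, partition)
```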
Rejected Alternatives
Sequence ID
As an alternative to a topic UUID, a sequence number (long) could be maintained that is global for the given cluster.
This sequence number could be stored at /topicid/seqid.
Upon topic creation, this sequence number will be incremented, and the new value assigned to the created topic as its ID. Sequential topic ID generation can use the same approach as broker ID generation.
If global uniqueness across clusters is required for topic IDs the first N bits of the ID could consist of a cluster ID prefix, followed by the sequence number. However, to achieve global uniqueness, this would require a large number of bits for the cluster ID prefix.
Use of a UUID has the benefit of being globally unique across clusters without partitioning the ID space by clusterID, and is conceptually simpler.
Topic Deletion
We considered and rejected two other strategies for performing topic deletes.
Best Effort Strategy
Under this strategy, the controller will attempt to send a StopReplicaRequest to all replicas. The controller will give up after a certain number of retries and will complete the delete. Although this will not simplify the topic deletion code, it will prevent delete topic requests from being blocked if one of the replicas is down. This would now be relatively safe, as stale topics will be deleted when a broker receives an initial LeaderAndIsrRequest. However, it could prevent space from being reclaimed on a broker that is otherwise alive but does not respond to StopReplicaRequest(s) before the controller stops retrying.
Send StopReplicaRequest(s) to online brokers only
In this approach, the controller will send StopReplicaRequests only to the brokers that are online, and will wait for a response from these brokers before marking the delete as successful. This will allow a topic delete to take place while some replicas are offline. If any replicas come back online, they will receive an initial LeaderAndIsrRequest that will allow them to clean up any stale state. This is similar to the "best effort" strategy above.
Removal of Topic Names from Request and Responses
It is unnecessary to include the name of the topic in the following Request/Response calls:
- StopReplica
- Fetch
- ListOffsets
- OffsetForLeader
Including the topic name in the request may make it easier to debug when issues arise, as it will provide more information than the topic ID alone. However, it will also bloat the protocol (especially relevant for FetchRequest), and if the names are used incorrectly, they may prevent topic renames from being easily implemented in the future.
For the time being, we will want to use the latest protocol versions with clients that do not yet support topic IDs. Until the clients have been updated to refer to partitions by topic ID, we should include both the topic name and the (optional) topic ID in every request.
Future Work
Requests
The following requests could be improved by presence of topic IDs, but are out of scope for this KIP.
- CreatePartitionsRequest
- ElectPreferredLeadersRequest
- AlterReplicaLogDirsRequest
- AlterConfigsRequest
- DescribeConfigsRequest
- DescribeLogDirsRequest
- MetadataRequest
- UpdateMetadataRequest
- DeleteRecordsRequest
- ProduceRequest
- AddPartitionsToTxnRequest
- TxnOffsetCommitRequest
- WriteTxnMarkerRequest
Clients
Some of the implemented request types are also relevant to clients. Adding support for topic IDs in the clients would add an additional measure of safety when producing and consuming data.
__consumer_offsets topic
Ideally, consumer offsets stored in the __consumer_offsets topic would be associated with the topic ID for which they were read. However, given the way the __consumer_offsets topic is compacted, this may be difficult to achieve in a forwards compatible way. This change will be left until topic IDs are implemented in the clients.
log.dir layout
It would be ideal if the log.dir layout could be restructured from the {topic}_{partition} format to {topicIdPrefix}/{topicId}_{partition}, e.g. "mytopic_1" → "24/24cc4332-f7de-45a3-b24e-33d61aa0d16c_1". Note the hierarchical directory structure, which uses the first two characters of the topic ID to avoid having too many directories at the top level of the log.dir. This change is not required for the topic deletion improvements above, and will be left for a future KIP where it may be required (e.g. for topic renames).
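The proposed layout can be expressed as a one-line path mapping. This sketch follows the example in the text, which uses the canonical hyphenated hex form of the UUID; partition_path is a hypothetical helper.

```python
import os
from uuid import UUID


def partition_path(log_dir: str, topic_id: UUID, partition: int) -> str:
    """Map (topic ID, partition) to {topicIdPrefix}/{topicId}_{partition} under log_dir."""
    text = str(topic_id)  # e.g. 24cc4332-f7de-45a3-b24e-33d61aa0d16c
    # The first two hex characters fan directories out into at most 256
    # buckets, keeping the top level of log.dir small.
    return os.path.join(log_dir, text[:2], f"{text}_{partition}")
```

With two hex characters per prefix, a cluster with 100,000 topics would average roughly 390 topic directories per bucket instead of 100,000 entries at the top level.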