Status
Current state: Under Discussion
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Today, Kafka uniquely identifies a topic by its name. This is generally sufficient, but there are flaws in this scheme if a topic is deleted and recreated with the same name. As a result, Kafka attempts to prevent stale topic issues by ensuring a topic is deleted from all replicas before completing a deletion. This solution is not perfect, as it is possible for partitions to be reassigned from brokers while they are down, and there are no guarantees that this state will ever be cleaned up and will not cause issues in the future.
...
Overall, topic IDs provide a safer way for brokers to replicate topics without any chance of incorrectly interacting with stale topics with the same name. By preventing these situations, we can simplify a number of other interactions such as topic deletes, which are currently more complicated and problematic than necessary.
Public Interfaces
No changes to public interfaces will be made in this release. However, it may be dangerous to use older versions of Kafka tools with new broker versions when using their --zookeeper flags.
...
A random UUID will be generated on topic creation or on migration of an existing topic without topic IDs.
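As a minimal illustration of this assignment, the sketch below generates an ID lazily, covering both the create and migration paths. The class and field names here are illustrative, not the actual controller code:

```java
import java.util.UUID;

// Illustrative sketch: a random UUID is assigned when a topic is created,
// or lazily when an existing topic without an ID is migrated.
public class TopicIdAssignment {
    // Hypothetical holder for a topic's name and ID.
    static final class TopicRecord {
        final String name;
        UUID topicId; // null until assigned

        TopicRecord(String name) { this.name = name; }
    }

    static UUID ensureTopicId(TopicRecord topic) {
        if (topic.topicId == null) {
            topic.topicId = UUID.randomUUID(); // generated on create or on migration
        }
        return topic.topicId;
    }

    public static void main(String[] args) {
        TopicRecord topic = new TopicRecord("mytopic");
        System.out.println(topic.name + " -> " + ensureTopicId(topic));
    }
}
```

Once assigned, the ID is stable: repeated calls return the same value, which is the property the rest of this KIP relies on.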
Protocol Changes
LeaderAndIsr
LeaderAndIsrRequest v3
...
When a broker receives a LeaderAndIsrRequest containing a topic ID for an existing partition without an associated topic ID, it will associate the topic ID with the partition. This will effectively migrate a broker's local replicas to include topic IDs.
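The migration step can be sketched as follows, assuming a hypothetical in-memory map from local partition names to topic IDs (the real broker state is more involved):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Illustrative sketch of the migration step: when a LeaderAndIsrRequest
// carries a topic ID for a partition the broker knows only by name, the
// broker associates that ID with its local replica.
public class ReplicaMigration {
    // local topic name -> topic ID (null means "not yet migrated")
    private final Map<String, UUID> localTopicIds = new HashMap<>();

    void addLocalTopic(String topicName) {
        localTopicIds.putIfAbsent(topicName, null);
    }

    // Returns true if the topic was migrated (ID newly associated).
    boolean onLeaderAndIsr(String topicName, UUID requestTopicId) {
        if (!localTopicIds.containsKey(topicName)) return false;
        if (localTopicIds.get(topicName) == null) {
            localTopicIds.put(topicName, requestTopicId); // migrate in place
            return true;
        }
        return false; // already has an associated ID
    }

    UUID topicId(String topicName) {
        return localTopicIds.get(topicName);
    }
}
```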
LeaderAndIsrResponse v3
StopReplica
StopReplicaRequest v2
StopReplicaResponse v2
Fetch
To avoid issues where requests are made to stale partitions, a topic_id field will be added to fence reads for deleted topics. Note that the leader epoch is not sufficient for preventing these issues, as the leader epoch will be reset after a topic is deleted and recreated.
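The fencing check itself reduces to an ID comparison, sketched below. The error name is a placeholder rather than the actual Kafka error code:

```java
import java.util.UUID;

// Illustrative fencing check: a fetch carrying the ID of a deleted topic
// is rejected even though the topic name matches and the leader epoch may
// have been reset by a delete-and-recreate.
public class FetchFencing {
    enum Error { NONE, UNKNOWN_TOPIC_ID } // placeholder error names

    static Error checkFetch(UUID localTopicId, UUID requestTopicId) {
        if (!localTopicId.equals(requestTopicId)) {
            return Error.UNKNOWN_TOPIC_ID; // stale topic: fence the read
        }
        return Error.NONE;
    }
}
```

Note that this check needs no epoch bookkeeping: the recreated topic gets a fresh UUID, so a stale request can never match.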
FetchRequest v12
FetchResponse v12
ListOffsets
To avoid issues where requests are made to stale partitions, a topic_id field will be added to fence reads for deleted topics.
ListOffsetsRequest v6
ListOffsetsResponse v6
OffsetForLeader
To avoid issues where requests are made to stale partitions, a topic_id field will be added to fence reads for deleted topics.
OffsetForLeaderRequest v4
OffsetForLeaderResponse v4
DeleteTopics
The controller can respond to a DeleteTopicsRequest in one of the following ways:
Existing strategy
In this approach, we would maintain the current delete logic. The controller will send a StopReplicaRequest to all replicas for a topic, and will keep retrying this request until all replicas respond successfully. In this implementation, the deletion logic will not be simplified, and topic deletes will be blocked if any replica is down.
Option 1: best effort strategy
Make a best-effort attempt to send a StopReplicaRequest to all replicas. The controller will give up after a certain number of retries and complete the delete. This does not simplify the topic deletion code, but it prevents delete topic requests from being blocked when one of the replicas is down. This is now relatively safe, as stale topics are deleted when a broker receives its initial LeaderAndIsrRequest. However, a broker that is alive but fails to respond to StopReplicaRequest(s) before they time out may never reclaim the space held by the deleted topic.
Option 2: send StopReplicaRequest(s) to online brokers only
In this approach, the controller will send StopReplicaRequests only to the brokers that are online, and will wait for a response from these brokers before marking the delete as successful. This allows a topic delete to take place while some replicas are offline. If any of those replicas come back online, they will receive an initial LeaderAndIsrRequest that allows them to clear up any stale state. This is similar to the "best effort strategy" above.
Option 3: immediate effective delete, staged cleanup strategy
This strategy would allow brokers to effectively delete topics immediately, ensuring deletions do not block creation and use of a new topic with the same name. This is the option proposed by this KIP; it cleans up the deletion logic and simplifies the topic deletion and creation flow.
...
This leads to the question of what should be done if the controller never receives a successful response from a replica for a StopReplicaRequest. Under such a scenario it is still safe to remove the staged topic deletion from /admin/delete_topics_by_id/[topicId], after a reasonable number of retries and time. Given that LeaderAndIsrRequest v3 includes an is_every_partition flag, any stale partitions will be reconciled and deleted by a broker on startup upon receiving the initial LeaderAndIsrRequest. Therefore, it will be safe to remove the staged deletes after a reasonable number of retries, and we can let any brokers which did not receive the request clean these up on their next startup.
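The controller-side retry-then-give-up behavior for this staged cleanup can be sketched as below. The interface, method names, and retry bound are all hypothetical:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Illustrative sketch of the staged cleanup: the controller retries
// StopReplicaRequests a bounded number of times, then drops the staged
// delete anyway; any broker that never acknowledged reconciles and
// deletes the stale partitions on its next startup via the initial
// LeaderAndIsrRequest.
public class StagedDeletion {
    interface ReplicaStopper {
        boolean stopReplica(int brokerId, UUID topicId); // true on success
    }

    static final int MAX_RETRIES = 5; // hypothetical bound

    // Returns the brokers that never acknowledged; the staged delete is
    // removed regardless, since startup reconciliation covers them.
    static Set<Integer> deleteTopic(UUID topicId, Set<Integer> replicas,
                                    ReplicaStopper stopper) {
        Set<Integer> pending = new HashSet<>(replicas);
        for (int attempt = 0; attempt < MAX_RETRIES && !pending.isEmpty(); attempt++) {
            pending.removeIf(broker -> stopper.stopReplica(broker, topicId));
        }
        return pending; // staged delete dropped even if non-empty
    }
}
```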
Immediate delete scenarios
Scenario 1:
- Broker B1 is a leader for topic partition A_p0_id0
- Topic A id0 is deleted.
- Topic A id1 is created.
- Broker B1 has not yet received a new LeaderAndIsrRequest, nor a StopReplicaRequest for topic partition A_p0_id0
- Broker B2 has received a LeaderAndIsrRequest for topic partition A_p0_id0, and starts fetching from B1.
...
When this occurs, we will close the Log for A_p0_id0, and move A_p0_id0 to the deleting directory as described in the LeaderAndIsrRequest description above.
Should we remove topic name from the protocol where possible?
It is unnecessary to include the name of the topic in the following Request/Response calls:
...
For the time being, we may wish to use the latest protocol versions with clients that do not support topic IDs yet. Until the clients have been updated to refer to partitions by topic ID, we should include both topic name and (optional) ID in every request.
Storage
Partition Metadata file
To allow brokers to resolve the topic name under this structure, a metadata file will be created at logdir/partitiondir/partition.metadata.
...
This file can either be plain text (key/value pairs) or JSON.
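Taking the plain-text option as an example, reading and writing the file might look like the sketch below. The field names and layout here are an assumption for illustration, not the final file format:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.UUID;

// Illustrative plain-text (key/value) encoding of the partition metadata
// file; the real file's field names and versioning may differ.
public class PartitionMetadataFile {
    static void write(Path file, int version, UUID topicId) {
        try {
            Files.write(file, List.of("version: " + version, "topic_id: " + topicId));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static UUID readTopicId(Path file) {
        try {
            for (String line : Files.readAllLines(file)) {
                if (line.startsWith("topic_id: ")) {
                    return UUID.fromString(line.substring("topic_id: ".length()));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        throw new IllegalStateException("no topic_id in " + file);
    }
}
```

A broker reading the log dir on startup could use such a file to recover the (name, ID) association without consulting the controller.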
org.apache.kafka.common.TopicPartition
At some point it would be useful to modify TopicPartition to include the topic ID. This may be tricky until all APIs support topic IDs.
Compatibility, Deprecation, and Migration Plan
We will need to support all API calls which refer to a partition by either (topicId, partition) or (topicName, partition) until clients are updated to interact with topics by ID. No deprecations are currently planned.
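During this transition, brokers effectively need a dual-keyed lookup, resolvable from either direction. A minimal sketch, with hypothetical names:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Illustrative dual-keyed registry for the transition period: a topic can
// be resolved by ID (new clients) or by name (old clients).
public class TopicRegistry {
    private final Map<String, UUID> nameToId = new HashMap<>();
    private final Map<UUID, String> idToName = new HashMap<>();

    void register(String name, UUID id) {
        nameToId.put(name, id);
        idToName.put(id, name);
    }

    String resolveName(UUID id) { return idToName.get(id); }
    UUID resolveId(String name) { return nameToId.get(name); }
}
```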
Rejected Alternatives
Sequence ID
As an alternative to a topic UUID, a sequence number (long) could be maintained that is global for the given cluster.
...
Use of a UUID has the benefit of being globally unique across clusters without partitioning the ID space by clusterID, and is conceptually simpler.
Future Work
Requests
The following requests could be improved by presence of topic IDs, but are out of scope for this KIP.
- CreatePartitionsRequest
- ElectPreferredLeadersRequest
- AlterReplicaLogDirsRequest
- AlterConfigsRequest
- DescribeConfigsRequest
- DescribeLogDirsRequest
- MetadataRequest
- UpdateMetadataRequest
- DeleteRecordsRequest
- ProduceRequest
- AddPartitionsToTxnRequest
- TxnOffsetCommitRequest
- WriteTxnMarkerRequest
Clients
Some of the implemented request types are also relevant to clients. Adding support for topic IDs in the clients would add an additional measure of safety when producing and consuming data.
__consumer_offsets topic
Ideally, consumer offsets stored in the __consumer_offsets topic would be associated with the topic ID for which they were read. However, given the way the __consumer_offsets topic is compacted, this may be difficult to achieve in a forwards-compatible way. This change will be left until topic IDs are implemented in the clients.
log.dir layout
It would be ideal if the log.dir layout could be restructured from the {topic}_{partition} format to {topicIdPrefix}/{topicId}_{partition}, e.g. "mytopic_1" → "24/24cc4332-f7de-45a3-b24e-33d61aa0d16c_1". Note the hierarchical directory structure using the first two characters of the topic ID to avoid having too many directories at the top level of the logdir. This change is not required for the topic deletion improvements above, and will be left for a future KIP where it may be required, e.g. for topic renames.
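The proposed path computation is simple enough to sketch directly, assuming a hypothetical helper:

```java
import java.util.UUID;

// Illustrative computation of the proposed hierarchical layout: the first
// two characters of the topic ID become a parent directory, bounding the
// number of entries at the top level of the log dir.
public class LogDirLayout {
    static String partitionDir(UUID topicId, int partition) {
        String id = topicId.toString();
        return id.substring(0, 2) + "/" + id + "_" + partition;
    }

    public static void main(String[] args) {
        UUID id = UUID.fromString("24cc4332-f7de-45a3-b24e-33d61aa0d16c");
        // Prints: 24/24cc4332-f7de-45a3-b24e-33d61aa0d16c_1
        System.out.println(partitionDir(id, 1));
    }
}
```

With two hex characters per prefix directory, the top level is capped at 256 entries regardless of how many topics exist.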