Status

Current state: Under Discussion

Discussion thread: https://lists.apache.org/thread.html/7efa8cd169cadc7dc9cf86a7c0dbbab1836ddb5024d310fcebacf80c@%3Cdev.kafka.apache.org%3E

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Today, Kafka uniquely identifies a topic by its name. This is generally sufficient, but there are flaws in this scheme if a topic is deleted and recreated with the same name. As a result, Kafka attempts to prevent stale topic issues by ensuring a topic is deleted from all replicas before completing a deletion. This solution is not perfect, as it is possible for partitions to be reassigned from brokers while they are down, and there are no guarantees that this state will ever be cleaned up and will not cause issues in the future.

As the controller must wait for all replicas to delete their local replicas, deletes can also become blocked, preventing topics from being created with the same name until the deletion is complete on all replicas. This can mean that downtime for a single broker can effectively cause a complete outage for everyone producing/consuming to that topic name, as the topic cannot be recreated without manual intervention.

Topic IDs aim to address this issue by associating a truly unique ID with each topic, ensuring a newly created topic with a previously used name cannot be confused with a previous topic with that name.

Topic IDs solve a few other problems:

  1. Renaming topics becomes feasible (although there may still be some complexity with the need to support the old name for a while as part of migration, etc.)  Renaming topics may seem minor, but it will be difficult to have hierarchical topics without having some kind of renaming support.
  2. We can eventually get rid of the "deleting" state for topics. If a broker is down but there is some topic data there that is no longer relevant, it won't cause problems later on. It can be deleted when the broker rejoins the cluster and realizes that the relevant topic ID is not present any more. We gain some additional safety where stale/deleted replicas may currently interact with live ones.
  3. Sending 16 byte UUIDs instead of Strings over Kafka RPCs can be smaller. A string is 2 bytes plus the data, whereas the UUID is a fixed 16 bytes. For any topic name longer than 14 single-byte characters (16 bytes serialized), the UUID will be smaller. UUIDs will also be faster to compare and more friendly to the garbage collector.
  4. They will provide a true measure of topic uniqueness across clusters. This may be important in multi-cluster Kafka deployments where additional safety and debuggability is desired.
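
The size comparison in item 3 can be checked with a few lines of arithmetic. The sketch below assumes the non-compact protocol STRING encoding described above (a 2-byte length prefix followed by the UTF-8 bytes); the function names are illustrative only.

```python
import uuid

# A UUID always serializes to 16 raw bytes.
UUID_SIZE = len(uuid.uuid4().bytes)


def serialized_name_size(name: str) -> int:
    """Size of a topic name on the wire: 2-byte length prefix plus UTF-8 data."""
    return 2 + len(name.encode("utf-8"))


def uuid_is_smaller(name: str) -> bool:
    """True when sending the UUID takes fewer bytes than sending the name."""
    return UUID_SIZE < serialized_name_size(name)
```

A 14-character ASCII name serializes to exactly 16 bytes, so the UUID only wins from 15 characters upward.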

Overall, topic IDs provide a safer way for brokers to interact with respect to a topic without any chance of incorrectly interacting with a stale topic with the same name. By preventing these situations, we can simplify a number of other interactions, e.g. topic deletes, that are currently more complicated and problematic than necessary due to the need to avoid these issues.

Public Interfaces

No changes to public interfaces will be made in this release. However, it may be dangerous to use older versions of Kafka tools with new broker versions when using their --zookeeper flags.

Proposed Changes

On handling a CreateTopicRequest, the broker will create the topic znode under /brokers/topics/[topic], as usual.

The znode value will now contain the ID for the topic in the "id" field, and the schema version will be bumped to version 2.

Schema:
{ "fields" :
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "id", "type": "string", "doc": "topic ID (UUID)"},
      {"name": "partitions",
       "type": {"type": "map",
                "values": {"type": "array", "items": "int", "doc": "a list of replica ids"},
                "doc": "a map from partition id to replica list"}
      }
    ]
}
 
Example:
{
  "version": 2,
  "id": "24cc4332-f7de-45a3-b24e-33d61aa0d16c",
  "partitions": {"0": [0, 1, 3] }
}

The controller will maintain local in-memory state containing a mapping from topic name to topic ID. On controller startup, the topic ID will automatically be loaded into memory along with the topics and partitions.

The controller will supply topic IDs for all topic partitions to brokers by sending LeaderAndIsrRequest(s) that contain the topic IDs for all partitions contained in the request.

A random UUID will be generated on topic creation or on migration of an existing topic without topic IDs.
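
As a rough illustration of ID assignment, the sketch below upgrades a topic znode payload to version 2, generating a random UUID only when no ID is present. The function name is hypothetical, and the real controller logic (written against ZooKeeper, not plain strings) may differ.

```python
import json
import uuid


def ensure_topic_id(znode_value: str) -> str:
    """Return a version 2 znode payload, generating a topic ID if one is missing."""
    data = json.loads(znode_value)
    if "id" not in data:
        # Random (version 4) UUID, used for new topics and migrated topics alike.
        data["id"] = str(uuid.uuid4())
    data["version"] = 2
    return json.dumps(data)
```

Calling it on a payload that already carries an "id" leaves the ID unchanged, so the migration is idempotent.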

Protocol Changes

LeaderAndIsr

LeaderAndIsrRequest v3

LeaderAndIsr Request (Version: 3) => controller_id controller_epoch broker_epoch is_every_partition* [topic_states] [live_leaders]
  controller_id => INT32
  controller_epoch => INT32
  broker_epoch => INT64
  is_every_partition* => BOOLEAN
  topic_states => topic topic_id* [partition_states]
    topic => STRING
    topic_id* => UUID
    partition_states => partition controller_epoch leader leader_epoch [isr] zk_version [replicas] is_new
      partition => INT32
      controller_epoch => INT32
      leader => INT32
      leader_epoch => INT32
      isr => INT32
      zk_version => INT32
      replicas => INT32
      is_new => BOOLEAN
  live_leaders => id host port
    id => INT32
    host => STRING
    port => INT32


LeaderAndIsrRequest v3 adds the topic ID to the topic_states field, and a boolean flag to denote whether this is the initial "full" LeaderAndIsrRequest sent by a controller.

Currently, the first LeaderAndIsrRequest sent to a broker by a controller contains all TopicPartitions that a broker is a replica for. We will formalize this behavior by also including an is_every_partition flag to denote when this is one such request. When is_every_partition = true, the broker can reconcile its local state on disk with the request, and safely stage deletions for any partitions that are present on disk and are not contained in the request. This covers both partitions that are absent from the LeaderAndIsrRequest and partitions whose topic ID in the request does not match the topic ID stored locally on the broker. Reconciliation of the second kind may also be necessary when is_every_partition = false, if the topic ID in the request does not match the ID stored for the local partition.

Deletions will be staged by logging a warning and then moving the partition directory into a deleting directory as {topic_id}_{partition}. A deletion completion event will be scheduled for X ms after the LeaderAndIsrRequest was first received.

When a broker receives a LeaderAndIsrRequest containing a topic ID for an existing partition without an associated topic ID, it will associate the topic ID with the partition. This will effectively migrate a broker's local replicas to include topic IDs.
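
The reconciliation rules above can be summarized in a small sketch. It models broker state as a plain dictionary from (topic, partition) to topic ID, with None standing for a replica that has no ID yet; the names and data shapes are assumptions made for illustration, not the broker's actual structures.

```python
from typing import Dict, Optional, Set, Tuple

Partition = Tuple[str, int]  # (topic name, partition number)


def reconcile(local: Dict[Partition, Optional[str]],
              request: Dict[Partition, str],
              is_every_partition: bool) -> Set[Partition]:
    """Return the local partitions to stage for deletion.

    Replicas without a topic ID adopt the ID from the request (migration).
    """
    stale: Set[Partition] = set()
    for tp, local_id in list(local.items()):
        if tp not in request:
            # Absence only proves staleness when the request lists everything.
            if is_every_partition:
                stale.add(tp)
        elif local_id is None:
            local[tp] = request[tp]  # associate the topic ID with the replica
        elif local_id != request[tp]:
            stale.add(tp)  # same name, different topic: local data is stale
    return stale
```

Note that an ID mismatch marks the partition stale regardless of the flag, while a missing partition is only treated as stale in a full request.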

LeaderAndIsrResponse v3

LeaderAndIsr Response (Version: 3) => error_code [partitions]
  error_code => INT16
  partitions => topic topic_id* partition error_code
    topic => STRING
    topic_id* => UUID
    partition => INT32
    error_code => INT16

StopReplica

StopReplicaRequest v2

StopReplica Request (Version: 2) => controller_id controller_epoch broker_epoch delete_partitions [partitions]
  controller_id => INT32
  controller_epoch => INT32
  broker_epoch => INT64
  delete_partitions => BOOLEAN
  partitions => topic topic_id* [partition_ids]
    topic => STRING
    topic_id* => UUID
    partition_ids => INT32

StopReplicaResponse v2

StopReplica Response (Version: 2) => error_code [partitions]
  error_code => INT16
  partitions => topic topic_id* partition error_code
    topic => STRING
    topic_id* => UUID
    partition => INT32
    error_code => INT16

Fetch

To avoid issues where requests are made to stale partitions, a topic_id field will be added to fence reads for deleted topics.

FetchRequest v12

Fetch Request (Version: 12) => replica_id max_wait_time min_bytes max_bytes isolation_level session_id session_epoch [topics] [forgotten_topics_data] rack_id
  replica_id => INT32
  max_wait_time => INT32
  min_bytes => INT32
  max_bytes => INT32
  isolation_level => INT8
  session_id => INT32
  session_epoch => INT32
  topics => topic topic_id* [partitions]
    topic => STRING
    topic_id* => UUID
    partitions => partition current_leader_epoch fetch_offset log_start_offset partition_max_bytes
      partition => INT32
      current_leader_epoch => INT32
      fetch_offset => INT64
      log_start_offset => INT64
      partition_max_bytes => INT32
  forgotten_topics_data => topic [partitions]
    topic => STRING
    partitions => INT32
  rack_id => STRING

FetchResponse v12

Fetch Response (Version: 12) => throttle_time_ms error_code session_id [responses]
  throttle_time_ms => INT32
  error_code => INT16
  session_id => INT32
  responses => topic topic_id* [partition_responses]
    topic => STRING
    topic_id* => UUID
    partition_responses => partition_header record_set
      partition_header => partition error_code high_watermark last_stable_offset log_start_offset [aborted_transactions] preferred_read_replica
        partition => INT32
        error_code => INT16
        high_watermark => INT64
        last_stable_offset => INT64
        log_start_offset => INT64
        aborted_transactions => producer_id first_offset
          producer_id => INT64
          first_offset => INT64
        preferred_read_replica => INT32
      record_set => RECORDS

ListOffsets

To avoid issues where requests are made to stale partitions, a topic_id field will be added to fence reads for deleted topics.

ListOffsetsRequest v6

ListOffsets Request (Version: 6) => replica_id isolation_level [topics]
  replica_id => INT32
  isolation_level => INT8
  topics => topic topic_id* [partitions]
    topic => STRING
    topic_id* => UUID
    partitions => partition current_leader_epoch timestamp
      partition => INT32
      current_leader_epoch => INT32
      timestamp => INT64

ListOffsetsResponse v6

ListOffsets Response (Version: 6) => throttle_time_ms [responses]
  throttle_time_ms => INT32
  responses => topic topic_id* [partition_responses]
    topic => STRING
    topic_id* => UUID
    partition_responses => partition error_code timestamp offset leader_epoch
      partition => INT32
      error_code => INT16
      timestamp => INT64
      offset => INT64
      leader_epoch => INT32

OffsetForLeader

To avoid issues where requests are made to stale partitions, a topic_id field will be added to fence reads for deleted topics.

OffsetForLeaderRequest v4

OffsetForLeaderEpoch Request (Version: 4) => replica_id [topics]
  replica_id => INT32
  topics => topic topic_id* [partitions]
    topic => STRING
    topic_id* => UUID
    partitions => partition current_leader_epoch leader_epoch
      partition => INT32
      current_leader_epoch => INT32
      leader_epoch => INT32

OffsetForLeaderResponse v4

OffsetForLeaderEpoch Response (Version: 4) => throttle_time_ms [topics]
  throttle_time_ms => INT32
  topics => topic topic_id* [partitions]
    topic => STRING
    topic_id* => UUID
    partitions => error_code partition leader_epoch end_offset
      error_code => INT16
      partition => INT32
      leader_epoch => INT32
      end_offset => INT64

DeleteTopics

The controller could now be implemented to respond to a DeleteTopicsRequest in one of the following ways:

Existing strategy

In this approach, we would maintain the current delete logic. The controller will send a StopReplicaRequest to all replicas for a topic, and will keep retrying this request until all replicas respond successfully. In this implementation, the deletion logic will not be simplified.

Option 1: best effort strategy

Make a best effort attempt to send a StopReplicaRequest to all replicas. The controller will give up after a certain number of retries and will complete the delete. This will not simplify the topic deletion code, but it will prevent delete topic requests from being blocked if one of the replicas is down. This will now be relatively safe, as stale topics will be deleted when a broker receives an initial LeaderAndIsrRequest. However, it could prevent space from being reclaimed on a broker that is otherwise alive but does not respond to the StopReplicaRequest(s) before they time out.

Option 2: send StopReplicaRequest(s) to online brokers only

In this approach, the controller will send StopReplicaRequests to only the brokers that are online, and will wait for a response from these brokers before marking the delete as successful. This will allow a topic delete to take place while some replicas are offline. If any replicas return to being online, they will receive an initial LeaderAndIsrRequest that will allow them to clear up any stale state. This is similar to the "best effort strategy" above.

Option 3: immediate effective delete, staged cleanup strategy

This strategy would allow brokers to effectively delete topics immediately, ensuring deletions do not block creation and use of a new topic with the same name. This is the proposed option to clean up the deletion logic and would not block topic recreation.

Upon receiving a DeleteTopicsRequest, if the inter-broker protocol version (IBP) is >= MIN_TOPIC_ID_VERSION, the controller will move the /brokers/topics/[topic] znode payload to /admin/delete_topics_by_id/[topicId], and immediately reply to the DeleteTopicsRequest with a successful response. At this point, the topic is considered deleted, and a topic with the same name can be created.

Although the topic is considered safely deleted at this point, it must still be garbage collected. The controller will then send StopReplicaRequest(s) to all brokers assigned as replicas for the deleted topic. Upon successfully receiving a response from all replicas, the znode at /admin/delete_topics_by_id/[topicId] will be deleted.
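
The two phases (immediate logical delete, then garbage collection) can be sketched with a dictionary standing in for ZooKeeper. The helper names are hypothetical, and a real implementation would presumably need to make the znode move atomic.

```python
from typing import Dict, Iterable


def delete_topic(zk: Dict[str, dict], topic: str) -> str:
    """Phase 1: move the topic znode payload under delete_topics_by_id.

    After this returns, the name is free and the topic can be recreated.
    """
    payload = zk.pop(f"/brokers/topics/{topic}")
    topic_id = payload["id"]
    zk[f"/admin/delete_topics_by_id/{topic_id}"] = payload
    return topic_id


def finish_cleanup(zk: Dict[str, dict], topic_id: str,
                   acks: Iterable[int], replicas: Iterable[int]) -> bool:
    """Phase 2: drop the staged delete once every replica has acknowledged."""
    if set(replicas) <= set(acks):
        zk.pop(f"/admin/delete_topics_by_id/{topic_id}", None)
        return True
    return False
```

Here `acks` is the set of broker IDs that have responded successfully to the StopReplicaRequest.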

For the most part, the deletion logic can be maintained between IBP versions, with some differences in responses and cleanup in ZK. Both formats must still be supported, as the IBP may not be bumped right away, and deletes may have already been staged before the IBP bump occurs.

The controller's delete logic will:

  1. Collect deleted topics:
    1. Old format: /admin/delete_topics pulling the topic state from /brokers/topics/[topic].
    2. New format: /admin/delete_topics_by_id. znodes under this path contain the full topic metadata for that topic ID.
  2. After collecting the topics to be deleted in step 1, perform deletes by sending StopReplicaRequest V1 for any topics which do not contain a topic ID, and V2 (which carries the topic ID, as defined above) for any topics which do contain a topic ID.
  3. Finalize successful deletes:
    1. For /admin/delete_topics deletes, we may need to respond to the DeleteTopicsRequest. We can also delete the topic znodes at /admin/delete_topics/[topic] and /brokers/topics/[topic].
    2. For /admin/delete_topics_by_id, we can simply delete the znode at /admin/delete_topics_by_id/[topicId].
  4. Failed deletes:
    1. For /admin/delete_topics_by_id, add a backoff to the delete.

This leads to the question of what should be done if the controller never receives a successful response from a replica for a StopReplicaRequest. In that scenario it is still safe to remove the staged topic deletion from /admin/delete_topics_by_id/[topicId] after a reasonable number of retries and a reasonable amount of time. Because LeaderAndIsrRequest v3 includes an is_every_partition flag, any stale partitions will be reconciled and deleted by a broker on startup, when it receives the initial LeaderAndIsrRequest. Brokers that did not receive the StopReplicaRequest can therefore clean up these partitions on their next startup.
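
One possible shape for the bounded retry loop is sketched below. The exponential backoff, the default of 100 ms, and the `send_stop_replica` callback are all assumptions for illustration; the proposal only says that a backoff is added and that the controller gives up after a reasonable number of retries.

```python
from typing import Callable, Iterable, Set, Tuple


def cleanup_with_retries(send_stop_replica: Callable[[int], bool],
                         replicas: Iterable[int],
                         max_retries: int,
                         base_backoff_ms: int = 100) -> Tuple[Set[int], int]:
    """Retry StopReplica against unacknowledged brokers, then give up.

    Returns the brokers that never acknowledged and the total backoff in ms.
    """
    pending = set(replicas)
    waited_ms = 0
    for attempt in range(max_retries):
        pending = {b for b in pending if not send_stop_replica(b)}
        if not pending:
            break
        # A real controller would schedule the next attempt instead of summing.
        waited_ms += base_backoff_ms * 2 ** attempt
    return pending, waited_ms
```

Any brokers left in `pending` are the ones expected to clean up on their next startup via the initial LeaderAndIsrRequest.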

Immediate delete scenarios

Scenario 1:

  1. Broker B1 is a leader for topic partition A_p0_id0
  2. Topic A id0 is deleted.
  3. Topic A id1 is created.
  4. Broker B1 has not yet received a new LeaderAndIsrRequest, nor a StopReplicaRequest for topic partition A_p0_id0
  5. Broker B2 has received a LeaderAndIsrRequest for topic partition A_p0_id0, and starts fetching from B1.

We need the changes to FetchRequest/ListOffsetsRequest/OffsetsForLeaderEpochRequest described above to make the above scenario safe. By including the topic ID in these requests, we can prevent a broker from accidentally replicating a stale version of the topic.
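
A minimal sketch of the fencing check a broker could apply when serving such a request follows. The result strings are placeholders rather than real protocol error codes, and the topic ID is treated as optional so that requests from senders without topic ID support still pass.

```python
from typing import Dict, Optional


def check_fetch(hosted: Dict[str, str], topic: str, topic_id: Optional[str]) -> str:
    """Decide whether a fetch for `topic` may be served.

    `hosted` maps each topic name on this broker to the topic ID it holds.
    """
    current_id = hosted.get(topic)
    if current_id is None:
        return "unknown_topic"
    if topic_id is not None and topic_id != current_id:
        # Same name, different topic: one side holds a stale incarnation.
        return "stale_topic_id"
    return "ok"
```

With a name-only check, a fetch against a recreated topic would be served from the old data; the ID comparison is what rejects it.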

Scenario 2:

  1. Broker B1 is a replica for A_p0_id0.
  2. Topic A id0 is deleted.
  3. B1 does not receive a StopReplicaRequest for A_p0_id0.
  4. Topic A id1 is created.
  5. Broker B1 receives a LeaderAndIsrRequest containing partition A_p0_id1.

When this occurs, we will close the Log for A_p0_id0, and move A_p0_id0 to the deleting directory as described in the LeaderAndIsrRequest description above.

Should we remove topic name from the protocol where possible?

It is unnecessary to include the name of the topic in the following Request/Response calls:

  • StopReplica
  • Fetch
  • ListOffsets
  • OffsetForLeader

Including the topic name in the request may make it easier to debug when issues arise, as it will provide more information than the topic ID alone. However, it will also bloat the protocol (especially relevant for FetchRequest), and if topic names are used incorrectly it may prevent topic renames from being easily implemented in the future.

For the time being, we may wish to use the latest protocol versions with clients that do not support topic IDs yet. Until the clients have been updated to refer to partitions by topic ID, we should include both topic name and (optional) ID in every request.

Storage

Partition Metadata file

To allow brokers to resolve the topic name under this structure, a metadata file will be created at logdir/partitiondir/partition.metadata.

This metadata file will be human readable, and will include:

  • Metadata schema version (schema_version: int32)
  • Topic ID (id: UUID)
  • Topic name (name: String)
  • Partition (partition: int32)

This file can either be plain text (key/value pairs) or JSON.
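
For the plain text option, a sketch of writing and reading the file is shown below. The "key: value" line format is an assumption (the proposal does not fix one); the field names are taken from the list above.

```python
from typing import Dict


def write_partition_metadata(schema_version: int, topic_id: str,
                             name: str, partition: int) -> str:
    """Render the partition.metadata contents as one 'key: value' pair per line."""
    fields = {
        "schema_version": schema_version,
        "id": topic_id,
        "name": name,
        "partition": partition,
    }
    return "".join(f"{key}: {value}\n" for key, value in fields.items())


def read_partition_metadata(text: str) -> Dict[str, str]:
    """Parse the file back into a dict of strings, ignoring blank lines."""
    result: Dict[str, str] = {}
    for line in text.splitlines():
        if line.strip():
            key, value = line.split(":", 1)
            result[key.strip()] = value.strip()
    return result
```

Values come back as strings; a caller would convert schema_version and partition to integers as needed.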

org.apache.kafka.common.TopicPartition

At some point it would be useful to modify TopicPartition to include the topic ID. This may be tricky until all APIs support topic IDs.

Compatibility, Deprecation, and Migration Plan

We will need to support all API calls which refer to a partition by either (topicId, partition) or (topicName, partition) until clients are updated to interact with topics by ID. No deprecations are currently planned.

Rejected Alternatives

Sequence ID

As an alternative to a topic UUID, a sequence number (long) could be maintained that is global for the given cluster.

This sequence number could be stored at /topicid/seqid.

Upon topic creation, this sequence number will be incremented, and the resulting ID assigned to the created topic. Sequential topic ID generation can use the same approach as broker id generation.

If global uniqueness across clusters is required for topic IDs, the first N bits of the ID could consist of a cluster ID prefix, followed by the sequence number. However, achieving global uniqueness this way would require a large number of bits for the cluster ID prefix.
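
The trade-off can be made concrete by packing a prefix and a sequence number into a 64-bit value (the size of a long). This is only a sketch of the rejected scheme; the function name and the choice of prefix width are illustrative.

```python
def make_sequence_topic_id(cluster_prefix: int, seq: int,
                           prefix_bits: int, total_bits: int = 64) -> int:
    """Pack a cluster ID prefix and a per-cluster sequence number into one ID."""
    seq_bits = total_bits - prefix_bits
    if not 0 <= cluster_prefix < (1 << prefix_bits):
        raise ValueError("cluster prefix does not fit in prefix_bits")
    if not 0 <= seq < (1 << seq_bits):
        raise ValueError("sequence number does not fit in the remaining bits")
    return (cluster_prefix << seq_bits) | seq
```

Every bit given to the prefix is taken from the sequence space, and a short prefix cannot distinguish many clusters, which is the partitioning problem a UUID avoids.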

Use of a UUID has the benefit of being globally unique across clusters without partitioning the ID space by clusterID, and is conceptually simpler.

Future Work

Requests

The following requests could be improved by the presence of topic IDs, but are out of scope for this KIP.

  • CreatePartitionsRequest
  • ElectPreferredLeadersRequest
  • AlterReplicaLogDirsRequest
  • AlterConfigsRequest
  • DescribeConfigsRequest
  • DescribeLogDirsRequest
  • MetadataRequest
  • UpdateMetadataRequest
  • DeleteRecordsRequest
  • ProduceRequest
  • AddPartitionsToTxnRequest
  • TxnOffsetCommitRequest
  • WriteTxnMarkerRequest

Clients

Some of the implemented request types are also relevant to clients. Adding support for topic IDs in the clients would add an additional measure of safety when producing and consuming data.

__consumer_offsets topic

Ideally, consumer offsets stored in the __consumer_offsets topic would be associated with the topic ID for which they were read. However, given the way the __consumer_offsets topic is compacted, this may be difficult to achieve in a forwards compatible way. This change will be left until topic IDs are implemented in the clients.

log.dir layout

It would be ideal if the log.dir layout could be restructured from the {topic}_{partition} format to {topicIdPrefix}/{topicId}_{partition}, e.g. "mytopic_1" → "24/24cc4332-f7de-45a3-b24e-33d61aa0d16c_1". Note the hierarchical directory structure, which uses the first two characters of the topic ID to avoid having too many directories at the top level of the logdir. This change is not required for the topic deletion improvements above, and will be left for a future KIP where it may be required, e.g. for topic renames.
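
The proposed path can be derived from the topic ID alone, as in this small sketch (the function name is illustrative):

```python
def partition_dir(topic_id: str, partition: int) -> str:
    """Relative log directory: two-character ID prefix, then {topicId}_{partition}."""
    return f"{topic_id[:2]}/{topic_id}_{partition}"
```

For the example ID above and partition 1, this yields "24/24cc4332-f7de-45a3-b24e-33d61aa0d16c_1".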
