...

OffsetForLeaderEpoch Response (Version: 4) => throttle_time_ms [topics]
  throttle_time_ms => INT32
  topics => topic topic_id [partitions]
    topic => STRING
    topic_id => UUID
    partitions => error_code partition leader_epoch end_offset
      error_code => INT16
      partition => INT32
      leader_epoch => INT32
      end_offset => INT64

DeleteTopics

The controller could now be implemented to respond to a DeleteTopicsRequest in one of the following ways:

Existing strategy

In this approach, we would maintain the current delete logic. The controller will send a StopReplicaRequest to all replicas for a topic, and will keep retrying this request until all replicas respond successfully. In this implementation, the deletion logic will not be simplified, and topic deletes will be blocked if any replica is down.

Option 1: best effort strategy

Make a best-effort attempt to send a StopReplicaRequest to all replicas. The controller will give up after a certain number of retries and complete the delete. This will not simplify the topic deletion code, but it will prevent delete topic requests from being blocked if one of the replicas is down. This is now relatively safe, as stale topics will be deleted when a broker receives an initial LeaderAndIsrRequest; however, it could prevent space from being reclaimed from a broker that does not respond to StopReplicaRequest(s) before the request times out, but is otherwise alive.

Option 2: send StopReplicaRequest(s) to online brokers only

In this approach, the controller will send StopReplicaRequests only to the brokers that are online, and will wait for a response from these brokers before marking the delete as successful. This allows a topic delete to take place while some replicas are offline. If any replicas return to being online, they will receive an initial LeaderAndIsrRequest that will allow them to clear up any stale state. This is similar to the "best effort" strategy above.

Option 3: immediate effective delete, staged cleanup strategy

This strategy would allow brokers to effectively delete topics immediately, ensuring deletions do not block the creation and use of a new topic with the same name. This is the option proposed by this KIP to remove the current blocking deletion and creation logic and simplify the topic deletion and creation flow.

...

With the addition of topic IDs, and with the changes to LeaderAndIsrRequest described above, we can now make changes to topic deletion logic that will allow topics to be immediately considered to be deleted, regardless of whether all replicas have responded to a DeleteTopicsRequest.

When the controller receives a DeleteTopicsRequest, if the IBP is >= MIN_TOPIC_ID_VERSION it will move the /brokers/topics/[topic] znode payload to /admin/delete_topics_by_id/[topicId], and immediately reply to the DeleteTopicsRequest with a successful response. At this point, the topic is considered deleted, and a topic with the same name can be created.

Although the topic is considered safely deleted at this point, it must still be garbage collected. The controller will then send StopReplicaRequest(s) to all brokers assigned as replicas for the deleted topic. Upon successfully receiving a response from all replicas, the znode at /admin/delete_topics_by_id/[topicId] will be deleted.
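
To make the flow concrete, here is a minimal sketch of the staged delete, assuming hypothetical helpers (zkRead, zkCreate, zkDelete, respondSuccess, and stopReplicasAcked are illustrative stubs, not actual controller internals):

import java.util.Set;
import java.util.UUID;

abstract class StagedTopicDeletion {
    abstract byte[] zkRead(String path);
    abstract void zkCreate(String path, byte[] payload);
    abstract void zkDelete(String path);
    abstract void respondSuccess(String topic);
    abstract boolean stopReplicasAcked(Set<Integer> replicas, UUID topicId);

    void deleteTopic(String topic, UUID topicId, Set<Integer> replicas) {
        // Stage the delete under the ID-keyed path; the name is freed at once.
        byte[] payload = zkRead("/brokers/topics/" + topic);
        zkCreate("/admin/delete_topics_by_id/" + topicId, payload);
        zkDelete("/brokers/topics/" + topic);
        respondSuccess(topic); // the DeleteTopicsRequest is answered immediately

        // Asynchronous garbage collection: only after every replica has
        // acknowledged a StopReplicaRequest is the staged znode removed.
        if (stopReplicasAcked(replicas, topicId)) {
            zkDelete("/admin/delete_topics_by_id/" + topicId);
        }
    }
}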

For the most part, the deletion logic can be maintained between IBP versions, with some differences in responses and cleanup in ZK. Both formats must still be supported, as the IBP may not be bumped right away, and deletes may have already been staged before the IBP bump occurs.

The controller's delete logic will:

  1. Collect deleted topics:
    1. Old format: /admin/delete_topics, pulling the topic state from /brokers/topics/[topic].
    2. New format: /admin/delete_topics_by_id. znodes under this path contain the full topic metadata for that topic ID.
  2. After collecting together the topics to be deleted in step 1, perform the deletes by sending StopReplicaRequest V2 for any topics which do not contain a topic ID, and V3 for any topics which do contain a topic ID.
  3. Finalize successful deletes:
    1. For /admin/delete_topics deletes, we may need to respond to the TopicDeleteRequest. We can also delete the topic znodes at /admin/delete_topics/[topic] and /brokers/topics/[topic].
    2. For /admin/delete_topics_by_id, we can simply delete the znode at /admin/delete_topics_by_id/[topicId].
  4. Failed deletes:
    1. For /admin/delete_topics_by_id, add a backoff to the delete.

This leads to the question of what should be done if the controller never receives a successful response from a replica for a StopReplicaRequest. Under such a scenario it is still safe to remove the staged topic deletion from /admin/delete_topics_by_id/[topicId] after a reasonable number of retries and time. Given that LeaderAndIsrRequest v3 includes an is_every_partition flag, any stale partitions will be reconciled and deleted by a broker on startup upon receiving the initial LeaderAndIsrRequest. Therefore, it will be safe to remove the staged deletes after a reasonable number of retries, and we can let any brokers which did not receive the request clean these up on their next startup.
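
Continuing the sketch above, the bounded retry could look like the following; the retry cap and the backoff helper are likewise illustrative stubs:

void garbageCollectWithRetries(String topic, UUID topicId, Set<Integer> replicas) {
    final int maxRetries = 5; // illustrative cap
    for (int attempt = 0; attempt < maxRetries; attempt++) {
        if (stopReplicasAcked(replicas, topicId)) break;
        backoff(attempt); // hypothetical helper, e.g. exponential backoff
    }
    // Safe in either outcome: brokers that missed the StopReplicaRequest
    // reconcile and delete stale partitions on their next startup, when the
    // initial (full) LeaderAndIsrRequest is received.
    zkDelete("/admin/delete_topics_by_id/" + topicId);
}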

Immediate delete scenarios

Scenario 1:

  1. Broker B1 is a leader for topic partition A_p0_id0.
  2. Topic A id0 is deleted.
  3. Topic A id1 is created.
  4. Broker B1 has not yet received a new LeaderAndIsrRequest, nor a StopReplicaRequest for topic partition A_p0_id0.
  5. Broker B2 has received a LeaderAndIsrRequest for topic partition A_p0_id0, and starts fetching from B1.

We need the changes to FetchRequest/ListOffsetRequest/OffsetsForLeaderEpochRequest described above to make the above scenario safe. By including the topic ID in these requests, we can prevent a broker from accidentally replicating from a deleted topic with the same name.
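
A minimal sketch of the guard this provides, assuming the broker can look up the topic ID recorded for its local log (the names here are illustrative):

import java.util.UUID;

final class FetchGuard {
    // A fetch (or ListOffsets/OffsetsForLeaderEpoch) carries both the topic
    // name and its ID; the ID is what must match. After a delete and
    // re-create, the name still matches but the ID does not, so the stale
    // fetch is rejected instead of silently replicating the wrong topic.
    static boolean fetchIsValid(UUID requestTopicId, UUID localLogTopicId) {
        return requestTopicId.equals(localLogTopicId);
    }
}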

Scenario 2:

  1. Broker B1 is a replica for A_p0_id0.

  2. Topic A id0 is deleted.
  3. B1 does not receive a StopReplicaRequest for A_p0_id0.
  4. Topic A id1 is created.
  5. Broker B1 receives a LeaderAndIsrRequest containing partition A_p0_id1.

When this occurs, we will close the Log for A_p0_id0, and move A_p0_id0 to the deleting directory as described in the LeaderAndIsrRequest description above.
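
A sketch of that broker-side cleanup, assuming the staged delete is done by renaming the partition directory (the directory suffix and the closeLog stub are illustrative):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

abstract class StalePartitionCleaner {
    abstract void closeLog(Path partitionDir); // flush and close the old Log

    void retireStalePartition(Path partitionDir, UUID staleTopicId) throws IOException {
        closeLog(partitionDir); // close the Log for A_p0_id0 first
        // Rename rather than delete in place, so actual disk reclamation can
        // happen later (e.g. after delete.stale.topic.delay.ms elapses).
        Path target = partitionDir.resolveSibling(
                partitionDir.getFileName() + "." + staleTopicId + "-delete");
        Files.move(partitionDir, target, StandardCopyOption.ATOMIC_MOVE);
    }
}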

Should we remove topic name from the protocol where possible?

It is unnecessary to include the name of the topic in the following Request/Response calls:

  • StopReplica
  • Fetch
  • ListOffsets
  • OffsetForLeader

Including the topic name in the request may make it easier to debug when issues arise, as it will provide more information than the topic ID alone. However, it will also bloat the protocol (especially relevant for FetchRequest), and if topic names are incorrectly used, it may prevent topic renames from being easily implemented in the future.

For the time being, we may wish to use the latest protocol versions with clients that do not support topic IDs yet. Until the clients have been updated to refer to partitions by topic ID, we should include both topic name and (optional) ID in every request.

Migration

Upon a controller becoming active, the list of current topics is loaded from /brokers/topics/[topic]. When a topic without a topic ID is found, one will be assigned, and the payload will be rewritten to /brokers/topics/[topic] with the id filled in and the schema version bumped to version 3. LeaderAndIsrRequest(s) will only be sent by this controller once a topic ID has been successfully assigned to the topic. This process can take place without an inter-broker protocol bump, as the format stored in /brokers/topics/[topic] will be compatible with older broker versions.
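
A sketch of that startup pass, with hypothetical helpers standing in for the controller's ZooKeeper access:

import java.util.UUID;

abstract class TopicIdMigration {
    abstract Iterable<String> allTopics();       // children of /brokers/topics
    abstract TopicZNode readTopic(String topic); // parsed znode payload
    abstract void rewriteTopic(String topic, TopicZNode node);

    static final class TopicZNode {
        UUID topicId;      // null for topics created before this KIP
        int schemaVersion;
        // ... replica assignment and other fields elided
    }

    void assignMissingTopicIds() {
        for (String topic : allTopics()) {
            TopicZNode node = readTopic(topic);
            if (node.topicId == null) {
                node.topicId = UUID.randomUUID(); // assign a fresh ID
                node.schemaVersion = 3;           // bump the znode schema
                rewriteTopic(topic, node);        // /brokers/topics/[topic]
            }
        }
        // LeaderAndIsrRequests are only sent once IDs have been assigned.
    }
}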

When a broker receives a LeaderAndIsrRequest containing a topic ID for an existing partition without an associated topic ID, it will associate the topic ID with the partition. This will effectively migrate a broker's local replicas to include topic IDs.

Configuration

The following configuration options will be added:

  • Option: delete.stale.topic.delay.ms
    Unit: ms
    Default: 14400 (4 hours)
    Description: When a FULL LeaderAndIsrRequest is received and the request does not contain a partition that exists on a broker, a deletion event will be staged for that partition which will complete after delete.stale.topic.delay.ms milliseconds.
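
For illustration, a hypothetical server.properties entry; since the option is denominated in milliseconds, 4 hours corresponds to a value of 14400000:

# hypothetical example entry in server.properties
delete.stale.topic.delay.ms=14400000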

Storage

Partition Metadata file

To allow brokers to resolve the topic name under this structure, a metadata file will be created at logdir/partitiondir/partition.metadata.

This metadata file will be human readable, and will include:

  • Metadata schema version (schema_version: int32)
  • Topic ID (id: UUID)
  • Topic name (name: String)
  • Partition (partition: int32)

This file can either be plain text (key/value pairs) or JSON.
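
For example, a plain-text key/value rendering of the file might look like this (all values are illustrative):

schema_version: 0
id: 1a2b3c4d-5e6f-7a8b-9c0d-1e2f3a4b5c6d
name: my-topic
partition: 0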

org.apache.kafka.common.TopicPartition

At some point it would be useful to modify TopicPartition to include the topic ID. This may be tricky until all APIs support topic IDs.
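
A minimal sketch of what such a type could look like; this is illustrative, not the eventual Kafka API:

import java.util.Objects;
import java.util.UUID;

public final class TopicIdPartition {
    private final UUID topicId;
    private final String topic;   // kept for logging and older APIs
    private final int partition;

    public TopicIdPartition(UUID topicId, String topic, int partition) {
        this.topicId = Objects.requireNonNull(topicId);
        this.topic = topic;
        this.partition = partition;
    }

    // Identity is (topicId, partition): a re-created topic with the same
    // name compares and hashes as a different partition key.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TopicIdPartition)) return false;
        TopicIdPartition that = (TopicIdPartition) o;
        return partition == that.partition && topicId.equals(that.topicId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topicId, partition);
    }
}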

Compatibility, Deprecation, and Migration Plan

We will need to support all API calls which refer to a partition by either (topicId, partition) or (topicName, partition) until clients are updated to interact with topics by ID. No deprecations are currently planned.

Rejected Alternatives

Sequence ID

As an alternative to a topic UUID, a sequence number (long) could be maintained that is global for the given cluster.

...

If global uniqueness across clusters is required for topic IDs, the first N bits of the ID could consist of a cluster ID prefix, followed by the sequence number. However, to achieve global uniqueness, this would require a large number of bits for the cluster ID prefix.

Use of a UUID has the benefit of being globally unique across clusters without partitioning the ID space by clusterID, and is conceptually simpler.
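
To make the comparison concrete: both schemes fit in 128 bits, but the prefix scheme must budget bits between the two halves and coordinate both, while a random UUID needs no coordination at all (values below are illustrative):

import java.util.UUID;

class IdSchemes {
    static UUID prefixedSequenceId(long clusterId, long sequence) {
        // Rejected scheme: high bits identify the cluster, low bits are a
        // per-cluster counter; uniqueness depends on both being managed.
        return new UUID(clusterId, sequence);
    }

    static UUID randomTopicId() {
        // Chosen scheme: globally unique without partitioning the ID space.
        return UUID.randomUUID();
    }
}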

Topic Deletion

We considered and rejected two other strategies for performing topic deletes.

Best Effort Strategy

Under this strategy, the controller will attempt to send a StopReplicaRequest to all replicas. The controller will give up after a certain number of retries and complete the delete. Although this will not simplify the topic deletion code, it will prevent delete topic requests from being blocked if one of the replicas is down. This would now be relatively safe, as stale topics will be deleted when a broker receives an initial LeaderAndIsrRequest; however, it could prevent space from being reclaimed from a broker that does not respond to StopReplicaRequest(s) before the request times out, but is otherwise alive.

Send StopReplicaRequest(s) to online brokers only

In this approach, the controller will send StopReplicaRequests only to the brokers that are online, and will wait for a response from these brokers before marking the delete as successful. This allows a topic delete to take place while some replicas are offline. If any replicas return to being online, they will receive an initial LeaderAndIsrRequest that will allow them to clear up any stale state. This is similar to the "best effort" strategy above.

Future Work

Requests

The following requests could be improved by the presence of topic IDs, but these changes are out of scope for this KIP.

...