...

A random UUID will be generated on topic creation or on migration of an existing topic without topic IDs.
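As a minimal sketch of what this assignment could look like, assuming java.util.UUID as the ID type (the broker could equally use its own UUID representation):

import java.util.UUID;

public final class TopicIdAssigner {
    // Illustrative only: assigns a random UUID when a topic has none,
    // i.e. on topic creation or during migration of an existing topic.
    public static UUID assignTopicId(UUID existingId) {
        return existingId != null ? existingId : UUID.randomUUID();
    }
}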

Protocol Changes

LeaderAndIsr

LeaderAndIsrRequest v3

LeaderAndIsr Request (Version: 3) => controller_id controller_epoch broker_epoch is_every_partition* [topic_states] [live_leaders]
  controller_id => INT32
  controller_epoch => INT32
  broker_epoch => INT64
  is_every_partition* => BOOLEAN
  topic_states => topic topic_id* [partition_states]
    topic => STRING
    topic_id* => UUID
    partition_states => partition controller_epoch leader leader_epoch [isr] zk_version [replicas] is_new
      partition => INT32
      controller_epoch => INT32
      leader => INT32
      leader_epoch => INT32
      isr => INT32
      zk_version => INT32
      replicas => INT32
      is_new => BOOLEAN
  live_leaders => id host port
    id => INT32
    host => STRING
    port => INT32
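The UUID type above occupies 16 bytes on the wire. A minimal sketch of how such a field could be written and read, assuming an encoding as two big-endian INT64s (the exact wire encoding is defined by Kafka's protocol serialization code; this is only an illustration):

import java.nio.ByteBuffer;
import java.util.UUID;

public final class UuidWireFormat {
    // Writes a UUID as two big-endian INT64s (16 bytes total).
    public static void write(ByteBuffer buf, UUID id) {
        buf.putLong(id.getMostSignificantBits());
        buf.putLong(id.getLeastSignificantBits());
    }

    // Reads the same 16-byte layout back into a UUID.
    public static UUID read(ByteBuffer buf) {
        return new UUID(buf.getLong(), buf.getLong());
    }
}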

...

Stale partition deletions resulting from LeaderAndIsrRequest(s) will:

  • Log a warning
  • Move the partition's directory to log.dir/deleting/{topic_id}_{partition}
  • Schedule a final deletion event for delete.stale.topic.delay.ms ms after the LeaderAndIsrRequest was first received; this will clear the deleting directory of the partition's contents (a sketch follows this list)
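A minimal sketch of this staging flow, with the directory layout and config name taken from this KIP; the executor and helper names are hypothetical:

import java.io.IOException;
import java.nio.file.*;
import java.util.UUID;
import java.util.concurrent.*;

public final class StaleDeletionStager {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long deleteStaleTopicDelayMs; // delete.stale.topic.delay.ms
    private final Path logDir;

    public StaleDeletionStager(Path logDir, long deleteStaleTopicDelayMs) {
        this.logDir = logDir;
        this.deleteStaleTopicDelayMs = deleteStaleTopicDelayMs;
    }

    public void stageDelete(UUID topicId, int partition, Path partitionDir) throws IOException {
        System.out.println("WARN: staging deletion of stale partition " + partitionDir);
        // Move the partition's directory to log.dir/deleting/{topic_id}_{partition}.
        Path target = logDir.resolve("deleting").resolve(topicId + "_" + partition);
        Files.createDirectories(target.getParent());
        Files.move(partitionDir, target, StandardCopyOption.ATOMIC_MOVE);
        // Schedule the final deletion event relative to when the request arrived.
        scheduler.schedule(() -> deleteRecursively(target), deleteStaleTopicDelayMs, TimeUnit.MILLISECONDS);
    }

    private void deleteRecursively(Path dir) {
        // Delete children before parents.
        try (var paths = Files.walk(dir)) {
            paths.sorted(java.util.Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        } catch (IOException e) {
            // A real implementation would log and retry.
        }
    }
}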

LeaderAndIsrResponse v3

LeaderAndIsr Response (Version: 3) => error_code [partitions]
  error_code => INT16
  partitions => topic topic_id* partition error_code
    topic => STRING
    topic_id* => UUID
    partition => INT32
    error_code => INT16

...

In this approach, we would maintain the current delete logic. The controller will send a StopReplicaRequest to all replicas for a topic, and will keep retrying this request until all replicas respond successfully. In this implementation, the deletion logic will not be simplified, and topic deletes will be blocked if any replica is down.

Option 1: best effort strategy

Make a best effort attempt to send a StopReplicaRequest to all replicas. The controller will give up after a certain number of retries and will complete the delete. This will not simplify the topic deletion code; however, it will prevent delete topic requests from being blocked if one of the replicas is down. This is now relatively safe, as stale topics will be deleted when a broker receives an initial LeaderAndIsrRequest. However, it could prevent space from being reclaimed from a broker that does not respond to StopReplicaRequest(s) before the retries time out, but is otherwise alive.
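A sketch of the bounded retry loop this option implies; the retry bound and the sendStopReplica transport call are hypothetical stand-ins:

public final class BestEffortDeleter {
    private static final int MAX_RETRIES = 5; // hypothetical bound

    // Returns true if every replica acknowledged; gives up (and completes
    // the delete anyway) after MAX_RETRIES attempts.
    public boolean stopReplicas(java.util.Set<Integer> replicaBrokerIds) {
        java.util.Set<Integer> pending = new java.util.HashSet<>(replicaBrokerIds);
        for (int attempt = 0; attempt < MAX_RETRIES && !pending.isEmpty(); attempt++) {
            pending.removeIf(this::sendStopReplica); // drop brokers that acked
        }
        return pending.isEmpty(); // under Option 1, the delete completes either way
    }

    // Hypothetical transport call: returns true when the broker acks.
    private boolean sendStopReplica(int brokerId) {
        return false; // placeholder
    }
}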

Option 2: send StopReplicaRequest(s) to online brokers only

In this approach, the controller will send StopReplicaRequests to only the brokers that are online, and will wait for a response from these brokers before marking the delete as successful. This will allow a topic delete to take place while some replicas are offline. If any replicas return to being online, they will receive an initial LeaderAndIsrRequest that will allow them to clear up any stale state. This is similar to the "best effort" strategy above.

Option 3: immediate effective delete, staged cleanup strategy

This strategy would allow brokers to effectively delete topics immediately, ensuring deletions do not block creation and use of a new topic with the same name. This is the option proposed by this KIP to remove the current blocking deletion and creation logic and to simplify the topic deletion and creation flow.

Upon receiving a DeleteTopicsRequest, if the IBP is >= MIN_TOPIC_ID_VERSION, move the /brokers/topics/[topic] znode payload to /admin/delete_topics_by_id/[topicId] and immediately reply to the DeleteTopicsRequest with a successful response. At this point, the topic is considered deleted, and a topic with the same name can be created.

Although the topic is considered safely deleted at this point, it must still be garbage collected. The controller will then send StopReplicaRequest(s) to all brokers assigned as replicas for the deleted topic. Upon successfully receiving a response from all replicas, the znode at /admin/delete_topics_by_id/[topicId] will be deleted.
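A sketch of the staging step, using the raw ZooKeeper client for illustration (Kafka wraps ZooKeeper in its own client; the paths are from this KIP, everything else is illustrative):

import java.util.Arrays;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

public final class TopicDeleteStager {
    // Atomically moves the topic's payload to the staged-delete path. After
    // this multi-op commits, the DeleteTopicsRequest can be answered
    // successfully and the topic name is free for reuse.
    public void stageDelete(ZooKeeper zk, String topic, String topicId) throws Exception {
        Stat stat = new Stat();
        byte[] payload = zk.getData("/brokers/topics/" + topic, false, stat);
        zk.multi(Arrays.asList(
            Op.create("/admin/delete_topics_by_id/" + topicId, payload,
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
            Op.delete("/brokers/topics/" + topic, stat.getVersion())));
        // Garbage collection happens later: once all replicas ack the
        // StopReplicaRequest(s), the staged znode is deleted.
    }
}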

For the most part, the deletion logic can be maintained between IBP versions, with some differences in responses and cleanup in ZK. Both formats must still be supported, as the IBP may not be bumped right away, and deletes may have already been staged before the IBP bump occurs.

The controller's delete logic will:

  1. Collect deleted topics:
    1. Old format: /admin/delete_topics, pulling the topic state from /brokers/topics/[topic].
    2. New format: /admin/delete_topics_by_id. znodes under this path contain the full topic metadata for that topic ID.
  2. After collecting together the topics to be deleted in step 1, perform the deletes by sending StopReplicaRequest V2 for any topics which do not contain a topic ID, and V3 for any topics which do contain a topic ID (a sketch of this dispatch follows the list).
  3. Finalize successful deletes:
    1. For /admin/delete_topics deletes, we may need to respond to the TopicDeleteRequest. We can also delete the topic znode at /admin/delete_topics/[topic] and /brokers/topics/[topic].
    2. For /admin/delete_topics_by_id, we can simply delete the znode at /admin/delete_topics_by_id/[topicId].
  4. Failed deletes:
    1. For /admin/delete_topics_by_id, add a backoff to the delete.
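A sketch of the collection-and-dispatch step; the znode paths and version numbers are from this KIP, while the topic-state record and send helper are hypothetical:

public final class ControllerDeleteLogic {
    // Staged delete collected from /admin/delete_topics (no ID) or
    // /admin/delete_topics_by_id (ID present).
    record StagedTopic(String name, java.util.Optional<java.util.UUID> topicId) {}

    // Sends StopReplicaRequest V2 for old-format deletes (no topic ID)
    // and V3 for new-format deletes (topic ID present).
    void performDeletes(java.util.List<StagedTopic> staged) {
        for (StagedTopic topic : staged) {
            short version = topic.topicId().isPresent() ? (short) 3 : (short) 2;
            sendStopReplica(version, topic); // hypothetical transport call
        }
    }

    void sendStopReplica(short version, StagedTopic topic) { /* ... */ }
}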

This leads to the question of what should be done if the controller never receives a successful response from a replica for a StopReplicaRequest. Under such a scenario, it is still safe to remove the staged topic deletion from /admin/delete_topics_by_id/[topicId] after a reasonable number of retries and a reasonable amount of time. Given that LeaderAndIsrRequest v3 includes an is_every_partition flag, any stale partitions will be reconciled and deleted by a broker on startup upon receiving the initial LeaderAndIsrRequest. Therefore, it will be safe to remove the staged deletes after a reasonable number of retries, and we can let any brokers which did not receive the request clean these up on their next startup.

Immediate delete scenarios

Scenario 1:

  1. Broker B1 is a leader for topic partition A_p0_id0.
  2. Topic A id0 is deleted.
  3. Topic A id1 is created.
  4. Broker B1 has not yet received a new LeaderAndIsrRequest, nor a StopReplicaRequest, for topic partition A_p0_id0.
  5. Broker B2 has received a LeaderAndIsrRequest for topic partition A_p0_id0, and starts fetching from B1.

We need the changes to FetchRequest/ListOffsetRequest/OffsetsForLeaderEpochRequest described above to make this scenario safe. By including the topic ID in these requests, we can prevent a broker from accidentally replicating from a deleted topic with the same name.
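To make this concrete, the leader-side check could look roughly like the sketch below. The class and helper names are hypothetical; the KIP only requires that a topic-ID mismatch fails the fetch:

import java.util.UUID;

public final class FetchValidator {
    // The ID the leader currently associates with this partition's log.
    private final UUID localTopicId;

    public FetchValidator(UUID localTopicId) {
        this.localTopicId = localTopicId;
    }

    // Rejects fetches that carry the topic ID of a deleted (or different)
    // topic, so B2 cannot replicate A_p0_id0 from a broker now hosting id1.
    public boolean isValidFetch(UUID requestTopicId) {
        return localTopicId.equals(requestTopicId);
    }
}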

Scenario 2:

  1. Broker B1 is a replica for A_p0_id0.
  2. Topic A id0 is deleted.
  3. B1 does not receive a StopReplicaRequest for A_p0_id0.
  4. Topic A id1 is created.
  5. Broker B1 receives a LeaderAndIsrRequest containing partition A_p0_id1.

When this occurs, we will close the Log for A_p0_id0 and move A_p0_id0 to the deleting directory as described in the LeaderAndIsrRequest description above.

Should we remove topic name from the protocol where possible?

It is unnecessary to include the name of the topic in the following Request/Response calls:

  • StopReplica
  • Fetch
  • ListOffsets
  • OffsetForLeader

Including the topic name in the request may make it easier to debug when issues arise, as it will provide more information than the topic ID alone. However, it will also bloat the protocol (especially relevant for FetchRequest), and if topic names are incorrectly relied upon, it may prevent topic renames from being easily implemented in the future.

For the time being, clients that do not yet support topic IDs may still wish to use the latest protocol versions. Until the clients have been updated to refer to partitions by topic ID, we should include both the topic name and (optional) ID in every request.

Migration

Upon a controller becoming active, the list of current topics is loaded from /brokers/topics/[topic]. When a topic without a topic ID is found, one will be assigned, and the payload will be rewritten to /brokers/topics/[topic] with the id field filled in and the schema version bumped to version 3. LeaderAndIsrRequest(s) will only be sent by this controller once a topic ID has been successfully assigned to the topic. This process can take place without an inter-broker protocol bump, as the format stored in /brokers/topics/[topic] will be compatible with older broker versions.

When a broker receives a LeaderAndIsrRequest containing a topic ID for an existing partition without an associated topic ID, it will associate the topic ID with the partition. This will effectively migrate a broker's local replicas to include topic IDs.
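A sketch of that association step; the partition metadata file is described in the Storage section below, so the file name and format used here are illustrative only:

import java.io.IOException;
import java.nio.file.*;
import java.util.UUID;

public final class PartitionTopicIdStore {
    // Persists the topic ID alongside the partition's log the first time a
    // LeaderAndIsrRequest carrying an ID arrives for this partition.
    public static void associate(Path partitionDir, UUID topicId) throws IOException {
        Path metadata = partitionDir.resolve("partition.metadata"); // illustrative name
        if (Files.notExists(metadata)) {
            Files.writeString(metadata, "topic_id=" + topicId + System.lineSeparator());
        }
    }
}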


Configuration

The following configuration options will be added:

Option | Unit | Default | Description
------ | ---- | ------- | -----------
delete.stale.topic.delay.ms | ms | 14400 (4 hours) | When a FULL LeaderAndIsrRequest is received and the request does not contain a partition that exists on a broker, a deletion event will be staged for that partition, completing after delete.stale.topic.delay.ms milliseconds.
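If this option were wired up through Kafka's standard config machinery, the definition might look like the sketch below, using org.apache.kafka.common.config.ConfigDef; the placement, type, and importance are assumptions:

import org.apache.kafka.common.config.ConfigDef;

public final class StaleTopicConfig {
    public static final String DELETE_STALE_TOPIC_DELAY_MS = "delete.stale.topic.delay.ms";

    // Default taken from the table above; Type and Importance are assumptions.
    public static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(DELETE_STALE_TOPIC_DELAY_MS,
                ConfigDef.Type.LONG,
                14400L,
                ConfigDef.Importance.LOW,
                "Delay before a staged stale-partition deletion completes.");
}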

Storage

Partition Metadata file

...