Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current stateUnder Discussion

...

Overall, topic IDs provide a safer way for brokers to replicate topics without any chance of incorrectly interacting with stale topics with the same name. By preventing such scenarios, we can simplify a number of other interactions such as topic deletes which are currently more complicated and problematic than necessary.

Public Interfaces

No Minor changes to public interfaces the TopicDescription interface will be made in this release. However, it may be dangerous to use older versions of Kafka tools with new broker versions when using their --zookeeper flags. Use of older tools in this way is not supported today.

Proposed Changes

Topic IDs will be represented with 64 bit v4 UUIDs. A UUID with all bits as 0 will be reserved as a null UUID as the Kafka RPC protocol does not allow for nullable fields. When printed or stored as a string, topic IDs will be converted to base64 string representation.

to allow clients to access the topic ID of topics found in metadata responses.

/**
* Create an instance with the specified parameters.
*
* @param name The topic name
* @param internal Whether the topic is internal to Kafka
* @param partitions A list of partitions where the index represents the partition id and the element contains
* leadership and replica information for that partition.
* @param authorizedOperations authorized operations for this topic, or null if this is not known.
* @param topicId Unique value that identifies the topic
*
*/
public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions,
Set<AclOperation> authorizedOperations, UUID topicId)

/**
* A unique identifier for the topic.
*/
public UUID topicId()

Additionally, it may be dangerous to use older versions of Kafka tools with new broker versions when using their --zookeeper flags. Use of older tools in this way is not supported today.

Proposed Changes

Topic IDs will be represented with 64 bit v4 UUIDs. A UUID with all bits as 0 will be reserved as a null UUID as the Kafka RPC protocol does not allow for nullable fields. When printed or stored as a string, topic IDs will be converted to base64 string representation.

On On handling a CreateTopicRequest brokers will create the topic znode under /brokers/topics/[topic], as usual.

...

The controller will supply topic IDs for all topic partitions to brokers by sending LeaderAndIsrRequest(s) that contain the topic IDs for all partitions contained in the request.

Requests to describe topics will return a result containing TopicDescriptions with topic IDs for each topic

Protocol Changes

LeaderAndIsr

...

LeaderAndIsr Request (Version: 3) => => controller_id controller_epoch broker_epoch type* [topic_states] [live_leaders]
  controller_id => INT32
  controller_epoch => INT32
  broker_epoch => INT64
  type* => INT8
  topic_states => topic topic_id* [partition_states]
    topic => STRING
    topic_id* => UUID
    partition_states ] [live_leaders]
  controller_id => partition controller_epoch leader leader_epoch [isr] zk_version [replicas] is_new
      partition => INT32
      controller_epoch => INT32
      leader => INT32
  controller      leader_epoch => INT32
      isr => INT32
  broker_epoch       zk_version => INT32
      replicas => INT64
  type* INT32
      is_new => INT8BOOLEAN
  topiclive_states leaders => topic topic_id* [partition_states]
    topic id host port
    id => INT32
    host => STRING
    topic_id* => UUID
    partition_states => partition controller_epoch leader leader_epoch [isr] zk_version [replicas] is_new
      partition => INT32
      controller_epoch => INT32
      leader => INT32
      leader_epoch => INT32
      isr => INT32
      zk_version => INT32
      replicas => INT32
      is_new => BOOLEAN
  live_leaders => id host port
    id => INT32
    host => STRING
    port => INT32

...

port => INT32


LeaderAndIsrRequest v3 adds the topic ID to the topic_states field, and an enum type to denote the type of LeaderAndIsrRequest. Currently, the first LeaderAndIsrRequest sent to a broker by a controller contains all TopicPartitions that a broker is a replica for. We will formalize this behavior by also including a type enum to denote the type of LeaderAndIsrRequest. 

valueenumdescription
0UNSPECIFIEDUnspecified type. Defaults to incremental / previous behavior.
1INCREMENTALA LeaderAndIsrRequest that is not guaranteed to contain all topic partitions assigned to a broker.

2

FULLA full LeaderAndIsrRequest containing all partitions the broker is a replica for.

When type = FULL, the broker is able to reconcile its local state on disk with the request. Any partition not contained in this request and present on local disk can be staged for deletion. There are two such types of stale request.

1. The TopicPartition is not present in the LeaderAndIsrRequest.

2. The TopicPartition is contained in the request, but the topic ID that does not match the local topic partition stored on the broker.

Reconciliation may also be necessary if type = INCREMENTAL and the topic ID set on a local partition does not match the topic ID contained in the request. A TopicPartition with the same name and a different topic ID by implies that the local topic partition is stale, as the topic must have been deleted to create a new topic with a different topic ID.

When type = UNSPECIFIED, the request will be treated in a way that allows for backwards compatibility with older request types.

Deletion

Deletion of stale partitions triggered by LeaderAndIsrRequest(s) will take place by:

  1. Logging at WARN level all partitions that will be deleted and the time that they will be be deleted at.
  2. Move the partition's directory to log.dir/deleting/{topic_id}_{partition}
  3. Schedule deletion from disk with a delay of delete.stale.topic.delay.ms ms. This will clear the deleting directory of the partition's contents.

LeaderAndIsrResponse v3

LeaderAndIsr Response (Version: 3) => error_code [partitions]
  error_code => INT16
  partitions => topic topic_id* partition error_code
    topic => STRING
    topic_id* => UUID
    partition => INT32
    error_code => INT16

StopReplica

StopReplicaRequest v2

StopReplica Request (Version: 2) => controller_id controller_epoch broker_epoch delete_partitions [partitions]
  controller_id => INT32
  controller_epoch => INT32
  broker_epoch => INT64
  delete_partitions => BOOLEAN
  partitions => topic topic_id* [partition_ids]
    topic => STRING
    topic_id* => UUID
    partition_ids => INT32

StopReplicaResponse v2

StopReplica Response (Version: 2

...

2

...

When type = FULL, the broker is able to reconcile its local state on disk with the request. Any partition not contained in this request and present on local disk can be staged for deletion. There are two such types of stale request.

1. The TopicPartition is not present in the LeaderAndIsrRequest.

2. The TopicPartition is contained in the request, but the topic ID that does not match the local topic partition stored on the broker.

Reconciliation may also be necessary if type = INCREMENTAL and the topic ID set on a local partition does not match the topic ID contained in the request. A TopicPartition with the same name and a different topic ID by implies that the local topic partition is stale, as the topic must have been deleted to create a new topic with a different topic ID.

When type = UNSPECIFIED, the request will be treated in a way that allows for backwards compatibility with older request types.

Deletion

Deletion of stale partitions triggered by LeaderAndIsrRequest(s) will take place by:

  1. Logging at WARN level all partitions that will be deleted and the time that they will be be deleted at.
  2. Move the partition's directory to log.dir/deleting/{topic_id}_{partition}
  3. Schedule deletion from disk with a delay of delete.stale.topic.delay.ms ms. This will clear the deleting directory of the partition's contents.

LeaderAndIsrResponse v3

LeaderAndIsr Response (Version: 3) => error_code [partitions]
  error_code => INT16
  partitions => topic topic_id* partition error_code
    topic => STRING
    topic_id* => UUID
    partition => INT32
    error_code => INT16

StopReplica

StopReplicaRequest v2


    error_code => INT16

Fetch

To avoid issues where requests are made to stale partitions, a topic_id field will be added to fence reads from deleted topics. Note that the leader epoch is not sufficient for preventing these issues, as the partition leader epoch is reset when a topic is deleted and recreated.

FetchRequest v12

Fetch StopReplica Request (Version: 212) => controllerreplica_id max_wait_time min_bytes max_bytes isolation_level session_id controller_epoch broker_epoch delete_partitions [partitions]
  controller_id session_epoch [topics] [forgotten_topics_data] rack_id
  replica_id => INT32
  max_wait_time => INT32
  min_bytes => INT32
  controllermax_epoch bytes => INT32
  broker_epoch isolation_level => INT8
  session_id => INT64INT32
  deletesession_partitions epoch => BOOLEANINT32
  partitions topics => topic topic_id* [partition_idspartitions]
    topic => STRING
    topic_id* => UUID
    partition_ids => INT32

StopReplicaResponse v2

...

StopReplica Response (Version: 2) => error_code [partitions]
  error_code => INT16
  partitions => topic topic_id* partition error_code
    topic => STRING
    topic_id* => UUID
    partition => INT32
    error_code => INT16

_id* => UUID
    partitions => partition current_leader_epoch fetch_offset log_start_offset partition_max_bytes
      partition => INT32
      current_leader_epoch => INT32
      fetch_offset => INT64
      log_start_offset => INT64
      partition_max_bytes => INT32
  forgotten_topics_data => topic [partitions]
    topic => STRING
    partitions => INT32
  rack_id => STRING

FetchResponse v12

Fetch Response

Fetch

To avoid issues where requests are made to stale partitions, a topic_id field will be added to fence reads from deleted topics. Note that the leader epoch is not sufficient for preventing these issues, as the partition leader epoch is reset when a topic is deleted and recreated.

FetchRequest v12

Fetch Request (Version: 12) => replica_id max_waitthrottle_time min_bytes max_bytes isolation_level ms error_code session_id session_epoch [topics] [forgotten_topics_data] rack_id
  replicaresponses]
  throttle_time_ms => INT32
  error_code => INT16
  session_id => INT32
  max_wait_time => INT32
  min_bytes => INT32
  max_bytes => INT32
  isolation_level => INT8
  session_id => INT32
  session_epoch => INT32
  topics => topic topic_id* [partitions]
    topic => STRING
    topic_id* => UUID
    partitions => partition current_leader_epoch fetch_offset log_start_offset partition_max_bytes
      partition => INT32
      current_leader_epoch => INT32
      fetch_offset => INT64
      log_start_offset => INT64
      partition_max_bytes => INT32
  
  responses => topic topic_id* [partition_responses]
    topic => STRING
    topic_id* => UUID
    partition_responses => partition_header record_set
      partition_header => partition error_code high_watermark last_stable_offset log_start_offset [aborted_transactions] preferred_read_replica
        partition => INT32
        error_code => INT16
        high_watermark => INT64
        last_stable_offset => INT64
        log_start_offset => INT64
        aborted_transactions => producer_id first_offset
          producer_id => INT64
          first_offset => INT64
        preferred_read_replica => INT32
      record_set => RECORDS

ListOffsets

To avoid issues where requests are made to stale partitions, a topic_id field will be added to fence reads from deleted topics.

ListOffsetsRequest v6

ListOffsets Request (Version: 6) => replica_id isolation_level [topics]
  replica_id => INT32
  isolation_level => INT8
  topics => topic topic_id* forgotten_topics_data => topic [partitions]
    topic => STRING
    topic_id* => UUID
    partitions => partition current_leader_epoch timestamp
      partition => INT32
  rack_id => STRING

FetchResponse v12

      current_leader_epoch => INT32
      timestamp => INT64

ListOffsetsResponse v6

ListOffsets Fetch Response (Version: 126) => throttle_time_ms error_code session_id [responses]
  throttle_time_ms => INT32
  error_code => INT16
  session_id => INT32
  responses => topic topic_id* [partition_responses]
    topic => STRING
    topic_id* => UUID
    partition_responses => partition _header record_seterror_code timestamp offset leader_epoch
      partition _header => partition INT32
      error_code high_watermark last_stable_offset log_start_offset [aborted_transactions] preferred_read_replica
        partition => INT32
        error_code INT16
      timestamp => INT16
        high_watermark INT64
      offset => INT64
        last_stable_offset => INT64
        log_start_offset => INT64
        aborted_transactions => producer_id first_offset
          producer_id => INT64
          first_offset => INT64
        preferred_read_replica => INT32
      record_set => RECORDS

ListOffsets

To avoid issues where requests are made to stale partitions, a topic_id field will be added to fence reads from deleted topics.

ListOffsetsRequest v6

      leader_epoch => INT32

OffsetForLeader

To avoid issues where requests are made to stale partitions, a topic_id field will be added to fence reads from deleted topics.

OffsetForLeaderRequest v4

OffsetForLeaderEpoch Request (Version: 4) => replica_id [topics]
  replica_id => INT32
  topics => topic topic_id* [partitions]
    topic => STRING
    topic_id* => UUID
    partitions => partition current_leader_epoch leader_epoch
      partition => INT32
      current_leader_epoch => INT32
      leader_epoch => INT32

OffsetForLeaderResponse v4

OffsetForLeaderEpoch Response (Version: 4) => throttle_time_ms [topics]
  throttle_time_ms => INT32ListOffsets Request (Version: 6) => replica_id isolation_level [topics]
  replica_id => INT32
  isolation_level => INT8
  topics => topic topic_id* [partitions]
    topic => STRING
    topic_id* => UUID
    partitions => error_code partition current_ leader_epoch timestampend_offset
      error_code => INT16
      partition => INT32
      current_leader_epoch => INT32
      timestamp end_offset => INT64

ListOffsetsResponse v6

Metadata

MetadataResponse must be modified so that describeTopics includes the topic id for each topic.

MetadataResponse v10

Metadata

ListOffsets

Response (Version:

6

10) => throttle_time_ms

[responses]
  

[brokers] cluster_id controller_id [topics] cluster_authorized_operations
    throttle_time_ms => INT32

  responses => topic topic_id* [partition_responses]
    topic => STRING
    topic_id* => UUID
    partition_responses => partition error_code timestamp offset leader_epoch
      partition => INT32
      error_code => INT16
      timestamp => INT64
      offset => INT64
      leader_epoch => INT32

OffsetForLeader

To avoid issues where requests are made to stale partitions, a topic_id field will be added to fence reads from deleted topics.

OffsetForLeaderRequest v4

...

OffsetForLeaderEpoch Request (Version: 4) => replica_id [topics]
  replica_id => INT32
  topics => topic topic_id* [partitions]
    topic => STRING
    topic_id* => UUID
    partitions => partition current_leader_epoch leader_epoch
      partition => INT32
      current_leader_epoch => INT32
      leader_epoch => INT32

OffsetForLeaderResponse v4

OffsetForLeaderEpoch Response (Version: 4) => throttle_time_ms [topics]
  throttle_time_ms => INT32
  topics => topic topic_id* [partitions]
    topic => STRING
    

    brokers => node_id host port rack
        node_id => INT32
        host => STRING
        port => INT32
        rack => STRING
    cluster_id => STRING
    controller_id => INT32
    topics => error_code name is_internal [partitions] topic_authorized_operations topic_id*
        error_code => INT16
        name => STRING
        is_internal => BOOL
        partitions => error_code partition_index leader_id leader_epoch replica_nodes isr_nodes offline_replicas
            error_code => INT16
            partition_index => INT32
            leader_id => INT32
            leader_epoch => INT32
            replica_nodes => []INT32
            isr_nodes => []INT32
            offline_replicas => []INT32
        topic_authorized_operations => INT32
        topic_id* => UUID

    partitions => error_code partition leader_epoch end_offset
      error_code => INT16
      partition => INT32
      leader_epoch => INT32
      end_offset => INT64

    cluster_authorized_operations => INT32

DeleteTopics

With the addition of topic IDs and the changes to LeaderAndIsrRequest described above, we can now make changes to topic deletion logic that will allow topics to be immediately considered deleted, regardless of whether all replicas have responded to a DeleteTopicsRequest.

...