Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Requests to describe topics will return a result containing TopicDescriptions with topic IDs for each topic

Protocol Changes

The fields added to requests and responses below will be tagged fields to avoid changing the inter-broker protocol.

LeaderAndIsr

LeaderAndIsrRequest v3

LeaderAndIsr Request (Version: 3) => controller_id controller_epoch broker_epoch type* [topic_states] [live_leaders]
  controller_id => INT32
  controller_epoch => INT32
  broker_epoch => INT64
  type* => INT8 (tagged field)
  topic_states => topic topic_id* [partition_states]
    topic => STRING
    topic_id* => UUID (tagged field)
    partition_states => partition controller_epoch leader leader_epoch [isr] zk_version [replicas] is_new
      partition => INT32
      controller_epoch => INT32
      leader => INT32
      leader_epoch => INT32
      isr => INT32
      zk_version => INT32
      replicas => INT32
      is_new => BOOLEAN
  live_leaders => id host port
    id => INT32
    host => STRING
    port => INT32

...

LeaderAndIsr Response (Version: 3) => error_code [partitions]
  error_code => INT16
  partitions => topic topic_id* partition error_code
    topic => STRING
    topic_id* => UUID (tagged field)
    partition => INT32
    error_code => INT16

...

StopReplica Request (Version: 2) => controller_id controller_epoch broker_epoch delete_partitions [partitions]
  controller_id => INT32
  controller_epoch => INT32
  broker_epoch => INT64
  delete_partitions => BOOLEAN
  partitions => topic topic_id* [partition_ids]
    topic => STRING
    topic_id* => UUIDUUID (tagged field)
    partition_ids => INT32

...

StopReplica Response (Version: 2) => error_code [partitions]
  error_code => INT16
  partitions => topic topic_id* partition error_code
    topic => STRING
    topic_id* => UUIDUUID (tagged field)
    partition => INT32
    error_code => INT16

...

Fetch Request (Version: 12) => replica_id max_wait_time min_bytes max_bytes isolation_level session_id session_epoch [topics] [forgotten_topics_data] rack_id
  replica_id => INT32
  max_wait_time => INT32
  min_bytes => INT32
  max_bytes => INT32
  isolation_level => INT8
  session_id => INT32
  session_epoch => INT32
  topics => topic topic_id* [partitions]
    topic => STRING
    topic_id* => UUIDUUID (tagged field)
    partitions => partition current_leader_epoch fetch_offset log_start_offset partition_max_bytes
      partition => INT32
      current_leader_epoch => INT32
      fetch_offset => INT64
      log_start_offset => INT64
      partition_max_bytes => INT32
  forgotten_topics_data => topic [partitions]
    topic => STRING
    partitions => INT32
  rack_id => STRING

...

Fetch Response (Version: 12) => throttle_time_ms error_code session_id [responses]
  throttle_time_ms => INT32
  error_code => INT16
  session_id => INT32
  responses => topic topic_id* [partition_responses]
    topic => STRING
    topic_id* => UUIDUUID (tagged field)
    partition_responses => partition_header record_set
      partition_header => partition error_code high_watermark last_stable_offset log_start_offset [aborted_transactions] preferred_read_replica
        partition => INT32
        error_code => INT16
        high_watermark => INT64
        last_stable_offset => INT64
        log_start_offset => INT64
        aborted_transactions => producer_id first_offset
          producer_id => INT64
          first_offset => INT64
        preferred_read_replica => INT32
      record_set => RECORDS

...

ListOffsets Request (Version: 6) => replica_id isolation_level [topics]
  replica_id => INT32
  isolation_level => INT8
  topics => topic topic_id* [partitions]
    topic => STRING
    topic_id* => UUIDUUID (tagged field)
    partitions => partition current_leader_epoch timestamp
      partition => INT32
      current_leader_epoch => INT32
      timestamp => INT64

...

ListOffsets Response (Version: 6) => throttle_time_ms [responses]
  throttle_time_ms => INT32
  responses => topic topic_id* [partition_responses]
    topic => STRING
    topic_id* => UUIDUUID (tagged field)
    partition_responses => partition error_code timestamp offset leader_epoch
      partition => INT32
      error_code => INT16
      timestamp => INT64
      offset => INT64
      leader_epoch => INT32

...

OffsetForLeaderEpoch Request (Version: 4) => replica_id [topics]
  replica_id => INT32
  topics => topic topic_id* [partitions]
    topic => STRING
    topic_id* => UUIDUUID (tagged field)
    partitions => partition current_leader_epoch leader_epoch
      partition => INT32
      current_leader_epoch => INT32
      leader_epoch => INT32

...

OffsetForLeaderEpoch Response (Version: 4) => throttle_time_ms [topics]
  throttle_time_ms => INT32
  topics => topic topic_id* [partitions]
    topic => STRING
    topic_id* => UUIDUUID (tagged field)
    partitions => error_code partition leader_epoch end_offset
      error_code => INT16
      partition => INT32
      leader_epoch => INT32
      end_offset => INT64

...

MetadataResponse must be modified so that describeTopics includes the topic id for each topic.

MetadataResponse v10

Metadata Response (Version: 10) => throttle_time_ms [brokers] cluster_id controller_id [topics] cluster_authorized_operations
    throttle_time_ms => INT32
    brokers => node_id host port rack
        node_id => INT32
        host => STRING
        port => INT32
        rack => STRING
    cluster_id => STRING
    controller_id => INT32
    topics => error_code name is_internal [partitions] topic_authorized_operations topic_id*
        error_code => INT16
        name => STRING
        is_internal => BOOL
        partitions => error_code partition_index leader_id leader_epoch [replica_nodes] [isr_nodes] [offline_replicas]
            error_code => INT16
            partition_index => INT32
            leader_id => INT32
            leader_epoch => INT32
            replica_nodes => INT32
            isr_nodes => INT32
            offline_replicas => INT32
        topic_authorized_operations => INT32
        topic_id* =>

UUID

UUID (tagged field)
    cluster_authorized_operations => INT32

UpdateMetadata

UpdateMetadata should also include the topic ID.

UpdateMetadataRequest v7

UpdateMetadata Request (Version: 7) => controller_id controller_epoch broker_epoch [ungrouped_partition_states] [topic_states] [live_brokers] 
    controller_id => INT32
    controller_epoch => INT32
    broker_epoch => INT64
    ungrouped_partition_states => UpdateMetadataPartitionState
    topic_states => topic_name topic_id* [partition_states]
        topic_name => STRING
        topic_id* => UUID (tagged field)
        partition_states => UpdateMetadataPartitionState
    live_brokers => id v0_host v0_port [endpoints] rack
        id => INT32
        v0_host => STRING
        v0_port => INT32
        endpoints => port host listener security_protocol
            port => INT32
            host => STRING
            listener => STRING
            security_protocol => INT16
        rack => STRING


DeleteTopics

With the addition of topic IDs and the changes to LeaderAndIsrRequest described above, we can now make changes to topic deletion logic that will allow topics to be immediately considered deleted, regardless of whether all replicas have responded to a DeleteTopicsRequest.

...

LeaderAndIsrRequest(s) will only be sent by the controller once a topic ID has been successfully assigned to the topic. The migration process can take place without an inter-broker protocol bump, as the format stored in /brokers/topics/[topic] will be compatible with older broker versions. The topic ids in requests and responses will be tagged fields to avoid changing the inter-broker protocol.

When a replica receives a LeaderAndIsrRequest containing a topic ID for an existing partition which does not have a topic ID associated, it will create a partition metadata file for the topic partition locally. At this point the local partition will have been migrated to support topic IDs.

...