Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Requests to describe topics will return a result containing TopicDescriptions with topic IDs for each topic

Protocol Changes

The fields added to requests and responses below will be tagged fields to avoid changing the inter-broker protocol.

LeaderAndIsr

LeaderAndIsrRequest v5

LeaderAndIsr Request (Version: 5) => controller_id controller_epoch broker_epoch type* [topic_states] [live_leaders]
  controller_id => INT32
  controller_epoch => INT32
  broker_epoch => INT64
  type* => INT8 (tagged field)INT8 
  topic_states => topic topic_id* [partition_states]
    topic => STRING
    topic_id* => UUID (tagged field)UUID 
    partition_states => partition controller_epoch leader leader_epoch [isr] zk_version [replicas] is_new
      partition => INT32
      controller_epoch => INT32
      leader => INT32
      leader_epoch => INT32
      isr => INT32
      zk_version => INT32
      replicas => INT32
      is_new => BOOLEAN
  live_leaders => id host port
    id => INT32
    host => STRING
    port => INT32


LeaderAndIsrRequest v4 adds v5 adds the topic ID to the topic_states field, and an enum type to denote the type of LeaderAndIsrRequest. Currently, the first LeaderAndIsrRequest sent to a broker by a controller contains all TopicPartitions that a broker is a replica for. We will formalize this behavior by also including a type enum to denote the type of LeaderAndIsrRequest. 

...

LeaderAndIsr Response (Version: 5) => error_code [partitions]
  error_code => INT16
  partitions => topic topic_id* partition error_code
    topic_id* => UUID (tagged field)UUID 
    partition => INT32
    error_code => INT16

The topic name field has been removed.

StopReplica

StopReplicaRequest

...

v4

StopReplica Request (Version: 24) => controller_id controller_epoch broker_epoch delete_partitions [partitions]
  controller_id => INT32
  controller_epoch => INT32
  broker_epoch => INT64
  delete_partitions => BOOLEAN
  partitions => topic topic_id* [partition_ids]
    topic => STRING
    topic_id* => UUID (tagged field)
    partition_ids => INT32

StopReplicaResponse

...

v4

StopReplica Response (Version: 24) => error_code [partitions]
  error_code => INT16
  partitions => topic topic_id* partition error_code
    topic => STRING
    topic_id* => UUID (tagged field)
    partition => INT32
    error_code => INT16

...

Fetch Request (Version: 12) => replica_id max_wait_time min_bytes max_bytes isolation_level session_id session_epoch [topics] [forgotten_topics_data] rack_id
  replica_id => INT32
  max_wait_time => INT32
  min_bytes => INT32
  max_bytes => INT32
  isolation_level => INT8
  session_id => INT32
  session_epoch => INT32
  topics => topic topic_id* [partitions]
    topic => STRING
    topic_id* => UUID (tagged field)
    partitions => partition current_leader_epoch fetch_offset log_start_offset partition_max_bytes
      partition => INT32
      current_leader_epoch => INT32
      fetch_offset => INT64
      log_start_offset => INT64
      partition_max_bytes => INT32
  forgotten_topics_data => topic [partitions]
    topic => STRING
    partitions => INT32
  rack_id => STRING

...

Fetch Response (Version: 12) => throttle_time_ms error_code session_id [responses]
  throttle_time_ms => INT32
  error_code => INT16
  session_id => INT32
  responses => topic topic_id* [partition_responses]
    topic => STRING
    topic_id* => UUID (tagged field)
    partition_responses => partition_header record_set
      partition_header => partition error_code high_watermark last_stable_offset log_start_offset [aborted_transactions] preferred_read_replica
        partition => INT32
        error_code => INT16
        high_watermark => INT64
        last_stable_offset => INT64
        log_start_offset => INT64
        aborted_transactions => producer_id first_offset
          producer_id => INT64
          first_offset => INT64
        preferred_read_replica => INT32
      record_set => RECORDS

...

ListOffset Request (Version: 6) => replica_id isolation_level [topics]
  replica_id => INT32
  isolation_level => INT8
  topics => topic topic_id* [partitions]
    topic => STRING
    topic_id* => UUID (tagged field)UUID
    partitions => partition current_leader_epoch timestamp
      partition => INT32
      current_leader_epoch => INT32
      timestamp => INT64

...

ListOffset Response (Version: 6) => throttle_time_ms [responses]
  throttle_time_ms => INT32
  responses => topic topic_id* [partition_responses]
    topic => STRING
    topic_id* => UUID (tagged field)UUID
    partition_responses => partition error_code timestamp offset leader_epoch
      partition => INT32
      error_code => INT16
      timestamp => INT64
      offset => INT64
      leader_epoch => INT32

...

OffsetForLeaderEpoch Request (Version: 4) => replica_id [topics]
  replica_id => INT32
  topics => topic topic_id* [partitions]
    topic => STRING
    topic_id* => UUID (tagged field)
    partitions => partition current_leader_epoch leader_epoch
      partition => INT32
      current_leader_epoch => INT32
      leader_epoch => INT32

...

OffsetForLeaderEpoch Response (Version: 4) => throttle_time_ms [topics]
  throttle_time_ms => INT32
  topics => topic topic_id* [partitions]
    topic => STRING
    topic_id* => UUID (tagged field)
    partitions => error_code partition leader_epoch end_offset
      error_code => INT16
      partition => INT32
      leader_epoch => INT32
      end_offset => INT64

...

MetadataResponse must be modified so that describeTopics includes the topic id for each topic.

MetadataResponse

...

v10

Metadata Response (Version: 910) => throttle_time_ms [brokers] cluster_id controller_id [topics] cluster_authorized_operations
    throttle_time_ms => INT32
    brokers => node_id host port rack
        node_id => INT32
        host => STRING
        port => INT32
        rack => STRING
    cluster_id => STRING
    controller_id => INT32
    topics => error_code name is_internal [partitions] topic_authorized_operations topic_id*
        error_code => INT16
        name => STRING
        is_internal => BOOL
        partitions => error_code partition_index leader_id leader_epoch [replica_nodes] [isr_nodes] [offline_replicas]
            error_code => INT16
            partition_index => INT32
            leader_id => INT32
            leader_epoch => INT32
            replica_nodes => INT32
            isr_nodes => INT32
            offline_replicas => INT32
        topic_authorized_operations => INT32
        topic_id* => UUID (tagged field)UUID
    cluster_authorized_operations => INT32

...

UpdateMetadata should also include the topic ID.

UpdateMetadataRequest

...

v7

UpdateMetadata Request (Version: 67) => controller_id controller_epoch broker_epoch [ungrouped_partition_states] [topic_states] [live_brokers] 
    controller_id => INT32
    controller_epoch => INT32
    broker_epoch => INT64
    ungrouped_partition_states => UpdateMetadataPartitionState
    topic_states => topic_name topic_id* [partition_states]
        topic_name => STRING
        topic_id* => UUID (tagged field)
        partition_states => UpdateMetadataPartitionState
    live_brokers => id v0_host v0_port [endpoints] rack
        id => INT32
        v0_host => STRING
        v0_port => INT32
        endpoints => port host listener security_protocol
            port => INT32
            host => STRING
            listener => STRING
            security_protocol => INT16
        rack => STRING


...

  1. Collect deleted topics:
    1. Old format: /admin/delete_topics pulling the topic state from /brokers/topics/[topic].
    2. New in-memory topic deletion states from received DeleteTopicsRequest(s) 
  2. Remove deleted topics from replicas by sending StopReplicaRequest V1 for V3 for any topics which do not contain a topic ID, and V2 V4 for any topics which do contain a topic ID.
  3. Finalize successful deletes:
    1. For /admin/delete_topics deletes, we may need to respond to the TopicDeleteRequest. We can also delete the topic znode at /admin/delete_topics/[topic] and /brokers/topics/[topic].
    2. For deletes for topics with topic IDs, remove the topic from the in memory topic deletion state on the controller.
  4. Any unsuccessful StopReplicaRequest(s) will be retried after retryMs, starting from 1) and will be maintained in memory.

This leads to the question of what should be done if the controller never receives a successful response from a replica for a StopReplicaRequest. Under such a scenario it is still safe to stop retrying after a reasonable number of retries and time. Given that LeaderAndIsrRequest v4 includes v5 includes type flag, allowing for FULL requests to be identified, any stale partitions will be reconciled and deleted by a broker on startup upon receiving the initial LeaderAndIsrRequest from the a controller. This condition is also safe if the controller changes before the StopReplicaRequest(s) succeed, as the new controller will send a FULL LeaderAndIsrRequest on becoming the leader, ensuring that any stale partitions are cleaned up.

...

LeaderAndIsrRequest(s) will only be sent by the controller once a topic ID has been successfully assigned to the topic. The migration process can take place without an inter-broker protocol bump, as the format stored in /brokers/topics/[topic] will be compatible with older broker versions. The topic ids in requests and responses will be tagged fields to avoid changing the inter-broker protocol. 

When a replica receives a LeaderAndIsrRequest containing a topic ID for an existing partition which does not have a topic ID associated, it will create a partition metadata file for the topic partition locally. At this point the local partition will have been migrated to support topic IDs.

...