Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The broker heartbeat mechanism replaces the controlled shutdown RPC.  Therefore, we will not need to support the this RPC any more in the controller-- except for compatibility during upgrades, which will be described further in a follow-on KIP.

Topic Identifiers

This KIP builds on top of the work done in KIP-516 to improve how partitions are tracked.

Because each topic is identified by a unique topic UUID, we can implement topic deletion with a single record, RemoveTopicRecord.  Upon replaying this record, each broker will delete the associated topic if it is present.

Of course, some brokers may be down when the topic is deleted.  In fact, some brokers may never see the RemoveTopicRecord.  This record may get collapsed into one of the periodic metadata snapshots.  If this happens, the record will be reflected in the snapshot through the absence of a broker record, not its presence.  Therefore, during the startup process, brokers must compare the log directories that they have with the ones contained in the latest metadata.  The appropriate time to do this is at the start of the RECOVERY phase.  At this point, the broker has the latest metadata.

BrokerRegistration

Code Block
languagejs
{
  "apiKey": 57,
  "type": "request",
  "name": "BrokerRegistrationRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID." },
	{ "name": "ClusterId", "type": "uuid", "versions": "0+",
	  "about": "The cluster id of the broker process." },
	{ "name": "IncarnationId", "type": "uuid", "versions": "0+",
	  "about": "The incarnation id of the broker process." },
    { "name": "CurMetadataOffset", "type": "int64", "versions": "0+",
      "about": "The highest metadata offset which the broker has reached." },
    { "name": "Listeners", "type": "[]Listener",
      "about": "The listeners of this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "int16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
      ]
    { "name": "Features", "type": "[]Feature",
      "about": "The features on this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The feature name." }
        { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The minimum supported feature level." },
        { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The maximum supported feature level." }
      ]
    },
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The rack which this broker is in." }
  ]
}

{
  "apiKey": 57,
  "type": "response",
  "name": "BrokerRegistrationResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker's assigned epoch, or -1 if none was assigned." }
  ]
}

...

Code Block
{
  "apiKey": 2,
  "type": "metadata",
  "name": "TopicRecord",
  "validVersions": "0",
  "fields": [
        { "name": "Name", "type": "string", "versions": "0+",
          "about": "The topic name." },
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The unique ID of this topic." },
        { "name": "Deleting", "type": "boolean", "versions": "0+",
          "about": "True if this topic is in the process of being deleted." }
  ]
}

PartitionRecord

Code Block
{
  "apiKey": 3,
  "type": "metadata",
  "name": "PartitionRecord",
  "validVersions": "0",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Replicas", "type":  "[]int32", "versions":  "0+",
      "about": "The replicas of this partition, sorted by preferred order." },
    { "name": "Isr", "type":  "[]int32", "versions":  "0+",
      "about": "The in-sync replicas of this partition" },
    { "name": "RemovingReplicas", "type":  "[]int32", "versions":  "0+",
      "about": "The replicas that we are in the process of removing." },
    { "name": "AddingReplicas", "type":  "[]int32", "versions":  "0+",
      "about": "The replicas that we are in the process of adding." },
    { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The lead replica, or -1 if there is no leader." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "An epoch that gets incremented each time we change the leader." }
  ]
}

...