...

Brokers that don't have a broker ID lease are said to be "fenced." A fenced broker cannot process any client requests. This prevents brokers that are not receiving metadata updates, or that are not processing them fast enough, from causing problems for clients.

A fenced broker will continue to fetch on partitions it was already fetching before it was fenced, unless it hits a problem. If it does hit a problem, it won't be able to continue fetching, since it doesn't have the new metadata. For example, it won't know about leadership changes in any partitions.

Brokers start up in the fenced state, and can leave this state only by sending a heartbeat to the active controller and getting back a response that tells them they can become active.
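
As a rough illustration of the fencing rules above, the following Python sketch models the broker-side state. The class, its fields, and the `may_become_active` parameter are hypothetical and not part of this proposal. The sketch also assumes that an expired lease is equivalent to having no lease.

```python
from dataclasses import dataclass


@dataclass
class BrokerLeaseState:
    """Hypothetical broker-side view of its broker ID lease."""

    # -1 means no lease has been granted, so the broker starts out fenced.
    lease_expires_at_ms: int = -1

    def on_heartbeat_response(self, now_ms: int, may_become_active: bool,
                              lease_duration_ms: int) -> None:
        # Only a response from the active controller that grants activity
        # can move the broker out of the fenced state.
        if may_become_active:
            self.lease_expires_at_ms = now_ms + lease_duration_ms

    def is_fenced(self, now_ms: int) -> bool:
        # Assumption: a lease that has run out counts as no lease at all.
        return self.lease_expires_at_ms < 0 or now_ms >= self.lease_expires_at_ms

    def can_process_client_requests(self, now_ms: int) -> bool:
        return not self.is_fenced(now_ms)
```

A broker modelled this way rejects client requests until its first successful heartbeat, and again once the granted lease runs out without being refreshed.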

...

Code Block
{
  "apiKey": 57,
  "type": "request",
  "name": "BrokerRegistrationRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker epoch, or -1 if one has not yet been assigned." },
    { "name": "NextMetadataOffset", "type": "int64", "versions": "0+",
      "about": "The highest metadata offset which the broker has not reached." },
    { "name": "Listeners", "type": "[]Listener",
      "about": "The listeners of this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "int16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
      ]
    },
    { "name": "Features", "type": "[]Feature",
      "about": "The features on this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The feature name." },
        { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The minimum supported feature level." },
        { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The maximum supported feature level." }
      ]
    },
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The rack which this broker is in." }
  ]
}

{
  "apiKey": 57,
  "type": "response",
  "name": "BrokerRegistrationResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "ActiveControllerId", "type": "int32", "versions": "0+",
      "about": "The ID of the active controller, or -1 if the controller doesn't know." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker's assigned epoch, or -1 if none was assigned." },
    { "name": "LeaseDurationMs", "type": "int64", "versions": "0+",
      "about": "If BrokerEpoch is not -1, the number of milliseconds that we want the lease to last." }
  ]
}

...

Heartbeat requests that give a NextMetadataOffset which is lower than the previous one recorded for that broker will be ignored.
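
A minimal sketch of this rule on the controller side, in Python. The `HeartbeatTracker` class and its method are hypothetical names used only for illustration; the proposal does not prescribe how the controller stores the last recorded offset.

```python
class HeartbeatTracker:
    """Hypothetical controller-side record of each broker's last reported offset."""

    def __init__(self) -> None:
        self._last_offset = {}  # broker ID -> last accepted metadata offset

    def accept(self, broker_id: int, metadata_offset: int) -> bool:
        """Return True if the heartbeat is processed, False if it is ignored."""
        previous = self._last_offset.get(broker_id)
        if previous is not None and metadata_offset < previous:
            # The reported offset went backwards, so the heartbeat is ignored
            # and the recorded offset is left unchanged.
            return False
        self._last_offset[broker_id] = metadata_offset
        return True
```

Only a strictly lower offset is rejected here; a heartbeat that repeats the previous offset is still accepted, which matches the wording "lower than the previous one."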

...

Code Block
{
  "apiKey": 8,
  "type": "metadata",
  "name": "DelegationTokenRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Owner", "type": "string", "versions": "0+",
      "about": "The delegation token owner." },
    { "name": "Renewers", "type": "[]string", "versions": "0+",
      "about": "The principals which have renewed this token." },
    { "name": "IssueTimestamp", "type": "int64", "versions": "0+",
      "about": "The time at which this token was issued." },
    { "name": "MaxTimestamp", "type": "int64", "versions": "0+",
      "about": "The time at which this token cannot be renewed any more." },
    { "name": "ExpirationTimestamp", "type": "int64", "versions": "0+",
      "about": "The next time at which this token must be renewed." },
    { "name": "TokenId", "type": "string", "versions": "0+",
      "about": "The token id." }
  ]
} 
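
To illustrate how the three timestamps in this record relate, here is a minimal Python sketch of a renewal. The `renew` helper and the `renew_period_ms` parameter are hypothetical. The sketch assumes that a renewal moves ExpirationTimestamp forward but never past MaxTimestamp, and that an expired token cannot be renewed.

```python
from dataclasses import dataclass


@dataclass
class DelegationToken:
    """Timestamp fields of the record, in milliseconds."""

    issue_timestamp: int
    max_timestamp: int
    expiration_timestamp: int


def renew(token: DelegationToken, now_ms: int, renew_period_ms: int) -> int:
    """Renew the token and return its new expiration timestamp."""
    if now_ms >= token.expiration_timestamp:
        raise ValueError("token has expired and cannot be renewed")
    # Assumption: the new expiration is capped at MaxTimestamp.
    token.expiration_timestamp = min(now_ms + renew_period_ms, token.max_timestamp)
    return token.expiration_timestamp
```

Under these assumptions, once ExpirationTimestamp has reached MaxTimestamp, further renewals cannot extend the token's lifetime.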

...

UserScramCredential

Code Block
{
  "apiKey": 9,
  "type": "metadata",
  "name": "UserScramCredential",
  "validVersions": "0",
  "fields": [
    { "name": "Name", "type": "string", "versions": "0+",
      "about": "The user name." },
    { "name": "CredentialInfos", "type": "[]CredentialInfo", "versions": "0+",
      "about": "The mechanism and related information associated with the user's SCRAM credential.", "fields": [
      { "name": "Mechanism", "type": "int8", "versions": "0+",
        "about": "The SCRAM mechanism." },
      { "name": "Salt", "type": "bytes", "versions": "0+",
        "about": "A random salt generated by the client." },
      { "name": "SaltedPassword", "type": "bytes", "versions": "0+",
        "about": "The salted password." },
      { "name": "Iterations", "type": "int32", "versions": "0+",
        "about": "The number of iterations used in the SCRAM credential." }]}
  ]
} 
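
For context, the Salt, Iterations, and SaltedPassword fields of each credential entry can be related as in the Python sketch below. It relies on the fact that the SCRAM `Hi()` function from RFC 5802 is PBKDF2 with HMAC and an output length equal to the hash size. The helper name, the default iteration count, and the mapping from the int8 Mechanism value to a hash are assumptions for illustration; the record above does not define them. SASLprep normalization of the password is omitted.

```python
import hashlib
import os

# Assumed mapping from the Mechanism value to a hash; not defined by the record.
MECHANISM_HASHES = {1: "sha256", 2: "sha512"}


def make_credential_info(password: str, mechanism: int,
                         iterations: int = 4096, salt: bytes = None) -> dict:
    """Build one credential entry for the given password and mechanism."""
    hash_name = MECHANISM_HASHES[mechanism]
    if salt is None:
        salt = os.urandom(16)  # random salt
    # SaltedPassword = Hi(password, salt, iterations), computed via PBKDF2-HMAC.
    salted_password = hashlib.pbkdf2_hmac(
        hash_name, password.encode("utf-8"), salt, iterations)
    return {
        "Mechanism": mechanism,
        "Salt": salt,
        "SaltedPassword": salted_password,
        "Iterations": iterations,
    }
```

Storing only the salted password, salt, and iteration count means the plaintext password never needs to appear in the metadata log.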

...