Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

In the post-KIP-500 world, there will be three broker registration states: unregistered, registered but fenced, and registered and active.  Just like today, unregistered means that there is no registration information and no way to reach the broker.  It is effectively not part of the cluster.  In the two registered states, in contrast, contact information is available.  However, in the "registered but fenced" state, the contact information might no longer be valid.  For example, if a broker crashes and is not restarted, it will end up in "registered but fenced" state.

Broker Registration

...

Every distributed system needs a way of managing cluster membership.  Prior to KIP-500, Kafka brokers registered ephemeral znodes in order to register themselves as part of the cluster.  The Kafka controller passively consumed the registration information from Zookeeper.

In the post-KIP-500 world there is no ZooKeeper and no ephemeral znodes.  Instead, each broker registers itself with the active controller using a BrokerRegistrationRequest. The active controller assigns the broker a new broker epoch, based on the latest committed offset in the log.  The new epoch is guaranteed to be higher than any previoius epochs that have been used for the given broker idThe new broker epoch is associated with the new registra

Broker Leases

  Subsequently, the broker sends a heartbeat request to the active controller every few seconds to keep the registration active.

Broker registrations are time-bounded.  They While the broker is registered, it may be either fenced or active.  If the broker is active, that means it has a lease.  Leases are time-bounded; they only last for a certain amount of time.  This period is specified by the controller in its response.  Once the period has elapsed, if the broker has not renewed its registration lease via a heartbeat, it must re-registerwill be fenced.

Unlike ZooKeeper, the active controller may choose not to accept a broker's registration.  For example, it will do this if the ID that the broker is trying to claim has already been claimed.  In that case, the broker should shut down rather than trying to claim a broker ID which is already in use.  Another reason that the controller can reject a registration is is if the broker doesn't support all of the KIP-584 feature levels that are currently enabled in the cluster.

...

The broker heartbeat mechanism replaces the controlled shutdown RPC.  Therefore, we will not need to support the this RPC any more in the controller-- except for compatibility during upgrades, which will be described further in a follow-on KIP.

BrokerRegistration

Code Block
languagejs
{
  "apiKey": 57,
  "type": "request",
  "name": "BrokerRegistrationRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID." },
    	{ "name": "BrokerEpochIncarnationId", "type": "int64uuid", "versions": "0+",
	  "defaultabout": "-1",
The incarnation id of the broker process."about": "The broker epoch, or -1 if one has not yet been assigned." },
    { "name": "NextMetadataOffset", "type": "int64", "versions": "0+",
      "about": "The highest metadata offset which the broker has not reached." },
    { "name": "Listeners", "type": "[]Listener",
      "about": "The listeners of this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "int16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
      ]
    { "name": "Features", "type": "[]Feature",
      "about": "The features on this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The feature name." }
        { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The minimum supported feature level." },
        { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The maximum supported feature level." }
      ]
    },
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The rack which this broker is in." }
  ]
}

{
  "apiKey": 57,
  "type": "response",
  "name": "BrokerRegistrationResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "ActiveControllerId", "type": "int32", "versions": "0+",
      "about": "The ID of the active controller, or -1 if the controller doesn't know." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker's assigned epoch, or -1 if none was assigned." },
    { "name": "LeaseDurationMs", "type": "int64", "versions": "0+",
      "about": "If BrokerEpoch is not -1, the number of milliseconds that we want the lease to last." }
  ]
}

...

Heartbeat requests that give a NextMetadataOffset which is lower than the previous one recorded for that broker will be ignored.

Record Formats

...

RegisterBrokerRecord

Code Block
{
  "apiKey": 0,
  "type": "metadata",
  "name": "BrokerRecordRegisterBrokerRecord",
  "validVersions": "0",
  "fields": [
	{ "name": "Id", "type": "int32", "versions": "0+",
	  "about": "The broker id." },
	{ "name": "IncarnationId", "type": "uuid", "versions": "0+",
	  "about": "The local incarnation id of the broker process." },
	{ "name": "EpochBrokerEpoch", "type": "int64", "versions": "0+",
	  "about": "The broker epoch assigned by the controller." },
	{ "name": "EndPoints", "type": "[]BrokerEndpoint", "versions": "0+", "nullableVersions": "0+",
	  "about": "The endpoints that can be used to communicate with this broker.", "fields": [
		{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
		  "about": "The name of the endpoint." },
		{ "name": "Host", "type": "string", "versions": "0+",
		  "about": "The hostname." },
		{ "name": "Port", "type": "int16", "versions": "0+",
		  "about": "The port." },
		{ "name": "SecurityProtocol", "type": "int16", "versions": "0+",
		  "about": "The security protocol." }
	]},
	{ "name": "Features", "type": "[]BrokerFeature", "versions": "0+", "nullableVersions": "0+",
	  "about": "The features that this broker supports.", "fields": [
		{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
		  "about": "The name of the feature." },
		{ "name": "MinVersion",": "MinVersion", "type": "int16", "versions": "0+",
		  "about": "The minimum feature level that this broker supports." },
		{ "name": "MaxVersion", "type": "int16", "versions": "0+",
		  "about": "The maximum feature level that this broker supports." }
	]},
	{ "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
	  "about": "The broker rack." }
  ]
}

UnregisterBrokerRecord

Code Block
{
  "apiKey": 1,
  "type": "int16metadata",
  "versionsname": "0+UnregisterBrokerRecord",
		  "aboutvalidVersions": "0"The,
 minimum feature level that this broker supports." },
		 "fields": [
	{ "name": "MaxVersionId", "type": "int16int32", "versions": "0+",
		  "about": "The maximum feature level that this broker supportsid." }
	]},
	{ "name": "RackEpoch", "type": "stringint64", "versions": "0+", "nullableVersions": "0+",
	  "about": "The broker rackepoch." }
  ]
}

TopicRecord

Code Block
{
  "apiKey": 12,
  "type": "metadata",
  "name": "TopicRecord",
  "validVersions": "0",
  "fields": [
        { "name": "Name", "type": "string", "versions": "0+",
          "about": "The topic name." },
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The unique ID of this topic." },
        { "name": "Deleting", "type": "boolean", "versions": "0+",
          "about": "True if this topic is in the process of being deleted." }
  ]
}

PartitionRecord

Code Block
{
  "apiKey": 23,
  "type": "metadata",
  "name": "PartitionRecord",
  "validVersions": "0",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Replicas", "type":  "[]int32", "versions":  "0+",
      "about": "The replicas of this partition, sorted by preferred order." },
    { "name": "Isr", "type":  "[]int32", "versions":  "0+",
      "about": "The in-sync replicas of this partition" },
    { "name": "RemovingReplicas", "type":  "[]int32", "versions":  "0+",
      "about": "The replicas that we are in the process of removing." },
    { "name": "AddingReplicas", "type":  "[]int32", "versions":  "0+",
      "about": "The replicas that we are in the process of adding." },
    { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The lead replica, or -1 if there is no leader." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "An epoch that gets incremented each time we change the leader." }
  ]
}

ConfigRecord

Code Block
{         
  "apiKey": 34,
  "type": "metadata",
  "name": "ConfigRecord",
  "validVersions": "0",
  "fields": [
    { "name": "ResourceType", "type": "int8", "versions": "0+",
      "about": "The type of resource this configuration applies to." },
    { "name": "ResourceName", "type": "string", "versions": "0+",
      "about": "The name of the resource this configuration applies to." },         
    { "name": "Name", "type": "string", "versions": "0+",
      "about": "The name of the configuration key." },                  
    { "name": "Value", "type": "string", "versions": "0+",     
      "about": "The value of the configuration." }
  ]           
} 

...

IsrChangeRecord

Code Block
{
  "apiKey": 45,
  "type": "metadata",
  "name": "IsrChangeRecord",
  "validVersions": "0",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Isr", "type":  "[]int32", "versions":  "0+",
      "about": "The in-sync replicas of this partition" },
    { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The lead replica, or -1 if there is no leader." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "An epoch that gets incremented each time we change the leader." }
  ]
} 

AccessControlRecord

Code Block
{
  "apiKey": 56,
  "type": "metadata",
  "name": "AccessControlRecord",
  "validVersions": "0",
  "fields": [
    { "name": "ResourceType", "type": "int8", "versions": "0+",
      "about": "The resource type" },
    { "name": "ResourceName", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The resource name, or null if this is for the default resource." },
    { "name": "PatternType", "type": "int8", "versions": "0+",
      "about": "The pattern type (literal, prefixed, etc.)" },
    { "name": "Principal", "type": "string", "versions": "0+",
      "about": "The principal name." },
    { "name": "Host", "type": "string", "versions": "0+",
      "about": "The host." },
    { "name": "Operation", "type": "int8", "versions": "0+",
      "about": "The operation type." },
    { "name": "PermissionType", "type": "int8", "versions": "0+",
      "about": "The permission type (allow, deny)." }
  ]
} 

...

FenceBrokerRecord

Code Block
{
  "apiKey": 67,
  "type": "metadata",
  "name": "FenceBrokerRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Id", "type": "int32", "versions": "0+",
      "about": "The broker ID to fence. It will be removed from all ISRs." }
    { "name": "Epoch", "type": "int64", "versions": "0+",
      "about": "The epoch of the broker to fence." }
  ]
} 

...

UnfenceBrokerRecord

Code Block
{
  "apiKey": 118,
  "type": "metadata",
  "name": "UnfenceBrokerRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Id", "type": "int32", "versions": "0+",
      "about": "The broker ID to unfence." }
    { "name": "Epoch", "type": "int64", "versions": "0+",
      "about": "The epoch of the broker to unfence." }
  ]
} 

RemoveTopic

Code Block
{
  "apiKey": 79,
  "type": "metadata",
  "name": "RemoveTopicRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Id", "type": "uuid", "versions": "0+",
      "about": "The topic to remove. All associated partitions will be removed as well." }
  ]
} 

...

DelegationTokenRecord

Code Block
{
  "apiKey": 810,
  "type": "metadata",
  "name": "DelegationTokenRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Owner", "type": "string", "versions": "0+",
      "about": "The delegation token owner." },
    { "name": "Renewers", "type": "[]string", "versions": "0+",
      "about": "The principals which have renewed this token." },
    { "name": "IssueTimestamp", "type": "int64", "versions": "0+",
      "about": "The time at which this timestamp was issued." },
    { "name": "MaxTimestamp", "type": "int64", "versions": "0+",
      "about": "The time at which this token cannot be renewed any more." },
    { "name": "ExpirationTimestamp", "type": "int64", "versions": "0+",
      "about": "The next time at which this token must be renewed." },
    { "name": "TokenId", "type": "string", "versions": "0+",
      "about": "The token id." },
  ]
} 

...

UserScramCredentialRecord

Code Block
{
  "apiKey": 911,
  "type": "metadata",
  "name": "UserScramCredentialUserScramCredentialRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Name", "type": "string", "versions": "0+",
      "about": "The user name." },
    { "name": "CredentialInfos", "type": "[]CredentialInfo", "versions": "0+",
      "about": "The mechanism and related information associated with the user's SCRAM credential.", "fields": [
      { "name": "Mechanism", "type": "int8", "versions": "0+",
        "about": "The SCRAM mechanism." },
      { "name": "Salt", "type": "bytes", "versions": "0+",
        "about": "A random salt generated by the client." },
      { "name": "SaltedPassword", "type": "bytes", "versions": "0+",
        "about": "The salted password." },
      { "name": "Iterations", "type": "int32", "versions": "0+",
        "about": "The number of iterations used in the SCRAM credential." }]}
  ]
} 

...

FeatureLevelRecord

Code Block
{
  "apiKey": 1012,
  "type": "metadata",
  "name": "FeatureLevelRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
      "about": "The feature name." },
    { "name": "MinFeatureLevel", "type": "int16", "versions": "0+",
      "about": "The current finalized minimum feature level of this feature for the cluster." },
    { "name": "MaxFeatureLevel", "type": "int16", "versions": "0+",
      "about": "The current finalized maximum feature level of this feature for the cluster." }
  ]
} 

...