...

Broker Registration and State Management

Broker Startup

When starting up, the broker sends a fetch request to the controller to get the current state of the metadata log.  If the broker's local copy of the metadata log is too far behind, the broker will fetch a complete snapshot from the controller, as described in KIP-630.  Otherwise, the broker will fetch just the entries it is missing.  Once it has caught up to the high water mark of the controller (or at least, to what the controller's high water mark used to be), it will be able to remove the log directories which it no longer needs.
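
The sketch below illustrates this catch-up decision in Java.  It is only an illustration: the class and method names (MetadataCatchUp, fetchSnapshot, fetchLogBatch) and the lag threshold are assumptions, not part of the actual implementation.

Code Block
/**
 * A minimal sketch of the startup catch-up logic described above.  All names
 * here are illustrative, not actual Kafka APIs.
 */
public final class MetadataCatchUp {
    /** Assumed cutoff: beyond this lag, replaying the log is slower than a snapshot. */
    private static final long SNAPSHOT_LAG_THRESHOLD = 100_000L;

    private long localLogEndOffset;
    private final long controllerHighWaterMark;

    MetadataCatchUp(long localLogEndOffset, long controllerHighWaterMark) {
        this.localLogEndOffset = localLogEndOffset;
        this.controllerHighWaterMark = controllerHighWaterMark;
    }

    void run() {
        if (controllerHighWaterMark - localLogEndOffset > SNAPSHOT_LAG_THRESHOLD) {
            // Too far behind: fetch a complete metadata snapshot (KIP-630).
            localLogEndOffset = fetchSnapshot();
        }
        while (localLogEndOffset < controllerHighWaterMark) {
            // Close enough: fetch just the missing log entries.
            localLogEndOffset = fetchLogBatch(localLogEndOffset);
        }
        // Caught up to what the controller's high water mark used to be;
        // log directories that are no longer needed can now be removed.
        removeUnneededLogDirectories();
    }

    private long fetchSnapshot() { /* stub */ return controllerHighWaterMark; }
    private long fetchLogBatch(long fromOffset) { /* stub */ return controllerHighWaterMark; }
    private void removeUnneededLogDirectories() { /* stub */ }
}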

Broker Registration and Heartbeats

Every distributed system needs a way of managing cluster membership.  Prior to KIP-500, Kafka brokers registered ephemeral znodes in order to register themselves as part of the cluster.  The Kafka controller passively consumed the registration information from ZooKeeper.

Registration State

Currently, from the perspective of ZooKeeper, there are two states a broker can be in: registered and not registered.  When a broker is registered, other brokers can find its network endpoints in order to communicate with it, and it is included in the MetadataResponse communicated back to clients.  When a broker is not registered, neither of those things is true.

In the post-KIP-500 world, there will be three broker registration states: unregistered, registered but fenced, and registered and active.  Just like today, unregistered means that there is no registration information and no way to reach the broker.  It is effectively not part of the cluster.  In the two registered states, in contrast, contact information is available.  However, in the "registered but fenced" state, the contact information might no longer be valid.  For example, if a broker crashes and is not restarted, it will end up in the "registered but fenced" state.

In the post-KIP-500 world there is no ZooKeeper and no ephemeral znodes.  Instead, each broker registers itself with the active controller using a BrokerRegistrationRequest. The active controller assigns the broker a new broker epoch, based on the latest committed offset in the log.  Subsequently, the broker sends a heartbeat request to the active controller every few seconds to keep the registration active.
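
As a rough illustration, the broker-side flow might look like the following Java sketch.  ControllerChannel and its methods are hypothetical stand-ins for the real RPC layer, and the two-second heartbeat interval is just an assumed example value.

Code Block
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * A sketch of the broker-side registration flow: one registration request,
 * then periodic heartbeats to keep the registration active.
 */
public final class BrokerRegistrationLoop {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();
    private final ControllerChannel controller;   // hypothetical RPC client
    private final int brokerId;
    private volatile long brokerEpoch = -1L;

    BrokerRegistrationLoop(ControllerChannel controller, int brokerId) {
        this.controller = controller;
        this.brokerId = brokerId;
    }

    void start() {
        // The active controller assigns the broker epoch based on the latest
        // committed offset in the metadata log.
        brokerEpoch = controller.register(brokerId);
        // Keep the registration alive with a heartbeat every few seconds.
        scheduler.scheduleAtFixedRate(
            () -> controller.heartbeat(brokerId, brokerEpoch),
            0, 2, TimeUnit.SECONDS);
    }

    interface ControllerChannel {
        long register(int brokerId);                     // returns broker epoch
        void heartbeat(int brokerId, long brokerEpoch);  // renews the lease
    }
}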

Broker registrations are time-bounded: they last only for a lease period specified by the controller in its response.  Once the period has elapsed, if the broker has not renewed its registration via a heartbeat, it must re-register.

Unlike ZooKeeper, the active controller may choose not to accept a broker's registration.  For example, it will reject the registration if the ID that the broker is trying to claim has already been claimed.  In that case, the broker should shut down rather than trying to claim a broker ID which is already in use.  The controller will also reject a registration if the broker doesn't support all of the KIP-584 feature levels that are currently enabled in the cluster.
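
The following Java sketch shows how the active controller might apply these rules when handling a registration: reject an ID that is already claimed, reject a broker that does not support every enabled KIP-584 feature level, and otherwise grant an epoch based on the latest committed offset.  All class and method names here are illustrative.

Code Block
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

/** A sketch of controller-side registration validation. */
public final class RegistrationHandler {
    private final Set<Integer> activeBrokerIds;              // IDs with live leases
    private final Map<String, Short> enabledFeatureLevels;   // KIP-584 features
    private final long lastCommittedOffset;

    RegistrationHandler(Set<Integer> activeBrokerIds,
                        Map<String, Short> enabledFeatureLevels,
                        long lastCommittedOffset) {
        this.activeBrokerIds = activeBrokerIds;
        this.enabledFeatureLevels = enabledFeatureLevels;
        this.lastCommittedOffset = lastCommittedOffset;
    }

    /** Returns the new broker epoch, or empty if the registration is rejected. */
    OptionalLong handleRegistration(int brokerId, Map<String, Short> supportedMax) {
        if (activeBrokerIds.contains(brokerId)) {
            return OptionalLong.empty();   // ID already claimed: broker should shut down
        }
        for (Map.Entry<String, Short> e : enabledFeatureLevels.entrySet()) {
            Short max = supportedMax.get(e.getKey());
            if (max == null || max < e.getValue()) {
                return OptionalLong.empty();   // missing a required feature level
            }
        }
        activeBrokerIds.add(brokerId);
        // The broker epoch is based on the latest committed offset in the log.
        return OptionalLong.of(lastCommittedOffset);
    }
}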

Finally, the broker registration and heartbeat mechanism gives the broker a chance to specify its target state, and gives the controller a chance to tell the broker what its actual state should be.

Fencing

Brokers that don't have a broker ID lease are said to be "fenced."  When a broker is fenced, it cannot process any client requests.  This prevents brokers which are not receiving metadata updates, or which are receiving and processing them too slowly, from causing problems for clients.

A fenced broker will continue fetching the partitions it was already fetching before it was fenced, unless it hits a problem.  If it does hit a problem, it won't be able to continue fetching, since it lacks the new metadata; for example, it won't know about leadership changes in any of its partitions.

Brokers start up in the fenced state, and can leave this state only by sending a heartbeat to the active controller and getting back a response that tells them they can become active.

Brokers will re-enter the fenced state if they are unable to communicate with the active controller within registration.lease.timeout.ms.
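A minimal sketch of this broker-side fencing rule, assuming a simple clock-based lease check; apart from the registration.lease.timeout.ms configuration key, the names are illustrative.

Code Block
/** A sketch of the fencing state machine on the broker side. */
public final class FencingState {
    private final long leaseTimeoutMs;   // registration.lease.timeout.ms
    private volatile long lastHeartbeatSuccessMs;
    private volatile boolean fenced = true;   // brokers start up fenced

    FencingState(long leaseTimeoutMs, long nowMs) {
        this.leaseTimeoutMs = leaseTimeoutMs;
        this.lastHeartbeatSuccessMs = nowMs;
    }

    void onHeartbeatResponse(long nowMs, boolean controllerSaysActive) {
        lastHeartbeatSuccessMs = nowMs;
        // Only a heartbeat response from the active controller can lift the fence.
        fenced = !controllerSaysActive;
    }

    void checkLease(long nowMs) {
        if (nowMs - lastHeartbeatSuccessMs > leaseTimeoutMs) {
            fenced = true;   // lost contact with the active controller
        }
    }

    boolean shouldRejectClientRequest() {
        return fenced;   // fenced brokers do not process client requests
    }
}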

Controlled Shutdown

In the pre-KIP-500 world, brokers triggered a controlled shutdown by making an RPC to the controller.  When the controller returned a successful result from this RPC, the broker knew that it could shut down.

In the post-KIP-500 world, controlled shutdown is handled by the broker heartbeat system instead.  In its periodic heartbeats, the broker asks the controller if it can transition into the SHUTDOWN state.  This prompts the controller to move all of the partition leaders off of that broker.  Once they have all been moved, the controller responds to the heartbeat with a nextState of SHUTDOWN.
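
A sketch of this handshake from the broker's point of view: it keeps requesting SHUTDOWN in its heartbeats until the controller answers with a nextState of SHUTDOWN.  The enum and method names are illustrative, not the actual request schema.

Code Block
/** A sketch of controlled shutdown driven by the heartbeat channel. */
public final class ControlledShutdown {
    enum BrokerState { RUNNING, SHUTDOWN }

    interface HeartbeatChannel {
        /** Sends a heartbeat carrying the broker's target state; returns nextState. */
        BrokerState heartbeat(int brokerId, BrokerState targetState) throws InterruptedException;
    }

    static void shutDown(HeartbeatChannel channel, int brokerId) throws InterruptedException {
        // Ask for SHUTDOWN on every heartbeat until the controller, having
        // moved all partition leaders off this broker, agrees.
        while (channel.heartbeat(brokerId, BrokerState.SHUTDOWN) != BrokerState.SHUTDOWN) {
            Thread.sleep(2_000);   // heartbeat interval: every few seconds
        }
        // nextState == SHUTDOWN: safe to stop the process now.
    }
}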

Broker ID Conflicts

Clearly, in a correctly managed cluster, there should be no broker ID conflicts.  Each broker should be configured with a unique ID.  However, we want the system to be robust against misconfigurations.  Therefore, if two brokers claim the same ID, the controller will choose only one and tell the other to fence itself.

...

While in this state, the broker does not respond to client requests.

Its goal is to catch up with the metadata log managed by the controller quorum.

If the broker's local copy of the metadata log is too far behind, the broker will fetch a complete snapshot from the controller, as described in KIP-630.  Otherwise, the broker will fetch just what it needs.  Once it has caught up far enough, the active controller will tell it to transition to the RUNNING state.  This communication happens via broker heartbeats.

On a control-C in this state, the broker process shuts down.

...

Record Formats

BrokerRecord

Code Block
{
  "apiKey": 0,
  "type": "metadata",
  "name": "BrokerRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Id", "type": "int32", "versions": "0+",
      "about": "The broker id." },
    { "name": "Epoch", "type": "int64", "versions": "0+",
      "about": "The broker epoch." },
    { "name": "EndPoints", "type": "[]BrokerEndpoint", "versions": "0+", "nullableVersions": "0+",
      "about": "The endpoints that can be used to communicate with this broker.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "int16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
    ]},
    { "name": "Features", "type": "[]BrokerFeature", "versions": "0+", "nullableVersions": "0+",
      "about": "The features that this broker supports.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the feature." },
        { "name": "MinVersion", "type": "int16", "versions": "0+",
          "about": "The minimum feature level that this broker supports." },
        { "name": "MaxVersion", "type": "int16", "versions": "0+",
          "about": "The maximum feature level that this broker supports." }
    ]},
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The broker rack." }
  ]
}

TopicRecord

Code Block
{
  "apiKey": 1,
  "type": "metadata",
  "name": "TopicRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Name", "type": "string", "versions": "0+",
      "about": "The topic name." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Deleting", "type": "boolean", "versions": "0+",
      "about": "True if this topic is in the process of being deleted." }
  ]
}

...

PartitionRecord

Code Block
{
  "apiKey": 2,
  "type": "metadata",
  "name": "PartitionRecord",
  "validVersions": "0",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Replicas", "type": "[]int32", "versions": "0+",
      "about": "The replicas of this partition, sorted by preferred order." },
    { "name": "Isr", "type": "[]int32", "versions": "0+",
      "about": "The in-sync replicas of this partition." },
    { "name": "RemovingReplicas", "type": "[]int32", "versions": "0+",
      "about": "The replicas that we are in the process of removing." },
    { "name": "AddingReplicas", "type": "[]int32", "versions": "0+",
      "about": "The replicas that we are in the process of adding." },
    { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The lead replica, or -1 if there is no leader." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "An epoch that gets incremented each time we change the leader." }
  ]
}

ConfigRecord

Code Block
{
  "apiKey": 3,
  "type": "metadata",
  "name": "ConfigRecord",
  "validVersions": "0",
  "fields": [
    { "name": "ResourceType", "type": "int8", "versions": "0+",
      "about": "The type of resource this configuration applies to." },
    { "name": "ResourceName", "type": "string", "versions": "0+",
      "about": "The name of the resource this configuration applies to." },
    { "name": "Name", "type": "string", "versions": "0+",
      "about": "The name of the configuration key." },
    { "name": "Value", "type": "string", "versions": "0+",
      "about": "The value of the configuration." }
  ]
}

IsrChange

Code Block
{
  "apiKey": 4,
  "type": "metadata",
  "name": "IsrChangeRecord",
  "validVersions": "0",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Isr", "type": "[]int32", "versions": "0+",
      "about": "The in-sync replicas of this partition." },
    { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The lead replica, or -1 if there is no leader." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "An epoch that gets incremented each time we change the leader." }
  ]
}

AccessControlRecord

Code Block
{
  "apiKey": 5,
  "type": "metadata",
  "name": "AccessControlRecord",
  "validVersions": "0",
  "fields": [
    { "name": "ResourceType", "type": "int8", "versions": "0+",
      "about": "The resource type." },
    { "name": "ResourceName", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The resource name, or null if this is for the default resource." },
    { "name": "PatternType", "type": "int8", "versions": "0+",
      "about": "The pattern type (literal, prefixed, etc.)" },
    { "name": "Principal", "type": "string", "versions": "0+",
      "about": "The principal name." },
    { "name": "Host", "type": "string", "versions": "0+",
      "about": "The host." },
    { "name": "Operation", "type": "int8", "versions": "0+",
      "about": "The operation type." },
    { "name": "PermissionType", "type": "int8", "versions": "0+",
      "about": "The permission type (allow, deny)." }
  ]
}

FenceBroker

Code Block
{
  "apiKey": 6,
  "type": "metadata",
  "name": "FenceBrokerRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Id", "type": "int32", "versions": "0+",
      "about": "The broker ID to fence. It will be removed from all ISRs." },
    { "name": "Epoch", "type": "int64", "versions": "0+",
      "about": "The epoch of the broker to fence." }
  ]
}

...

UnfenceBroker

Code Block
{
  "apiKey": 611,
  "type": "metadata",
  "name": "FenceBrokerRecordUnfenceBrokerRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Id", "type": "int32", "versions": "0+",
      "about": "The broker ID to fence. It will be removed from all ISRs.unfence." }
    { "name": "Epoch", "type": "int64", "versions": "0+",
      "about": "The epoch of the broker to fenceunfence." }
  ]
} 

RemoveTopic

Code Block
{
  "apiKey": 7,
  "type": "metadata",
  "name": "RemoveTopicRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Id", "type": "uuid", "versions": "0+",
      "about": "The topic to remove. All associated partitions will be removed as well." }
  ]
} 
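
To illustrate how these records fit together, the following Java sketch shows how a broker replaying the metadata log might apply a few of the record types above to a local metadata image.  The MetadataReplayer class and its structure are hypothetical; only the record names and fields come from the schemas in this section.

Code Block
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** A sketch of replaying metadata records into a local metadata image. */
public final class MetadataReplayer {
    static final class TopicState {
        String name;
        boolean deleting;
        final Map<Integer, int[]> isrByPartition = new HashMap<>();
    }

    private final Map<UUID, TopicState> topics = new HashMap<>();

    void applyTopicRecord(UUID topicId, String name, boolean deleting) {
        TopicState t = topics.computeIfAbsent(topicId, id -> new TopicState());
        t.name = name;
        t.deleting = deleting;
    }

    void applyIsrChangeRecord(UUID topicId, int partitionId, int[] isr) {
        // IsrChangeRecord acts as a delta: it updates an existing partition's
        // ISR and leadership without restating the full PartitionRecord.
        TopicState t = topics.get(topicId);
        if (t != null) {
            t.isrByPartition.put(partitionId, isr);
        }
    }

    void applyRemoveTopicRecord(UUID topicId) {
        // All associated partitions are removed along with the topic.
        topics.remove(topicId);
    }
}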

...

Since KIP-500 mode is currently in a pre-alpha state, we do not yet guarantee that future versions will support upgrading from the current version.  Once it is more stable, we will have a more traditional binary compatibility regime.

Rejected Alternatives

...

Support Automatic Broker ID Assignment

This KIP proposes to drop support for automatic broker ID assignment.  What if we decided to continue to support it?

...