...

This state machine ties together a few of the previous sections, such as broker registration and shutdown.

The broker heartbeat always contains the broker's epoch, unless the broker is in the INITIAL state.  When the broker is in the INITIAL state, it is waiting for the controller to assign it a new broker epoch.

Code Block
                        INITIAL
sent initial heartbeat,   | 
got back an epoch         |
                          V
                        ACTIVE -------------------------> SHUTDOWN
lease expired or revoked  |  ^                              ^
(stale metadata,          |  | lease restored               | controller granted
 id conflict,             |  | with new broker epoch        | controlled shutdown
 unable to communicate,   V  |                              |
 etc.)                  FENCED -----------------------------+
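One way to read the diagram is as a transition table. The Java sketch below encodes only the arrows drawn above, plus the rule that a heartbeat carries an epoch of -1 while the broker is still INITIAL. The class and method names (BrokerStateMachine, canTransition, heartbeatEpoch) are hypothetical and are not part of this proposal.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the broker state machine diagram; not Kafka's actual implementation.
final class BrokerStateMachine {
    enum State { INITIAL, ACTIVE, FENCED, SHUTDOWN }

    // Allowed transitions, one entry per arrow in the diagram.
    private static final Map<State, Set<State>> TRANSITIONS = new EnumMap<>(State.class);
    static {
        TRANSITIONS.put(State.INITIAL, EnumSet.of(State.ACTIVE));                 // initial heartbeat got back an epoch
        TRANSITIONS.put(State.ACTIVE, EnumSet.of(State.FENCED, State.SHUTDOWN)); // lease lost, or shutdown
        TRANSITIONS.put(State.FENCED, EnumSet.of(State.ACTIVE, State.SHUTDOWN)); // lease restored, or controlled shutdown
        TRANSITIONS.put(State.SHUTDOWN, EnumSet.noneOf(State.class));            // terminal state
    }

    static boolean canTransition(State from, State to) {
        return TRANSITIONS.get(from).contains(to);
    }

    // The heartbeat carries the broker epoch, except in INITIAL, where -1 means "no epoch assigned yet".
    static long heartbeatEpoch(State state, long assignedEpoch) {
        return state == State.INITIAL ? -1L : assignedEpoch;
    }

    public static void main(String[] args) {
        System.out.println(canTransition(State.FENCED, State.ACTIVE));   // true
        System.out.println(canTransition(State.SHUTDOWN, State.ACTIVE)); // false
        System.out.println(heartbeatEpoch(State.INITIAL, 42L));          // -1
    }
}
```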

...

Configuration Name | Possible Values | Description
process.roles | null, broker, controller, broker,controller | If this is null (absent), then we are in legacy mode. Otherwise, we are in KIP-500 mode and this configuration determines what roles this process should play: broker, controller, or both.
controller.listener.names | If non-null, this must be a comma-separated list of listener names. | A comma-separated list of the names of the listeners used by the KIP-500 controller. When communicating with the controller quorum, the broker will always use the first listener in this list. This is required if this process is a KIP-500 controller. The legacy controller will not use this configuration. Despite the similar name, note that this is different from the "control plane listener" introduced by KIP-291. The "control plane listener" is used on brokers, not on controllers.
controller.connect | If non-null, this must be a comma-separated list of all the controller voters, in the format {controller-id}@{controller-host}:{controller-port} | When in KIP-500 mode, each node must have this configuration in order to find out how to communicate with the controller quorum. Note that this replaces the "quorum.voters" configuration described in KIP-595.
controller.id | a 32-bit ID | The controller id for this server. Only required if this server is a controller.
broker.id | a 32-bit ID | The broker id for this server. Only required if this server is a broker.
registration.heartbeat.interval.ms | 2000 | The length of time between broker heartbeats.
registration.lease.timeout.ms | 18000 | The length of time that a broker lease lasts if no heartbeats are made.
metadata.log.dir | If set, this must be a path to a log directory. | This configuration determines where we put the metadata log. If it is not set, the metadata log is placed in the first log directory from log.dirs.

controller.quorum.fetch.timeout.ms |  | Maximum time without a successful fetch from the current leader before a new election is started. New name for quorum.fetch.timeout.ms.
controller.quorum.election.timeout.ms |  | Maximum time without collecting a majority of votes during the candidate state before a new election is retried. New name for quorum.election.timeout.ms.
controller.quorum.election.backoff.max.ms |  | Maximum exponential backoff time (based on the number of retries) after an election timeout, before a new election is triggered. New name for quorum.election.backoff.max.ms.
controller.quorum.request.timeout.ms |  | Maximum time before a pending request is considered failed and the connection is dropped. New name for quorum.request.timeout.ms.
controller.quorum.retry.backoff.ms |  | Initial delay between request retries. This configuration and controller.quorum.retry.backoff.max.ms are used for retriable request errors or lost connectivity, and are different from the election backoff configuration above. New name for quorum.retry.backoff.ms.
controller.quorum.retry.backoff.max.ms |  | Maximum delay between requests. The backoff increases exponentially, starting from controller.quorum.retry.backoff.ms. New name for quorum.retry.backoff.max.ms.
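To illustrate the value formats of process.roles and controller.connect, here is a minimal Java parsing sketch. The class Kip500Config, its Role and Voter types, and the choice to reject an unrecognized role with an exception are assumptions made for this example, not behavior specified by the KIP.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical parsing helpers; not Kafka's actual configuration code.
final class Kip500Config {
    enum Role { BROKER, CONTROLLER }

    // One controller voter taken from controller.connect.
    record Voter(int id, String host, int port) {}

    // process.roles: null (absent) means legacy mode, returned here as an empty set.
    static Set<Role> parseRoles(String value) {
        Set<Role> roles = EnumSet.noneOf(Role.class);
        if (value == null) {
            return roles;
        }
        for (String part : value.split(",")) {
            switch (part.trim()) {
                case "broker" -> roles.add(Role.BROKER);
                case "controller" -> roles.add(Role.CONTROLLER);
                default -> throw new IllegalArgumentException("Unknown role: " + part);
            }
        }
        return roles;
    }

    // controller.connect: comma-separated {controller-id}@{controller-host}:{controller-port} entries.
    static List<Voter> parseVoters(String value) {
        List<Voter> voters = new ArrayList<>();
        for (String entry : value.split(",")) {
            String e = entry.trim();
            int at = e.indexOf('@');
            int colon = e.lastIndexOf(':');
            if (at <= 0 || colon <= at + 1 || colon == e.length() - 1) {
                throw new IllegalArgumentException("Bad voter entry: " + e);
            }
            voters.add(new Voter(Integer.parseInt(e.substring(0, at)),
                                 e.substring(at + 1, colon),
                                 Integer.parseInt(e.substring(colon + 1))));
        }
        return voters;
    }

    public static void main(String[] args) {
        System.out.println(parseRoles("broker,controller")); // [BROKER, CONTROLLER]
        System.out.println(parseVoters("1@host1:9093,2@host2:9093"));
    }
}
```

With the defaults in the table, registration.lease.timeout.ms (18000) spans nine registration.heartbeat.interval.ms (2000) intervals, so a broker's lease can outlast several missed heartbeats.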

RPCs

Obsoleting the Metadata Propagation RPCs

As discussed earlier, the new controller will use FetchRequest to fetch metadata from the active controller.  The details of how Raft fetching will work are spelled out in KIP-595: A Raft Protocol for the Metadata Quorum.

Since we propagate the metadata via Raft, we will no longer need to send out LeaderAndIsrRequest, UpdateMetadataRequest, and StopReplicaRequest.  These requests will be sent out only when we're in legacy mode, not when we're in KIP-500 mode.  Eventually we will add some support for these requests to the new controller, in order to support rolling upgrade from a pre-KIP-500 release. However, for the purpose of this KIP, the new controller will not use these requests.

Obsoleting the Controlled Shutdown RPC

The broker heartbeat mechanism replaces the controlled shutdown RPC.  Therefore, we will no longer need to support this RPC in the controller, except for compatibility during upgrades, which will be described in a follow-on KIP.

BrokerHeartbeat

As described earlier, the broker periodically sends out a heartbeat request to the active controller.

Code Block
{
  "apiKey": 50,
  "type": "request",
  "name": "BrokerHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "CurrentState", "type": "int8", "versions": "0+",
      "about": "The current state that the broker is in." },
    { "name": "TargetState", "type": "int8", "versions": "0+",
      "about": "The state that the broker wants to reach." },
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker epoch, or -1 if one has not yet been assigned." },
    { "name": "CurMetadataOffset", "type": "int64", "versions": "0+",
      "about": "The highest metadata offset which the broker has reached." },
    { "name": "Listeners", "type": "[]Listener", "versions": "0+", "nullableVersions": "0+",
      "about": "The listeners of this broker.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "int16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
      ]
    },
    { "name": "Features", "type": "[]Feature", "versions": "0+", "nullableVersions": "0+",
      "about": "The features on this broker.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The feature name." },
        { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The minimum supported feature level." },
        { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The maximum supported feature level." }
      ]
    },
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The rack which this broker is in." }
  ]
}

{
  "apiKey": 50,
  "type": "response",
  "name": "BrokerHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "ActiveControllerId", "type": "int32", "versions": "0+",
      "about": "The ID of the active controller, or -1 if the controller doesn't know." },
    { "name": "NextState", "type": "int8", "versions": "0+",
      "about": "The state to which the broker should transition." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker's assigned epoch, or -1 if none was assigned." },
    { "name": "LeaseDurationMs", "type": "int64", "versions": "0+",
      "about": "If BrokerEpoch is not -1, the number of milliseconds that we want the lease to last." }
  ]
}
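To show how the response fields fit together, the following Java sketch computes when a broker's lease would expire after a heartbeat response. The Response record mirrors a subset of the BrokerHeartbeatResponse fields; the class name, the treatment of a non-zero ErrorCode as "no lease", and the use of a local millisecond clock are assumptions of this example.

```java
// Hypothetical broker-side view of a BrokerHeartbeatResponse; not Kafka's actual client code.
final class HeartbeatResponseHandler {
    static final short NONE = 0; // ErrorCode value meaning "no error"

    // Subset of the BrokerHeartbeatResponse fields, with the same meanings as in the schema.
    record Response(short errorCode, int activeControllerId, byte nextState,
                    long brokerEpoch, long leaseDurationMs) {}

    // Absolute expiry time of the lease in ms, or -1 if this response grants no lease.
    static long leaseDeadlineMs(Response r, long nowMs) {
        if (r.errorCode() != NONE || r.brokerEpoch() == -1L) {
            // Per the schema, LeaseDurationMs only applies when BrokerEpoch is not -1;
            // treating any error as "no lease" is an assumption of this sketch.
            return -1L;
        }
        return nowMs + r.leaseDurationMs();
    }

    public static void main(String[] args) {
        Response ok = new Response(NONE, 1, (byte) 3, 42L, 18000L);
        System.out.println(leaseDeadlineMs(ok, 1_000L)); // 19000
    }
}
```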

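enum BrokerState {
    UNKNOWN(0),
    INITIAL(1),
    FENCED(2),
    ACTIVE(3),
    SHUTDOWN(4);

    // Wire value, sent in the int8 CurrentState, TargetState, and NextState fields.
    private final byte value;

    BrokerState(int value) { this.value = (byte) value; }

    public byte value() { return value; }
}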

The controller will wait to unfence a broker until the broker has sent at least one heartbeat whose currentState is active.  So a typical transition will look like this:

  • The broker sends a BrokerHeartbeatRequest with currentState = fenced, targetState = active.  The controller responds with nextState = active.
  • The broker sends a BrokerHeartbeatRequest with currentState = active, targetState = active.  The controller responds with nextState = active.
  • The controller unfences the broker.

The second step informs the controller that the broker has received the response to the first step.  To speed this process along, the broker will send the second heartbeat immediately after receiving the first response, rather than waiting for the next heartbeat interval.
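The handshake above can be sketched on the controller side as follows. This is a hypothetical Java illustration: UnfenceTracker and its in-memory map are not part of the KIP, and the sketch models only heartbeats whose targetState is active.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical controller-side bookkeeping for the unfencing handshake; not Kafka's actual controller code.
final class UnfenceTracker {
    // Same order as the BrokerState wire values (UNKNOWN = 0 ... SHUTDOWN = 4).
    enum BrokerState { UNKNOWN, INITIAL, FENCED, ACTIVE, SHUTDOWN }

    // brokerId -> whether the controller still considers that broker fenced.
    private final Map<Integer, Boolean> fenced = new HashMap<>();

    // Handles one heartbeat and returns the nextState to put in the response.
    BrokerState onHeartbeat(int brokerId, BrokerState current, BrokerState target) {
        if (target != BrokerState.ACTIVE) {
            throw new UnsupportedOperationException("this sketch only models targetState = ACTIVE");
        }
        fenced.putIfAbsent(brokerId, true); // a broker starts out fenced
        if (current == BrokerState.ACTIVE) {
            // The broker acted on an earlier nextState = ACTIVE reply, so it is now safe to unfence it.
            fenced.put(brokerId, false);
        }
        return BrokerState.ACTIVE;
    }

    boolean isFenced(int brokerId) {
        return fenced.getOrDefault(brokerId, true);
    }

    public static void main(String[] args) {
        UnfenceTracker tracker = new UnfenceTracker();
        tracker.onHeartbeat(1, BrokerState.FENCED, BrokerState.ACTIVE);
        System.out.println(tracker.isFenced(1)); // true: only the first step seen
        tracker.onHeartbeat(1, BrokerState.ACTIVE, BrokerState.ACTIVE);
        System.out.println(tracker.isFenced(1)); // false: second step seen
    }
}
```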

As always with enums, the UNKNOWN state is used only to translate values that our software is too old to understand.
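A decoder for the int8 state fields might map unrecognized values to UNKNOWN as shown below. The fromValue helper is a hypothetical addition to the BrokerState enum, not something the KIP defines.

```java
// Hypothetical decoding helper for the BrokerState wire values.
enum BrokerState {
    UNKNOWN(0), INITIAL(1), FENCED(2), ACTIVE(3), SHUTDOWN(4);

    private final byte value;

    BrokerState(int value) { this.value = (byte) value; }

    byte value() { return value; }

    // A value added by a newer release that this code does not know about decodes as UNKNOWN.
    static BrokerState fromValue(byte value) {
        for (BrokerState s : values()) {
            if (s.value == value) {
                return s;
            }
        }
        return UNKNOWN;
    }

    public static void main(String[] args) {
        System.out.println(fromValue((byte) 3)); // ACTIVE
        System.out.println(fromValue((byte) 9)); // UNKNOWN
    }
}
```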

A controller will return NOT_CONTROLLER if it is not the active controller.  Brokers will always return NOT_CONTROLLER for these RPCs.

Heartbeat requests that give a CurMetadataOffset which is lower than the previous one recorded for that broker will be ignored.
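The two rules above (NOT_CONTROLLER from a controller that is not active, and ignoring heartbeats whose CurMetadataOffset goes backwards) can be sketched as a small Java validator. HeartbeatValidator and its Outcome enum are hypothetical names, and this sketch deliberately checks nothing else.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical controller-side checks for the two heartbeat rules; not Kafka's actual controller code.
final class HeartbeatValidator {
    enum Outcome { NOT_CONTROLLER, IGNORED, ACCEPTED }

    // brokerId -> highest CurMetadataOffset recorded so far.
    private final Map<Integer, Long> lastOffset = new HashMap<>();
    private final boolean isActiveController;

    HeartbeatValidator(boolean isActiveController) {
        this.isActiveController = isActiveController;
    }

    Outcome validate(int brokerId, long curMetadataOffset) {
        if (!isActiveController) {
            return Outcome.NOT_CONTROLLER; // only the active controller handles heartbeats
        }
        Long previous = lastOffset.get(brokerId);
        if (previous != null && curMetadataOffset < previous) {
            return Outcome.IGNORED; // lower than the offset already recorded for this broker
        }
        lastOffset.put(brokerId, curMetadataOffset);
        return Outcome.ACCEPTED;
    }

    public static void main(String[] args) {
        HeartbeatValidator v = new HeartbeatValidator(true);
        System.out.println(v.validate(1, 100L)); // ACCEPTED
        System.out.println(v.validate(1, 90L));  // IGNORED
        System.out.println(new HeartbeatValidator(false).validate(1, 100L)); // NOT_CONTROLLER
    }
}
```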

Record Formats

BrokerRecord

Code Block
{
  "apiKey": 0,
  "type": "metadata",
  "name": "BrokerRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Id", "type": "int32", "versions": "0+",
      "about": "The broker id." },
    { "name": "Epoch", "type": "int64", "versions": "0+",
      "about": "The broker epoch." },
    { "name": "EndPoints", "type": "[]BrokerEndpoint", "versions": "0+", "nullableVersions": "0+",
      "about": "The endpoints that can be used to communicate with this broker.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "int16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
    ]},
    { "name": "Features", "type": "[]BrokerFeature", "versions": "0+", "nullableVersions": "0+",
      "about": "The features that this broker supports.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the feature." },
        { "name": "MinVersion", "type": "int16", "versions": "0+",
          "about": "The minimum feature level that this broker supports." },
        { "name": "MaxVersion", "type": "int16", "versions": "0+",
          "about": "The maximum feature level that this broker supports." }
    ]},
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The broker rack." }
  ]
}

...

Code Block
{
  "apiKey": 10,
  "type": "metadata",
  "name": "FeatureLevelRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
      "about": "The feature name." },
    { "name": "MinFeatureLevel", "type": "int16", "versions": "0+",
      "about": "The current finalized minimum feature level of this feature for the cluster." },
    { "name": "MaxFeatureLevel", "type": "int16", "versions": "0+",
      "about": "The current finalized maximum feature level of this feature for the cluster." }
  ]
} 
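A broker's supported feature ranges (the Features field of BrokerRecord) can be compared with the cluster-wide finalized levels from FeatureLevelRecord. The Java sketch below assumes a broker is compatible only if, for every finalized feature, its supported [MinVersion, MaxVersion] range contains the finalized [MinFeatureLevel, MaxFeatureLevel] range, and that a missing feature makes it incompatible. That rule, the Range record, and the feature names in main are assumptions for illustration.

```java
import java.util.Map;

// Hypothetical compatibility check between BrokerRecord features and FeatureLevelRecord levels.
final class FeatureCompat {
    // An inclusive [min, max] range of feature levels.
    record Range(short min, short max) {}

    // True if every finalized feature is supported by the broker over its whole finalized range.
    static boolean isCompatible(Map<String, Range> supported, Map<String, Range> finalized) {
        for (Map.Entry<String, Range> e : finalized.entrySet()) {
            Range s = supported.get(e.getKey());
            Range f = e.getValue();
            if (s == null || s.min() > f.min() || f.max() > s.max()) {
                return false; // feature missing, or finalized range not covered
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Range> broker = Map.of("example.feature", new Range((short) 1, (short) 3));
        System.out.println(isCompatible(broker, Map.of("example.feature", new Range((short) 1, (short) 2)))); // true
        System.out.println(isCompatible(broker, Map.of("other.feature", new Range((short) 1, (short) 1))));   // false
    }
}
```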

...

Full Name | Description
kafka.controller:type=KafkaController,name=MetadataLag | The offset delta between the latest metadata record this controller has replayed and the last stable offset of the metadata topic.
kafka.controller:type=KafkaServer,name=MetadataLag | The offset delta between the latest metadata record this broker has replayed and the last stable offset of the metadata topic.
kafka.controller:type=KafkaController,name=MetadataCommitLatencyMs | The latency of committing a message to the metadata topic.  Relevant on the active controller.
kafka.controller:type=KafkaController,name=MetadataCommitRate | The number of metadata messages per second committed to the metadata topic.
kafka.controller:type=KafkaController,name=MetadataSnapshotLag | New name for kafka.controller:type=KafkaController,name=SnapshotLag. The offset delta between the latest stable offset of the metadata topic and the offset of the last snapshot (or the last stable offset itself, if there are no snapshots).
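The lag metrics in this table are offset differences. The Java sketch below computes them, reading the parenthetical in the MetadataSnapshotLag description as "the lag equals the last stable offset when no snapshot exists". That reading, along with the class and method names, is an assumption of this example.

```java
import java.util.OptionalLong;

// Hypothetical computation of the lag metrics; not Kafka's actual metrics code.
final class MetadataLagMetrics {
    // MetadataLag: last stable offset minus the offset of the latest replayed record.
    static long metadataLag(long lastStableOffset, long lastReplayedOffset) {
        return lastStableOffset - lastReplayedOffset;
    }

    // MetadataSnapshotLag: last stable offset minus the last snapshot offset,
    // or the last stable offset itself when there are no snapshots.
    static long snapshotLag(long lastStableOffset, OptionalLong lastSnapshotOffset) {
        return lastSnapshotOffset.isPresent()
                ? lastStableOffset - lastSnapshotOffset.getAsLong()
                : lastStableOffset;
    }

    public static void main(String[] args) {
        System.out.println(metadataLag(1_000L, 990L));                  // 10
        System.out.println(snapshotLag(1_000L, OptionalLong.of(400L))); // 600
        System.out.println(snapshotLag(1_000L, OptionalLong.empty()));  // 1000
    }
}
```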

Unused Metrics in KIP-500 Mode

...