...

Code Block
$ cat /tmp/kafka-logs/meta.properties
#
#Tue Dec 01 10:08:08 PST 2020
cluster.id=3Db5QLSqSZieL3rJBUUegA
version=1
broker.id=0

...

kafka-storage.sh

There will be a new command-line tool, kafka-storage.sh.

...

Code Block
$ ./bin/kafka-storage.sh random-uuid
51380268-1036-410d-a8fc-fb3b55f48033
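
For illustration, the generated UUID would then be supplied back to the tool when formatting the storage directories. The exact flags below are an assumption for the sake of the example, not a specification:

Code Block
$ ./bin/kafka-storage.sh format --cluster-id 51380268-1036-410d-a8fc-fb3b55f48033 --config ./config/server.properties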

...

kafka-cluster.sh

There will also be a new command-line tool, kafka-cluster.sh.

Code Block
$ ./bin/kafka-cluster.sh -h
usage: kafka-cluster [-h] {cluster-id,decommission} ...

The Kafka cluster tool.

positional arguments:
  {cluster-id,decommission}
    cluster-id           Get information about the ID of a cluster.
    decommission         Decommission a broker ID.

optional arguments:
  -h, --help             show this help message and exit

kafka-cluster.sh will have two subcommands: cluster-id and decommission.

cluster-id

Code Block
$ ./bin/kafka-cluster.sh cluster-id -h
usage: kafka-cluster cluster-id [-b,-c,-h]

optional arguments:
  -b, --bootstrap-server a list of host/port pairs to use for establishing the connection to the kafka cluster.
  -c, --config           a property file containing configs to be passed to Admin Client.
  -h, --help             show this help message and exit

The cluster-id command prints out the cluster ID.
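
For example, assuming a broker listening on localhost:9092 (the address and output below are illustrative, reusing the cluster ID from the meta.properties example above):

Code Block
$ ./bin/kafka-cluster.sh cluster-id --bootstrap-server localhost:9092
3Db5QLSqSZieL3rJBUUegA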

decommission

Code Block
$ ./bin/kafka-cluster.sh decommission -h
usage: kafka-cluster decommission [-b,-c,-f,-h,-i]

optional arguments:
  -b, --bootstrap-server a list of host/port pairs to use for establishing the connection to the kafka cluster.
  -c, --config           a property file containing configs to be passed to Admin Client.
  -f, --force            true if we should force the decommissioning even if this broker is the only replica for some partitions.
  -h, --help             show this help message and exit
  -i, --id               the ID of the broker to decommission

The decommission command removes the registration of a specific broker ID.  Note that this will also remove the broker ID from all replica assignments.
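
For example, to decommission broker 1 (the address is illustrative):

Code Block
$ ./bin/kafka-cluster.sh decommission --bootstrap-server localhost:9092 --id 1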

Configurations

process.roles

Possible values: null, broker, controller, or broker,controller.

Notes: If this is null (absent) then we are in legacy mode.  Otherwise, we are in KIP-500 mode and this configuration determines what roles this process should play: broker, controller, or both.

controller.listener.names

Possible values: If non-null, this must be a comma-separated list of listener names.  When communicating with the controller quorum, the broker will always use the first listener in this list.

Notes: A comma-separated list of the names of the listeners used by the KIP-500 controller.  This configuration is required if this process is a KIP-500 controller.  The legacy controller will not use this configuration.  Despite the similar name, note that this is different from the "control plane listener" introduced by KIP-291.

listeners

Possible values: A comma-separated list of the configured listeners.  For example: INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094

Notes: This configuration is now required.

controller.connect

Possible values: If non-null, this must be a comma-separated list of all the controller voters, in the format {controller-id}@{controller-host}:{controller-port}

Notes: When in KIP-500 mode, each node must have this configuration, in order to find out how to communicate with the controller quorum.  Note that this replaces the "quorum.voters" config described in KIP-595.  This configuration is required for both brokers and controllers.

controller.id

Possible values: a 32-bit ID.

Notes: The controller ID for this server.  Only required if this server is a controller.

broker.id

Possible values: a 32-bit ID.

Notes: The broker ID for this server.  Only required if this server is a broker.

initial.broker.registration.timeout.ms

Default: 60000.

Notes: When initially registering with the controller quorum, the number of milliseconds to wait before declaring failure and exiting the broker process.

broker.heartbeat.interval.ms

Default: 3000.

Notes: The length of time between broker heartbeats.

broker.registration.timeout.ms

Default: 18000.

Notes: The length of time that a broker lease lasts if no heartbeats are made.

metadata.log.dir

Possible values: If set, this must be a path to a log directory.

Notes: This configuration determines where we put the metadata log.  If it is not set, the metadata log is placed in the first log directory from log.dirs.

controller.quorum.fetch.timeout.ms

Description: Maximum time without a successful fetch from the current leader before a new election is started.

Notes: New name for quorum.fetch.timeout.ms.

controller.quorum.election.timeout.ms

Description: Maximum time without collecting a majority of votes during the candidate state before a new election is retried.

Notes: New name for quorum.election.timeout.ms.

controller.quorum.election.backoff.max.ms

Description: Maximum exponential backoff time (based on the number of retries) after an election timeout, before a new election is triggered.

Notes: New name for quorum.election.backoff.max.ms.

controller.quorum.request.timeout.ms

Description: Maximum time before a pending request is considered failed and the connection is dropped.

Notes: New name for quorum.request.timeout.ms.

controller.quorum.retry.backoff.ms

Description: Initial delay between request retries.  This config and the one below are used for retriable request errors and lost connectivity; they are different from the election.backoff configs above.

Notes: New name for quorum.retry.backoff.ms.

controller.quorum.retry.backoff.max.ms

Description: Max delay between requests.  Backoff will increase exponentially beginning from quorum.retry.backoff.ms.

Notes: New name for quorum.retry.backoff.max.ms.
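
To make the table concrete, the following is a minimal sketch of a server.properties for a combined broker/controller process in KIP-500 mode. All values are illustrative assumptions, not recommendations:

Code Block
# Illustrative KIP-500 mode configuration (example values only).
process.roles=broker,controller
broker.id=0
controller.id=3000
listeners=INTERNAL://192.1.1.8:9092,CONTROLLER://192.1.1.8:9094
controller.listener.names=CONTROLLER
controller.connect=3000@192.1.1.8:9094
metadata.log.dir=/tmp/kafka-logs/metadata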

Deprecated Configurations

control.plane.listener.name

Reason: We no longer need to maintain a separate listener for messages from the controller, since the controller does not send messages out any more (it receives them).

broker.id.generation.enable

Reason: Automatic broker ID generation is no longer supported.

zookeeper.*

Reason: We no longer need configurations for ZooKeeper.

New Error Codes

DUPLICATE_BROKER_REGISTRATION

There will be a new error code, DUPLICATE_BROKER_REGISTRATION, that the active controller will return when a broker tries to register with an ID that is currently in use.

INVALID_CLUSTER_ID

There will be a new error code, INVALID_CLUSTER_ID, that the controller will return if the broker tries to register with the wrong cluster ID.

RPCs

Obsoleting the Metadata Propagation RPCs

As discussed earlier, the new controller will use FetchRequest to fetch metadata from the active controller.  The details of how Raft fetching will work are spelled out in KIP-595: A Raft Protocol for the Metadata Quorum.

Since we propagate the metadata via Raft, we will no longer need to send out LeaderAndIsrRequest, UpdateMetadataRequest, and StopReplicaRequest.  These requests will be sent out only when we're in legacy mode, not when we're in KIP-500 mode.  Eventually we will add some support for these requests to the new controller, in order to support rolling upgrade from a pre-KIP-500 release. However, for the purpose of this KIP, the new controller will not use these requests.

Obsoleting the Controlled Shutdown RPC

The broker heartbeat mechanism replaces the controlled shutdown RPC.  Therefore, we will no longer need to support this RPC in the controller, except for compatibility during upgrades, which will be described further in a follow-on KIP.

Topic Identifiers

This KIP builds on top of the work done in KIP-516 to improve how partitions are tracked.

Because each topic is identified by a unique topic UUID, we can implement topic deletion with a single record, RemoveTopicRecord.  Upon replaying this record, each broker will delete the associated topic if it is present.

Of course, some brokers may be down when the topic is deleted.  In fact, some brokers may never see the RemoveTopicRecord.  This record may get collapsed into one of the periodic metadata snapshots.  If this happens, the deletion will be reflected in the snapshot through the absence of a topic record, not its presence.  Therefore, during the startup process, brokers must compare the log directories that they have with the ones contained in the latest metadata.  The appropriate time to do this is at the start of the RECOVERY phase.  At this point, the broker has the latest metadata.

BrokerRegistration

Code Block
languagejs
{
  "apiKey": 57,
  "type": "request",
  "name": "BrokerRegistrationRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID." },
    { "name": "ClusterId", "type": "uuid", "versions": "0+",
      "about": "The cluster id of the broker process." },
    { "name": "IncarnationId", "type": "uuid", "versions": "0+",
      "about": "The incarnation id of the broker process." },
    { "name": "CurMetadataOffset", "type": "int64", "versions": "0+",
      "about": "The highest metadata offset which the broker has reached." },
    { "name": "Listeners", "type": "[]Listener",
      "about": "The listeners of this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "int16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
      ]
    },
    { "name": "Features", "type": "[]Feature",
      "about": "The features on this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The feature name." },
        { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The minimum supported feature level." },
        { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The maximum supported feature level." }
      ]
    },
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The rack which this broker is in." }
  ]
}



Code Block
languagejs
{
  "apiKey": 57,
  "type": "response",
  "name": "BrokerRegistrationResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker's assigned epoch, or -1 if none was assigned." }
  ]
}

BrokerHeartbeat

As described earlier, the broker periodically sends out a heartbeat request to the active controller.

Code Block
languagejs
{
  "apiKey": 58,
  "type": "request",
  "name": "BrokerHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker epoch." },
    { "name": "CurrentMetadataOffset", "type": "int64", "versions": "0+",
      "about": "One more than the highest metadata offset which the broker has reached." },
    { "name": "ShouldFence", "type": "bool", "versions": "0+",
      "about": "True if the broker wants to be fenced, false otherwise." },
    { "name": "ShouldShutDown", "type": "bool", "versions": "0+",
      "about": "True if the broker wants to initiate controlled shutdown." }
  ]
}

Code Block
languagejs
{
  "apiKey": 58,
  "type": "response",
  "name": "BrokerHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "IsCaughtUp", "type": "bool", "versions": "0+",
      "about": "True if the broker has approximately caught up with the latest metadata." },
    { "name": "IsFenced", "type": "bool", "versions": "0+",
      "about": "True if the broker is fenced." },
    { "name": "ControlledShutdownOk", "type": "bool", "versions": "0+",
      "about": "True if the broker can execute a controlled shutdown now." }
  ]
}

The controller will wait to unfence a broker until it sends a heartbeat where ShouldFence is false and CurrentMetadataOffset is caught up.

If the heartbeat request has ShouldShutDown set, the controller will try to move all the leaders off of the broker.

The controller will set ControlledShutdownOk if the broker is cleared to execute a controlled shutdown.  In other words, if it has no leaderships.
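
To make the shutdown handshake concrete, here is an illustrative heartbeat exchange based on the schemas above; the field values are made up for the example, and this is not a wire format:

Code Block
languagejs
// Broker 0 asks to shut down:
{ "BrokerId": 0, "BrokerEpoch": 5, "CurrentMetadataOffset": 1000,
  "ShouldFence": false, "ShouldShutDown": true }
// Once the controller has moved all leaderships off broker 0, it answers:
{ "ThrottleTimeMs": 0, "ErrorCode": 0, "IsCaughtUp": true,
  "IsFenced": false, "ControlledShutdownOk": true }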

The controller will return NOT_CONTROLLER if it is not active.  Brokers will always return NOT_CONTROLLER for these RPCs.

DecommissionBroker

The DecommissionBrokerRequest asks the controller to unregister a broker from the cluster.

Code Block
languagejs
{
  "apiKey": 59,
  "type": "request",
  "name": "DecommissionBrokerRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID to decommission." },
    { "name": "Force", "type": "bool", "versions": "0+", "default": "false",
      "about": "True if we should force the decommissioning even if this broker is the only replica for some partitions." }
  ]
}

Code Block
languagejs
{
  "apiKey": 59,
  "type": "response",
  "name": "DecommissionBrokerResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." }
  ]
}


...

The valid response codes are:

  • NONE if the decommissioning succeeded
  • NOT_CONTROLLER if the node that the request was sent to is not the controller
  • UNSUPPORTED_VERSION if KIP-500 mode is not enabled
  • BROKER_NOT_AVAILABLE if the given broker ID is not registered.
  • BROKER_REQUIRED_FOR_PARTITIONS if the given broker is the last replica for at least one partition, and force was not specified.

Record Formats

RegisterBrokerRecord

...