...

As described in KIP-500, the controller will store its data in the internal __cluster_metadata topic.  This topic will contain a single partition which is managed by Raft, as described in KIP-595: A Raft Protocol for the Metadata Quorum.

...

Metadata changes need to be persisted to the __cluster_metadata log before we apply them on the other nodes in the cluster.  This means waiting for the metadata log's last stable offset to advance to the offset of the change.  After that point, we are guaranteed not to lose the change as long as we uphold the Raft invariants.

...

However, this "future state" may never be committed.  For example, the active controller might fail, truncating some of its future state.  Therefore, the active controller must not make this future state "visible" to the rest of the cluster until it has been made persistent – that is, until it becomes current state.  In the case of the @metadata topic__cluster_metadata topic, the replication protocol itself neatly takes care of this for us.  In the case of controller RPCs like AlterIsr, the controller handles this by not sending back a response until the designated change has been persisted.

...

The main change in the broker state machine is that the RECOVERING_FROM_UNCLEAN_SHUTDOWN state has been renamed to RECOVERY.  Also, unlike previously, the broker will always pass through RECOVERY (although it may only stay in this state for a very short amount of time).

Partition Reassignment

Just like the ZooKeeper-based controller, the quorum controller will implement the KIP-455 partition reassignment API. This API specifies both alter and list operations.

The alter operation specifies a partition, plus a target replica list. If the target replica list is a subset of the partition's current replica list, and the new replica list contains at least one member of the current ISR, then the reassignment can be completed immediately. In this case, the controller issues a PartitionChangeRecord changing the replica set and ISR (if appropriate).

On the other hand, if the new replica list has additional replicas, or specifies a subset of replicas that doesn't intersect with the current ISR, then we add the additional replicas to the replica list, and wait for the ISR to catch up.

So, for example, if the old replica list was [1, 2, 3], and the new list was [3, 4, 5], we would proceed as follows:

  • Change the replica list to [1, 2, 3, 4, 5], and set addingReplicas to [4, 5].
  • Wait for an alterIsr which adds both brokers 4 and 5.
  • Change the replica list to [3, 4, 5] and prune the ISR accordingly. Remove addingReplicas.

Just like with the ZooKeeper-based controller, the new controller stores reassignments as "hard state." Reassignments will continue even after a controller failover or broker shutdown and restart. Reassignment state appears in controller snapshots, as addingReplicas and removingReplicas.

Unlike the old controller, the quorum controller does not prevent topics that are undergoing reassignment from being deleted. If a topic is deleted, then any partition reassignments that it had are terminated.

Typically, reassignments which require ISR changes are completed by a leader adding some new replicas, via the alterIsr RPC. (Note that another possible way for a reassignment to be completed is via a broker being added to a partition ISR during unfencing). 

When a broker makes an alterIsr change that completes a reassignment, the resulting ISR will be different than requested. For example, if the leader is 1, the current ISR is 1, 2, 3, and the target replica set is 1, 2, 4, then when the leader tries to change the ISR to 1, 2, 3, 4, the controller will change it to 1, 2, 4 instead, completing the reassignment. Since the alterIsr response returns the ISR which was actually applied, the leader will apply this new ISR instead.
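
The rule implied by this example can be sketched as follows. This is a minimal illustration, assuming the controller simply intersects the requested ISR with the target replica list; the class and method names are hypothetical, not the actual controller code:

Code Block
languagejava
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class AlterIsrCompletionSketch {
    // Hypothetical helper: the ISR the controller applies is the requested ISR
    // with any replica outside the target replica set dropped.
    static List<Integer> applyAlterIsr(List<Integer> requestedIsr, Set<Integer> targetReplicas) {
        return requestedIsr.stream()
                .filter(targetReplicas::contains)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> requestedIsr = Arrays.asList(1, 2, 3, 4);
        Set<Integer> targetReplicas = new LinkedHashSet<>(Arrays.asList(1, 2, 4));
        // Prints [1, 2, 4]: replica 3 is dropped, completing the reassignment.
        System.out.println(applyAlterIsr(requestedIsr, targetReplicas));
    }
}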

If the new ISR would not contain the leader which made the alterIsr call, the controller returns FENCED_LEADER_EPOCH. This notifies the broker that it should wait until it can replay the latest partition change from the log. That partition change record will make this broker a follower rather than a leader.

Public Interfaces

meta.properties

When a storage directory is in use by a cluster running in KIP-500 mode, it will have a new version of the meta.properties file.  Since the current version is 0, the new version will be 1.  Just as in version 0, meta.properties will continue to be a Java Properties file.  This essentially means that it is a plain text file where each line has the format key=value.

In version 0 of meta.properties, the cluster.id field is optional.  In contrast, in version 1 it is mandatory.

In version 0 of meta.properties, the cluster.id field is serialized in hexadecimal.  In contrast, in version 1 it is serialized in base64.

Version 1 of meta.properties replaces the broker.id field with node.id.
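
For comparison, here is what a version 0 meta.properties file might look like; the values shown are illustrative:

Code Block
$ cat /tmp/kafka-logs/meta.properties
#
#Tue Dec 01 10:08:08 PST 2020
version=0
broker.id=0
cluster.id=4a5c6d7e8f9a0b1c2d3e4f5a6b7c8d9e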

For servers running in KIP-500 mode, the `meta.properties` file must be present in every log directory. The process will raise an error during startup if either the meta.properties file does not exist or if the node.id it contains does not match the value from the configuration file.

Here is an example of a version 1 meta.properties file:

Code Block
$ cat /tmp/kafka-logs/meta.properties
#
#Tue Dec 01 10:08:08 PST 2020
cluster.id=3Db5QLSqSZieL3rJBUUegA
version=1
node.id=0

kafka-storage.sh

There will be a new command-line tool, kafka-storage.sh.

Code Block
$ ./bin/kafka-storage.sh -h
usage: kafka-storage [-h] {info,format,random-uuid} ...

The Kafka storage tool.

positional arguments:
  {info,format,random-uuid}
    info                 Get information about the Kafka log directories on this node.
    format               Format the Kafka log directories on this node.
    random-uuid          Print a random UUID.

optional arguments:
  -h, --help             show this help message and exit
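
For example, a new node's storage might be prepared like this. The exact flags of the format subcommand are not spelled out above, so the flag names here are illustrative:

Code Block
$ ./bin/kafka-storage.sh random-uuid
3Db5QLSqSZieL3rJBUUegA
$ ./bin/kafka-storage.sh format --cluster-id 3Db5QLSqSZieL3rJBUUegA --config ./config/server.properties
$ ./bin/kafka-storage.sh info --config ./config/server.properties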

...

The --skip-record-metadata flag will skip printing metadata for each record.  However, metadata for each record batch will still be printed when this flag is present.
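
For example, a metadata log segment might be dumped like this; the decoder flag and segment path are illustrative:

Code Block
$ ./bin/kafka-dump-log.sh --cluster-metadata-decoder --skip-record-metadata \
    --files /tmp/kafka-logs/__cluster_metadata-0/00000000000000000000.log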

kafka-metadata-shell.sh

The Kafka Metadata shell is a new command which allows users to interactively examine the metadata stored in a KIP-500 cluster.

It can read the metadata from the controllers directly, by connecting to them, or from a metadata snapshot on disk.  In the former case, the quorum voters must be specified by passing the --controllers flag; in the latter case, the snapshot file should be specified via --snapshot.

Code Block
$ ./bin/kafka-metadata-shell.sh -h
usage: metadata-tool [-h] [--controllers CONTROLLERS] [--config CONFIG] [--snapshot SNAPSHOT] [command [command ...]]

The Apache Kafka metadata tool

positional arguments:
  command                The command to run.

optional arguments:
  -h, --help             show this help message and exit
  --controllers CONTROLLERS, -C CONTROLLERS
                         The quorum voter connection string to use.
  --config CONFIG, -c CONFIG
                         The configuration file to use.
  --snapshot SNAPSHOT, -s SNAPSHOT
                         The snapshot file to read.
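
For example, the two modes might be invoked like this (addresses and paths are illustrative):

Code Block
# Connect to the controller quorum directly:
$ ./bin/kafka-metadata-shell.sh --controllers 0@controller0:9093,1@controller1:9093,2@controller2:9093

# Read a metadata snapshot from disk:
$ ./bin/kafka-metadata-shell.sh --snapshot /tmp/kafka-logs/__cluster_metadata-0/00000000000000000000.log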

...

Code Block
>> help
Welcome to the Apache Kafka metadata shell.

usage:  {cat,cd,exit,find,help,history,ls,man,pwd} ...

positional arguments:
  {cat,cd,exit,find,help,history,ls,man,pwd}
    cat                  Show the contents of metadata nodes.
    cd                   Set the current working directory.
    exit                 Exit the metadata shell.
    find                 Search for nodes in the directory hierarchy.
    help                 Display this help message.
    history              Print command history.
    ls                   List metadata nodes.
    man                  Show the help text for a specific command.
    pwd                  Print the current working directory.
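
A short session might look like the following; the node hierarchy shown is purely illustrative, since the actual layout may differ:

Code Block
>> ls /
brokers  topics
>> cat /brokers/0/registration
...
>> pwd
/
>> exit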

The interface of the metadata tool is currently considered unstable and may change when KIP-500 becomes production-ready.

Configurations

Configuration Name | Possible Values | Notes
process.roles | null, broker, controller, "broker,controller" | If this is null (absent) then we are in legacy mode.  Otherwise, we are in KIP-500 mode and this configuration determines what roles this process should play: broker, controller, or both.
controller.listener.names | If non-null, this must be a comma-separated list of listener names. | A comma-separated list of the names of the listeners used by the KIP-500 controller.  This configuration is required if this process is a KIP-500 controller.  The legacy controller will not use this configuration.  When communicating with the controller quorum, the broker will always use the first listener in this list.  Despite the similar name, note that this is different from the "control plane listener" introduced by KIP-291.
listeners | A comma-separated list of the configured listeners.  For example, INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094 | This configuration is now required.
sasl.mechanism.controller.protocol | SASL mechanism used for communication with controllers.  Default is GSSAPI. | This is analogous to sasl.mechanism.inter.broker.protocol, but for communication with the controllers.
controller.quorum.voters | If non-null, this must be a comma-separated list of all the controller voters, in the format {controller-id}@{controller-host}:{controller-port} | When in KIP-500 mode, each node must have this configuration, in order to find out how to communicate with the controller quorum.  Note that this replaces the "quorum.voters" config described in KIP-595.  This configuration is required for both brokers and controllers.
node.id | a 32-bit ID | This configuration replaces `broker.id` for zk-based Kafka processes in order to reflect its more general usage.  It serves as the ID associated with each role that the process is acting as.  For example, a configuration with `node.id=0` and `process.roles=broker,controller` defines two nodes: `broker-0` and `controller-0`.
initial.broker.registration.timeout.ms | 60000 | When initially registering with the controller quorum, the number of milliseconds to wait before declaring failure and exiting the broker process.
broker.heartbeat.interval.ms | 3000 | The length of time between broker heartbeats.
broker.session.timeout.ms | 18000 | The length of time that a broker lease lasts if no heartbeats are made.
metadata.log.dir | If set, this must be a path to a log directory. | This configuration determines where we put the metadata log.  If it is not set, the metadata log is placed in the first log directory from log.dirs.
controller.quorum.fetch.timeout.ms | Maximum time without a successful fetch from the current leader before a new election is started. | New name for quorum.fetch.timeout.ms
controller.quorum.election.timeout.ms | Maximum time without collecting a majority of votes during the candidate state before a new election is retried. | New name for quorum.election.timeout.ms
controller.quorum.election.backoff.max.ms | Maximum exponential backoff time (based on the number of retries) after an election timeout, before a new election is triggered. | New name for quorum.election.backoff.max.ms
controller.quorum.request.timeout.ms | Maximum time before a pending request is considered failed and the connection is dropped. | New name for quorum.request.timeout.ms
controller.quorum.retry.backoff.ms | Initial delay between request retries.  This config and the one below are used for retriable request errors or lost connectivity, and are different from the election.backoff configs above. | New name for quorum.retry.backoff.ms
controller.quorum.retry.backoff.max.ms | Max delay between requests.  Backoff will increase exponentially beginning from quorum.retry.backoff.ms. | New name for quorum.retry.backoff.max.ms
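
Putting several of these together, a combined broker-and-controller node might use a server.properties along these lines; addresses are illustrative and only configurations from the table above are shown:

Code Block
process.roles=broker,controller
node.id=0
listeners=INTERNAL://192.1.1.8:9092,EXTERNAL://10.1.1.5:9093,CONTROLLER://192.1.1.8:9094
controller.listener.names=CONTROLLER
controller.quorum.voters=0@192.1.1.8:9094,1@192.1.1.9:9094,2@192.1.1.10:9094
metadata.log.dir=/tmp/kafka-logs/metadata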

Deprecated Configurations

Configuration Name | Reason
control.plane.listener.name | We no longer need to maintain a separate listener for messages from the controller, since the controller does not send messages out any more (it receives them).
broker.id.generation.enable | Automatic broker ID generation is no longer supported.
zookeeper.* | We no longer need configurations for ZooKeeper.

New Error Codes

DUPLICATE_BROKER_REGISTRATION

There will be a new error code, DUPLICATE_BROKER_REGISTRATION, that the active controller will return when a broker tries to register with an ID that is currently in use.

INVALID_CLUSTER_ID

There will be a new error code, INVALID_CLUSTER_ID, that the controller will return if the broker tries to register with the wrong cluster ID.

AdminClient

There will be a new AdminClient RPC, unregisterBroker.

Code Block
languagejs
UnregisterBrokerResult unregisterBroker(int brokerId, UnregisterBrokerOptions options)

public class UnregisterBrokerResult {
    public KafkaFuture<Void> all();
}

public class UnregisterBrokerOptions extends AbstractOptions<UnregisterBrokerOptions> {
}
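
Here is a sketch of how a client might invoke the new method as proposed above; the bootstrap address is illustrative and error handling is elided:

Code Block
languagejava
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.UnregisterBrokerOptions;
import org.apache.kafka.clients.admin.UnregisterBrokerResult;

public class UnregisterBrokerExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker0:9092");  // illustrative address
        try (Admin admin = Admin.create(props)) {
            // Ask the controller to unregister broker 1 from the cluster.
            UnregisterBrokerResult result = admin.unregisterBroker(1, new UnregisterBrokerOptions());
            result.all().get();  // completes when the controller has applied the change
        }
    }
}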

RPCs

Obsoleting the Metadata Propagation RPCs

As discussed earlier, the new controller will use FetchRequest to fetch metadata from the active controller.  The details of how Raft fetching will work are spelled out in KIP-595: A Raft Protocol for the Metadata Quorum.

Since we propagate the metadata via Raft, we will no longer need to send out LeaderAndIsrRequest, UpdateMetadataRequest, and StopReplicaRequest.  These requests will be sent out only when we're in legacy mode, not when we're in KIP-500 mode.  Eventually we will add some support for these requests to the new controller, in order to support rolling upgrade from a pre-KIP-500 release. However, for the purpose of this KIP, the new controller will not use these requests.

Obsoleting the Controlled Shutdown RPC

The broker heartbeat mechanism replaces the controlled shutdown RPC.  Therefore, we will not need to support this RPC any more in the controller, except for compatibility during upgrades, which will be described further in a follow-on KIP.

Topic Identifiers

This KIP builds on top of the work done in KIP-516 to improve how partitions are tracked.

Because each topic is identified by a unique topic UUID, we can implement topic deletion with a single record, RemoveTopicRecord.  Upon replaying this record, each broker will delete the associated topic if it is present.

Of course, some brokers may be down when the topic is deleted.  In fact, some brokers may never see the RemoveTopicRecord.  This record may get collapsed into one of the periodic metadata snapshots.  If this happens, the deletion will be reflected in the snapshot through the absence of a topic record, not its presence.  Therefore, during the startup process, brokers must compare the log directories that they have with the ones contained in the latest metadata.  The appropriate time to do this is at the start of the RECOVERY phase.  At this point, the broker has the latest metadata.
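
A minimal sketch of that startup comparison, assuming hypothetical names for the on-disk listing and the metadata image; the real broker logic is more involved:

Code Block
languagejava
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class RecoveryPhaseCheckSketch {
    // Hypothetical helper: partitions present on disk but absent from the
    // latest metadata were deleted while this broker was down (for example,
    // via a RemoveTopicRecord that was compacted into a snapshot).
    static List<String> stalePartitions(List<String> partitionsOnDisk,
                                        Set<String> partitionsInMetadata) {
        return partitionsOnDisk.stream()
                .filter(p -> !partitionsInMetadata.contains(p))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> onDisk = List.of("foo-0", "foo-1", "bar-0");
        Set<String> inMetadata = Set.of("bar-0");
        // Prints [foo-0, foo-1]: topic "foo" was deleted while the broker was down.
        System.out.println(stalePartitions(onDisk, inMetadata));
    }
}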

BrokerRegistration

Required ACLs: CLUSTERACTION on CLUSTER

Code Block
languagejs
{
  "apiKey": 57,
  "type": "request",
  "name": "BrokerRegistrationRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID." },
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "about": "The cluster id of the broker process." },
    { "name": "IncarnationId", "type": "uuid", "versions": "0+",
      "about": "The incarnation id of the broker process." },
    { "name": "CurrentMetadataOffset", "type": "int64", "versions": "0+",
      "about": "The highest metadata offset which the broker has reached." },
    { "name": "Listeners", "type": "[]Listener",
      "about": "The listeners of this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "uint16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
      ]
    },
    { "name": "Features", "type": "[]Feature",
      "about": "The features on this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The feature name." },
        { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The minimum supported feature level." },
        { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The maximum supported feature level." }
      ]
    },
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The rack which this broker is in." }
  ]
}

{
  "apiKey": 57,
  "type": "response",
  "name": "BrokerRegistrationResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker's assigned epoch, or -1 if none was assigned." }
  ]
}

BrokerHeartbeat

Required ACLs: CLUSTERACTION on CLUSTER

As described earlier, the broker periodically sends out a heartbeat request to the active controller.

Code Block
languagejs
{
  "apiKey": 58,
  "type": "request",
  "name": "BrokerHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "RackBrokerId", "type": "stringint32", "versions": "0+", "nullableVersions": "0+",
      "about": "The rack which this broker is inID." },
  ]
}

{
   { "apiKeyname": 57"BrokerEpoch",
  "type": "responseint64",
  "nameversions": "BrokerRegistrationResponse0+",
  "validVersionsdefault": "0-1",
      "flexibleVersionsabout": "0+",
The broker epoch."fields": [},
    { "name": "ThrottleTimeMsCurrentMetadataOffset", "type": "int32int64", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota"about": "One more than the highest metadata offset which the broker has reached." },
    { "name": "ErrorCodeWantFence", "type": "int16bool", "versions": "0+",
      "about": "TheTrue errorif code,the orbroker 0wants ifto therebe wasfenced, nofalse errorotherwise." },
    { "name": "BrokerEpochWantShutDown", "type": "int64bool", "versions": "0+", "default": "-1",
      "about": "TheTrue broker'sif assignedthe epoch,broker orwants -1to ifinitiate nonecontrolled was assignedshutdown." }
  ]
}

{
  "apiKey": 58,
  "type": "response",
  "name": "BrokerHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "IsCaughtUp", "type": "bool", "versions": "0+",
      "about": "True if the broker has approximately caught up with the latest metadata." },
    { "name": "IsFenced", "type": "bool", "versions": "0+",
      "about": "True if the broker is fenced." },
    { "name": "ShouldShutDown", "type": "bool", "versions": "0+",
      "about": "True if the broker should proceed with its shutdown." }
  ]
}

The controller will wait to unfence a broker until it sends a heartbeat where WantFence is false and CurrentMetadataOffset is caught up.

If the heartbeat request has WantShutDown set, the controller will try to move all the leaders off of the broker.

The controller will set ShouldShutDown in the response if the broker is cleared to execute a controlled shutdown; in other words, if it has no leaderships.

The controller will return NOT_CONTROLLER if it is not active.  Brokers will always return NOT_CONTROLLER for these RPCs.
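
As a concrete illustration of the fencing timing, here is a sketch using the default values of broker.heartbeat.interval.ms (3000) and broker.session.timeout.ms (18000) from the configuration table above; the class and method names are hypothetical:

Code Block
languagejava
public class BrokerSessionSketch {
    static final long HEARTBEAT_INTERVAL_MS = 3_000;   // broker.heartbeat.interval.ms
    static final long SESSION_TIMEOUT_MS = 18_000;     // broker.session.timeout.ms

    // The controller fences a broker whose lease has expired, i.e. whose last
    // heartbeat is older than the session timeout.
    static boolean sessionExpired(long nowMs, long lastHeartbeatMs) {
        return nowMs - lastHeartbeatMs > SESSION_TIMEOUT_MS;
    }

    public static void main(String[] args) {
        // A healthy broker heartbeats every 3 seconds, so its lease never expires.
        System.out.println(sessionExpired(21_000, 18_000));  // false
        // A broker silent for 19 seconds has exceeded the 18-second session timeout.
        System.out.println(sessionExpired(20_000, 1_000));   // true
    }
}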

UnregisterBroker

Required ACLs: ALTER on CLUSTER

The UnregisterBrokerRequest asks the controller to unregister a broker from the cluster.

Code Block
languagejs
{
  "apiKey": 59,
  "type": "request",
  "name": "UnregisterBrokerRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID to unregister." }
  ]
}

{
  "apiKey": 59,
  "type": "response",
  "name": "UnregisterBrokerResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." }
  ]
}

The valid response codes are:

  • NONE if the unregistration succeeded or if the broker was already unregistered.
  • NOT_CONTROLLER if the node that the request was sent to is not the controller
  • UNSUPPORTED_VERSION if KIP-500 mode is not enabled

Record Formats

RegisterBrokerRecord

Code Block
languagejs
{
  "apiKey": 0,
  "type": "metadata",
  "name": "RegisterBrokerRecord",
  "validVersions": "0",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker id." },
    { "name": "IncarnationId", "type": "uuid", "versions": "0+",
      "about": "The incarnation id of the broker process." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+",
      "about": "The broker epoch assigned by the controller." },
    { "name": "EndPoints", "type": "[]BrokerEndpoint", "versions": "0+", "nullableVersions": "0+",
      "about": "The endpoints that can be used to communicate with this broker.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "uint16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
    ]},
    { "name": "Features", "type": "[]BrokerFeature", "versions": "0+", "nullableVersions": "0+",
      "about": "The features that this broker supports.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the feature." },
        { "name": "MinVersion", "type": "int16", "versions": "0+",
          "about": "The minimum feature level that this broker supports." },
        { "name": "MaxVersion", "type": "int16", "versions": "0+",
          "about": "The maximum feature level that this broker supports." }
    ]},
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The broker rack." }
  ]
}

UnregisterBrokerRecord

Code Block
{
  "apiKey": 1,
  "type": "metadata",
  "name": "UnregisterBrokerRecord",
  "validVersions": "0",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker id." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+",
      "about": "The broker epoch." }
  ]
}

TopicRecord

Code Block
{
  "apiKey": 2,
  "type": "metadata",
  "name": "TopicRecord",
  "validVersions": "0",
  "fields": [
    { "name": "TopicName", "type": "string", "versions": "0+",
      "about": "The topic name." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." }
  ]
}

...

PartitionRecord

Code Block
{
  "apiKey": 3,
  "type": "metadata",
  "name": "PartitionRecord",
  "validVersions": "0",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Replicas", "type": "[]int32", "versions": "0+",
      "about": "The replicas of this partition, sorted by preferred order." },
    { "name": "Isr", "type": "[]int32", "versions": "0+",
      "about": "The in-sync replicas of this partition." },
    { "name": "RemovingReplicas", "type": "[]int32", "versions": "0+",
      "about": "The replicas that we are in the process of removing." },
    { "name": "AddingReplicas", "type": "[]int32", "versions": "0+",
      "about": "The replicas that we are in the process of adding." },
    { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The lead replica, or -1 if there is no leader." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "An epoch that gets incremented each time we change the leader." }
  ]
}

ConfigRecord

Code Block
{
  "apiKey": 4,
  "type": "metadata",
  "name": "ConfigRecord",
  "validVersions": "0",
  "fields": [
    { "name": "ResourceType", "type": "int8", "versions": "0+",
      "about": "The type of resource this configuration applies to." },
    { "name": "ResourceName", "type": "string", "versions": "0+",
      "about": "The name of the resource this configuration applies to." },
    { "name": "Name", "type": "string", "versions": "0+",
      "about": "The name of the configuration key." },
    { "name": "Value", "type": "string", "versions": "0+",
      "about": "The value of the configuration." }
  ]
}

PartitionChangeRecord

Code Block
{
  "apiKey": 5,
  "type": "metadata",
  "name": "PartitionChangeRecord",
  "validVersions": "0",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Isr", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 0,
      "about": "null if the ISR didn't change; the new in-sync replicas otherwise." },
    { "name": "Leader", "type": "int32", "default": "-2", "entityType": "brokerId",
      "versions": "0+", "taggedVersions": "0+", "tag": 1,
      "about": "-1 if there is now no leader; -2 if the leader didn't change; the new leader otherwise." },
    { "name": "Replicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 2,
      "about": "null if the replicas didn't change; the new replicas otherwise." },
    { "name": "RemovingReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 3,
      "about": "null if the removing replicas didn't change; the new removing replicas otherwise." },
    { "name": "AddingReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 4,
      "about": "null if the adding replicas didn't change; the new adding replicas otherwise." }
  ]
}

AccessControlRecord

...