...

Currently, any broker node can be elected as the controller.  As part of this KIP, the active controller will instead be selected from a potentially smaller pool of nodes specifically configured to act as controllers.  Typically three or five nodes in the cluster will be selected to be controllers.

...

Note that as long as at least one of the provided controller addresses is valid, the broker will be able to learn about the current metadata quorum and start up.  Once the broker is in contact with the metadata quorum, the quorum bootstrap addresses will not be needed.  This makes it possible to reconfigure the metadata quorum over time.  For example, if we start with a metadata quorum of host1, host2, host3, we could replace host3 with host4 without disrupting any of the brokers.  Then we could roll the brokers to apply the new metadata quorum bootstrap configuration of host1, host2, host4 on each one.

Node IDs

Just like brokers, controller nodes will have non-negative integer node IDs.  There will be a single ID space.  In other words, no controller should share the same ID as a broker.  Even when a broker and a controller are co-located in the same JVM, they must have different node IDs.

Automatic node ID assignment via ZooKeeper will no longer be supported in KIP-500 mode.  Node IDs must be set in the configuration file for brokers and controllers.

We define a node here as a tuple consisting of a node ID and a process role. Roles are defined by the `process.roles` configuration. As explained above, in a co-located configuration, a single process may take both the "controller" and "broker" roles. The node ID for both of these roles will be defined by the `node.id` configuration. However, this is mainly for configuration convenience. Semantically, we view the co-located process as representing two distinct nodes. Each node has its own listeners and its own set of APIs which it exposes. The APIs exposed by a controller node will not be the same as those exposed by a broker node.
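As an illustration of the (node ID, role) view of a node, here is a minimal sketch that derives the nodes defined by one process from its `node.id` and `process.roles` values. The class and method names are hypothetical, and the `<role>-<id>` naming simply follows the `broker-0` / `controller-0` example given for the `node.id` configuration; it is not an API of Kafka.

```java
import java.util.ArrayList;
import java.util.List;

final class NodeNames {
    /** Returns one "<role>-<id>" name per role listed in process.roles. */
    static List<String> nodesFor(int nodeId, String processRoles) {
        List<String> nodes = new ArrayList<>();
        for (String role : processRoles.split(",")) {
            String trimmed = role.trim();
            // Only the two roles named in this KIP are accepted.
            if (!trimmed.equals("broker") && !trimmed.equals("controller")) {
                throw new IllegalArgumentException("Unknown role: " + trimmed);
            }
            nodes.add(trimmed + "-" + nodeId);
        }
        return nodes;
    }

    public static void main(String[] args) {
        // A co-located process is viewed as two distinct nodes.
        System.out.println(nodesFor(0, "broker,controller"));
    }
}
```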

Networking

Controller processes will listen on a separate endpoint from brokers.  This will be true even when the broker and controller are co-located in the same JVM.

...

As described in KIP-500, the controller will store its data in the internal __cluster_metadata topic.  This topic will contain a single partition which is managed by Raft, as described in KIP-595: A Raft Protocol for the Metadata Quorum.

...

Metadata changes need to be persisted to the __cluster_metadata log before we apply them on the other nodes in the cluster.  This means waiting for the metadata log's last stable offset to advance to the offset of the change.  After that point, we are guaranteed not to lose the change as long as we uphold the Raft invariants.

...

However, this "future state" may never be committed.  For example, the active controller might fail, truncating some of its future state.  Therefore, the active controller must not make this future state "visible" to the rest of the cluster until it has been made persistent – that is, until it becomes current state.  In the case of the __cluster_metadata topic, the replication protocol itself neatly takes care of this for us.  In the case of controller RPCs like AlterIsr, the controller handles this by not sending back a response until the designated change has been persisted.

...

The active controller makes changes to the metadata by appending records to the log. Each record has a null key, and this format for its value:

  1. an unsigned varint specifying the frame type (currently 0)
  2. an unsigned varint specifying the record type.
  3. an unsigned varint specifying the record version
  4. the payload in Kafka RPC format

For example, if we wanted to encode a TopicRecord, we might have 0 encoded as a varint, 1 encoded as a varint, followed by 0 as the record version, followed by the serialized topic data.

The frame type, record type, and version will typically only take one byte each, for a total header size overhead of three bytes.
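The framing described above can be sketched as follows. This is only an illustration: the class and method names are hypothetical, the payload bytes are placeholders rather than a real serialized TopicRecord, and the varint writer assumes the usual 7-bits-per-byte unsigned varint layout.

```java
import java.io.ByteArrayOutputStream;

final class MetadataRecordFrame {
    /** Writes an unsigned varint: 7 bits per byte, high bit set on every byte except the last. */
    static void writeUnsignedVarint(int value, ByteArrayOutputStream out) {
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    /** Builds a record value: frame type, record type, record version, then the payload. */
    static byte[] frame(int frameType, int recordType, int recordVersion, byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarint(frameType, out);
        writeUnsignedVarint(recordType, out);
        writeUnsignedVarint(recordVersion, out);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] payload = {0x0A, 0x0B}; // placeholder bytes, not a real record body
        byte[] value = frame(0, 1, 0, payload);
        // Small header values take one byte each, so the header is three bytes long.
        System.out.println("header bytes: " + (value.length - payload.length));
    }
}
```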

Record Format Versions

...

In the post-KIP-500 world, controller shutdown is handled by the broker heartbeat system instead.  In its periodic heartbeats, the broker asks the controller if it can transition into the controlled shutdown state.  It does this by setting the WantShutDown boolean.  This motivates the controller to move all of the leaders off of that broker.  Once they are all moved, the controller responds to the heartbeat with ShouldShutDown = true.  At that point, the broker knows it's safe to begin the shutdown process proper.
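The handshake above might be sketched like this. It is a simplification under stated assumptions: the class, the `leaders` map, and the method names are hypothetical, and the step where the controller actually moves leaders is left out; the sketch only shows when the ShouldShutDown answer flips to true.

```java
import java.util.HashMap;
import java.util.Map;

final class ControlledShutdownSketch {
    // Hypothetical bookkeeping: partition name -> broker ID of its current leader.
    private final Map<String, Integer> leaders = new HashMap<>();

    void setLeader(String partition, int brokerId) {
        leaders.put(partition, brokerId);
    }

    /** Handles one heartbeat and returns the ShouldShutDown value for the response. */
    boolean handleHeartbeat(int brokerId, boolean wantShutDown) {
        if (!wantShutDown) {
            return false;
        }
        // Shutdown is allowed only once the broker no longer leads any partition.
        return !leaders.containsValue(brokerId);
    }

    public static void main(String[] args) {
        ControlledShutdownSketch controller = new ControlledShutdownSketch();
        controller.setLeader("foo-0", 1);
        System.out.println(controller.handleHeartbeat(1, true)); // broker 1 still leads foo-0
        controller.setLeader("foo-0", 2);                        // leadership moved away
        System.out.println(controller.handleHeartbeat(1, true));
    }
}
```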

...

The main change in the broker state machine is that the RECOVERING_FROM_UNCLEAN_SHUTDOWN state has been renamed to RECOVERY.  Also, unlike previously, the broker will always pass through RECOVERY (although it may only stay in this state for a very short amount of time).

...

Partition Reassignment

Just like the ZooKeeper-based controller, the quorum controller will implement the KIP-455 partition reassignment API. This API specifies both alter and list operations.

The alter operation specifies a partition, plus a target replica list. If the target replica list is a subset of the partition's current replica list, and the new replica list contains at least one member of the current ISR, then the reassignment can be completed immediately. In this case, the controller issues a PartitionChangeRecord changing the replica set and ISR (if appropriate).

On the other hand, if the new replica list has additional replicas, or specifies a subset of replicas that doesn't intersect with the current ISR, then we add the additional replicas to the replica list, and wait for the ISR to catch up.

So, for example, if the old replica list was [1, 2, 3], and the new list was [3, 4, 5], we would proceed as follows:

  • Change the replica list to [1, 2, 3, 4, 5], and set addingReplicas to [4, 5].
  • Wait for an alterIsr which adds both broker 4 and 5.
  • Change the replica list to [3, 4, 5] and prune the ISR accordingly. Remove addingReplicas.
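The bookkeeping in these steps can be sketched as below. The class and field names are hypothetical (only addingReplicas and removingReplicas come from the text), and `canCompleteImmediately` encodes the subset-plus-ISR condition described earlier in this section; it is an illustration, not the controller's code.

```java
import java.util.ArrayList;
import java.util.List;

final class ReassignmentPlan {
    final List<Integer> mergedReplicas = new ArrayList<>();   // replica list while reassigning
    final List<Integer> addingReplicas = new ArrayList<>();   // in target, not in current
    final List<Integer> removingReplicas = new ArrayList<>(); // in current, not in target

    ReassignmentPlan(List<Integer> current, List<Integer> target) {
        mergedReplicas.addAll(current);
        for (int replica : target) {
            if (!current.contains(replica)) {
                addingReplicas.add(replica);
                mergedReplicas.add(replica);
            }
        }
        for (int replica : current) {
            if (!target.contains(replica)) {
                removingReplicas.add(replica);
            }
        }
    }

    /** True if no replicas are added and the target keeps at least one current ISR member. */
    static boolean canCompleteImmediately(List<Integer> current, List<Integer> target,
                                          List<Integer> isr) {
        return current.containsAll(target) && target.stream().anyMatch(isr::contains);
    }

    public static void main(String[] args) {
        ReassignmentPlan plan = new ReassignmentPlan(List.of(1, 2, 3), List.of(3, 4, 5));
        System.out.println(plan.mergedReplicas);   // [1, 2, 3, 4, 5]
        System.out.println(plan.addingReplicas);   // [4, 5]
        System.out.println(plan.removingReplicas); // [1, 2]
    }
}
```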

Just like with the ZooKeeper-based controller, the new controller stores reassignments as "hard state." Reassignments will continue even after a controller failover or broker shutdown and restart. Reassignment state appears in controller snapshots, as addingReplicas and removingReplicas.

Unlike the old controller, the quorum controller does not prevent topics that are undergoing reassignment from being deleted. If a topic is deleted, then any partition reassignments that it had are terminated.

Typically, reassignments which require ISR changes are completed by a leader adding some new replicas, via the alterIsr RPC. (Note that another possible way for a reassignment to be completed is via a broker being added to a partition ISR during unfencing). 

When a broker makes an alterIsr change that completes a reassignment, the resulting ISR will be different than requested. For example, if the leader is 1, and the current ISR is 1, 2, 3, and the target replica set is 1, 2, 4, when the leader tries to change the ISR to 1, 2, 3, 4, the controller will change it to 1, 2, 4 instead, completing the reassignment. Since the alterIsr response returns the ISR which was actually applied, the leader will apply this new ISR instead.

If the new ISR does not contain the leader which made the alterIsr call, the controller returns FENCED_LEADER_EPOCH. This will notify the broker that it should wait until it can replay the latest partition change from the log. That partition change record will make this broker a follower rather than a leader.
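A rough sketch of the two cases above follows. All names are hypothetical, and the completion test is an assumption made for illustration: the reassignment is treated as complete once the proposed ISR contains every target replica.

```java
import java.util.ArrayList;
import java.util.List;

final class AlterIsrSketch {
    /** Outcome of an alterIsr call: an error name plus the ISR that was applied, if any. */
    static final class Result {
        final String error;      // "NONE" or "FENCED_LEADER_EPOCH"
        final List<Integer> isr; // null when an error is returned
        Result(String error, List<Integer> isr) {
            this.error = error;
            this.isr = isr;
        }
    }

    static Result alterIsr(int leader, List<Integer> proposedIsr, List<Integer> targetReplicas) {
        // Assumption: the reassignment completes once every target replica is in the ISR.
        if (!proposedIsr.containsAll(targetReplicas)) {
            return new Result("NONE", proposedIsr);
        }
        List<Integer> newIsr = new ArrayList<>(proposedIsr);
        newIsr.retainAll(targetReplicas); // drop replicas that are being removed
        if (!newIsr.contains(leader)) {
            // The caller is no longer a replica; it must replay the log and become a follower.
            return new Result("FENCED_LEADER_EPOCH", null);
        }
        return new Result("NONE", newIsr);
    }

    public static void main(String[] args) {
        Result applied = alterIsr(1, List.of(1, 2, 3, 4), List.of(1, 2, 4));
        System.out.println(applied.error + " " + applied.isr); // NONE [1, 2, 4]
        Result fenced = alterIsr(1, List.of(1, 2, 3, 4, 5), List.of(3, 4, 5));
        System.out.println(fenced.error);                      // FENCED_LEADER_EPOCH
    }
}
```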

Public Interfaces

meta.properties

When a storage directory is in use by a cluster running in kip-500 mode, it will have a new version of the meta.properties file.  Since the current version is 0, the new version will be 1.   Just as in version 0, meta.properties will continue to be a Java Properties file.  This essentially means that it is a plain text file where each line has the format key=value.

In version 0 of meta.properties, the cluster.id field is optional.  In contrast, in version 1 it is mandatory.

In version 0 of meta.properties, the cluster.id field is serialized in hexadecimal.  In contrast, in version 1 it is serialized in base64.

Version 1 of meta.properties replaces the broker.id field with node.id.

For servers running in kip-500 mode, the `meta.properties` file must be present in every log directory. The process will raise an error during startup if either the meta.properties file does not exist or the node.id found does not match the value from the configuration file.

Here is an example of a version 1 meta.properties file:

Code Block
$ cat /tmp/kafka-logs/meta.properties
#
#Tue Dec 01 10:08:08 PST 2020
cluster.id=3Db5QLSqSZieL3rJBUUegA
version=1
node.id=0
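The startup checks described above could look roughly like the following. This is a sketch under assumptions: the class and method names are hypothetical, the file is read from a string instead of a log directory, and only the rules stated in this section (version 1, mandatory cluster.id, matching node.id) are checked.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

final class MetaPropertiesCheck {
    /** Throws IllegalStateException if the properties violate the version 1 rules. */
    static void validate(Properties props, int configuredNodeId) {
        if (!"1".equals(props.getProperty("version"))) {
            throw new IllegalStateException("Expected version=1");
        }
        if (props.getProperty("cluster.id") == null) {
            throw new IllegalStateException("cluster.id is mandatory in version 1");
        }
        String nodeId = props.getProperty("node.id");
        if (nodeId == null || Integer.parseInt(nodeId.trim()) != configuredNodeId) {
            throw new IllegalStateException("node.id does not match the configured value");
        }
    }

    public static void main(String[] args) throws IOException {
        String contents = "#\n#Tue Dec 01 10:08:08 PST 2020\n"
                + "cluster.id=3Db5QLSqSZieL3rJBUUegA\nversion=1\nnode.id=0\n";
        Properties props = new Properties();
        props.load(new StringReader(contents)); // lines starting with '#' are comments
        validate(props, 0);
        System.out.println("meta.properties is consistent with node.id=0");
    }
}
```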

kafka-storage.sh

There will be a new command-line tool, kafka-storage.sh.

Code Block
$ ./bin/kafka-storage.sh -h
usage: kafka-storage [-h] {info,format,random-uuid} ...

The Kafka storage tool.

positional arguments:
  {info,format,random-uuid}
    info                 Get information about the Kafka log directories on this node.
    format               Format the Kafka log directories on this node.
    random-uuid          Print a random UUID.

optional arguments:
  -h, --help             show this help message and exit

kafka-storage.sh will have three subcommands: info, format, and random-uuid.

info

Code Block
$ ./bin/kafka-storage.sh info -h
usage: kafka-storage info [-h] --config CONFIG

optional arguments:
  -h, --help             show this help message and exit
  --config CONFIG, -c CONFIG
                         The Kafka configuration file to use.

The info command will give information about the configured storage directories.  Example output:

Code Block
Found log directory:
  /tmp/kafka-logs

Found metadata: MetaProperties(clusterId=51380268-1036-410d-a8fc-fb3b55f48033)

format

Code Block
$ ./bin/kafka-storage.sh format -h
usage: kafka-storage format [-h] --config CONFIG --cluster-id CLUSTER_ID [--ignore-formatted]

optional arguments:
  -h, --help             show this help message and exit
  --config CONFIG, -c CONFIG
                         The Kafka configuration file to use.
  --cluster-id CLUSTER_ID, -t CLUSTER_ID
                         The cluster ID to use.
  --ignore-formatted, -g

When running in kip-500 mode, the storage directories must be formatted using this command prior to starting up the brokers and controllers.

If any of the storage directories are formatted, the command will normally fail.  This behavior can be changed by passing the --ignore-formatted option.  When this option is passed, the format command will skip over already formatted directories rather than failing.

random-uuid

Code Block
$ ./bin/kafka-storage.sh random-uuid -h
usage: kafka-storage random-uuid [-h]

optional arguments:
  -h, --help             show this help message and exit

The random-uuid command prints out a random UUID to stdout.

Code Block
$ ./bin/kafka-storage.sh random-uuid
8XUwXa9qSyi9tSOquGtauQ

kafka-cluster.sh

There will also be a new command-line tool named kafka-cluster.sh.

Code Block
$ ./bin/kafka-cluster.sh -h
usage: kafka-cluster [-h] {cluster-id,unregister} ...

The Kafka cluster tool.

positional arguments:
  {cluster-id,unregister}
    cluster-id           Get information about the ID of a cluster.
    unregister           Unregister a broker ID.

optional arguments:
  -h, --help             show this help message and exit

kafka-cluster.sh will have two subcommands: cluster-id and unregister.

cluster-id

Code Block
$ ./bin/kafka-cluster.sh cluster-id -h
usage: kafka-cluster cluster-id [-b,-c,-h]

optional arguments:
  -b, --bootstrap-server a list of host/port pairs to use for establishing the connection to the kafka cluster.
  -c, --config           a property file containing configs to be passed to Admin Client.
  -h, --help             show this help message and exit

The cluster-id command prints out the cluster ID.

unregister

Code Block
$ ./bin/kafka-cluster.sh unregister -h
usage: kafka-cluster unregister [-h] [--bootstrap-server BOOTSTRAP_SERVER] [--config CONFIG] [--id ID]

optional arguments:
  -h, --help             show this help message and exit
  --bootstrap-server BOOTSTRAP_SERVER, -b BOOTSTRAP_SERVER
                         A list of host/port pairs to use for establishing the connection to the kafka cluster.
  --config CONFIG, -c CONFIG
                         A property file containing configs to be passed to AdminClient.
  --id ID, -i ID         The ID of the broker to unregister.

The unregister command removes the registration of a specific broker ID.  It will make an UnregisterBrokerRequest in order to do this.

Changes to kafka-dump-log-segments.sh

kafka-dump-log-segments.sh will have two new flags: --cluster-metadata-decoder and --skip-record-metadata.

The --cluster-metadata-decoder flag will tell the DumpLogSegments tool to decode the records as KIP-500 metadata.  Each record will be output in the following JSON format:

Code Block
payload: {"type":<record type enum name>, "version":<record version number>, "data":<record JSON>}

Example output:

Code Block
 payload: {"type":"TOPIC_RECORD","version":0,"data":{"name":"bar","topicId":"GU_rXds2FGppL1JqXYpx2g"}}
 payload: {"type":"PARTITION_RECORD","version":0,"data":{"partitionId":0,"topicId":"GU_rXds2FGppL1JqXYpx2g","replicas":[1],"isr":[1],"removingReplicas":null,"addingReplicas":null,"leader":1,"leaderEpoch":0,"partitionEpoch":0}}
 payload: {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":0,"topicId":"GU_rXds2FGppL1JqXYpx2g","leader":-1}}
 payload: {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":0,"topicId":"WCnrza5uWKeerYa7HCNpOg","leader":-1}}
 payload: {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":0,"topicId":"GU_rXds2FGppL1JqXYpx2g","leader":-1}}
 payload: {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":0,"topicId":"WCnrza5uWKeerYa7HCNpOg","leader":-1}}
 payload: {"type":"FENCE_BROKER_RECORD","version":0,"data":{"id":1,"epoch":0}}

The --skip-record-metadata flag will skip printing metadata for each record.  However, metadata for each record batch will still be printed when this flag is present.

kafka-metadata-shell.sh

The Kafka Metadata shell is a new command which allows users to interactively examine the metadata stored in a KIP-500 cluster.

It can read the metadata from the controllers directly, by connecting to them, or from a metadata snapshot on disk.  In the former case, the quorum voters must be specified by passing the --controllers flag; in the latter case, the snapshot file should be specified via --snapshot.

Code Block
$ ./bin/kafka-metadata-shell.sh -h
usage: metadata-tool [-h] [--controllers CONTROLLERS] [--config CONFIG] [--snapshot SNAPSHOT] [command [command ...]]

The Apache Kafka metadata tool

positional arguments:
  command                The command to run.

optional arguments:
  -h, --help             show this help message and exit
  --controllers CONTROLLERS, -C CONTROLLERS
                         The quorum voter connection string to use.
  --config CONFIG, -c CONFIG
                         The configuration file to use.
  --snapshot SNAPSHOT, -s SNAPSHOT
                         The snapshot file to read.

The metadata tool works by replaying the log and storing the state into in-memory nodes.  These nodes are presented in a fashion similar to filesystem directories.  For browsing the nodes, several commands are supported:

Code Block
>> help
Welcome to the Apache Kafka metadata shell.

usage:  {cat,cd,exit,find,help,history,ls,man,pwd} ...

positional arguments:
  {cat,cd,exit,find,help,history,ls,man,pwd}
    cat                  Show the contents of metadata nodes.
    cd                   Set the current working directory.
    exit                 Exit the metadata shell.
    find                 Search for nodes in the directory hierarchy.
    help                 Display this help message.
    history              Print command history.
    ls                   List metadata nodes.
    man                  Show the help text for a specific command.
    pwd                  Print the current working directory.

The interface of the metadata tool is currently considered unstable and may change when KIP-500 becomes production-ready.

Configurations

process.roles
  Possible values: null; broker; controller; broker,controller
  Notes: If this is null (absent) then we are in legacy mode. Otherwise, we are in KIP-500 mode and this configuration determines what roles this process should play: broker, controller, or both.

controller.listener.names
  Possible values: If non-null, this must be a comma-separated list of listener names. When communicating with the controller quorum, the broker will always use the first listener in this list.
  Notes: A comma-separated list of the names of the listeners used by the KIP-500 controller. This configuration is required if this process is a KIP-500 controller. The legacy controller will not use this configuration. Despite the similar name, note that this is different from the "control plane listener" introduced by KIP-291.

listeners
  Possible values: A comma-separated list of the configured listeners.  For example, INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094
  Notes: This configuration is now required.

sasl.mechanism.controller.protocol
  Possible values: SASL mechanism used for communication with controllers. Default is GSSAPI.
  Notes: This is analogous to sasl.mechanism.inter.broker.protocol, but for communication with the controllers.

controller.quorum.voters
  Possible values: If non-null, this must be a comma-separated list of all the controller voters, in the format: {controller-id}@{controller-host}:{controller-port}
  Notes: When in KIP-500 mode, each node must have this configuration, in order to find out how to communicate with the controller quorum. Note that this replaces the "quorum.voters" config described in KIP-595. This configuration is required for both brokers and controllers.

node.id
  Possible values: a 32-bit ID
  Notes: This configuration replaces `broker.id` for zk-based Kafka processes in order to reflect its more general usage. It serves as the ID associated with each role that the process is acting as. For example, a configuration with `node.id=0` and `process.roles=broker,controller` defines two nodes: `broker-0` and `controller-0`.

initial.broker.registration.timeout.ms
  Possible values: 60000
  Notes: When initially registering with the controller quorum, the number of milliseconds to wait before declaring failure and exiting the broker process.

broker.heartbeat.interval.ms
  Possible values: 3000
  Notes: The length of time between broker heartbeats.

broker.session.timeout.ms
  Possible values: 18000
  Notes: The length of time that a broker lease lasts if no heartbeats are made.

metadata.log.dir
  Possible values: If set, this must be a path to a log directory.
  Notes: This configuration determines where we put the metadata log.  If it is not set, the metadata log is placed in the first log directory from log.dirs.

controller.quorum.fetch.timeout.ms
  Possible values: Maximum time without a successful fetch from the current leader before a new election is started.
  Notes: New name for quorum.fetch.timeout.ms

controller.quorum.election.timeout.ms
  Possible values: Maximum time without collecting a majority of votes during the candidate state before a new election is retried.
  Notes: New name for quorum.election.timeout.ms

controller.quorum.election.backoff.max.ms
  Possible values: Maximum exponential backoff time (based on the number of retries) after an election timeout, before a new election is triggered.
  Notes: New name for quorum.election.backoff.max.ms

controller.quorum.request.timeout.ms
  Possible values: Maximum time before a pending request is considered failed and the connection is dropped.
  Notes: New name for quorum.request.timeout.ms

controller.quorum.retry.backoff.ms
  Possible values: Initial delay between request retries. This config and the one below are used for retriable request errors or lost connectivity and are different from the election.backoff configs above.
  Notes: New name for quorum.retry.backoff.ms

controller.quorum.retry.backoff.max.ms
  Possible values: Max delay between requests. Backoff will increase exponentially beginning from controller.quorum.retry.backoff.ms.
  Notes: New name for quorum.retry.backoff.max.ms
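To make the `controller.quorum.voters` format concrete, here is a minimal parsing sketch. The class name, the validation rules, and the port number in the example are assumptions made for illustration; the host names reuse the host1, host2, host3 example from the bootstrap discussion.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class QuorumVoters {
    /** Parses "id@host:port,id@host:port,..." into a map from controller ID to "host:port". */
    static Map<Integer, String> parse(String voters) {
        Map<Integer, String> result = new LinkedHashMap<>();
        for (String entry : voters.split(",")) {
            String[] idAndAddress = entry.trim().split("@", 2);
            if (idAndAddress.length != 2) {
                throw new IllegalArgumentException("Expected id@host:port but got: " + entry);
            }
            int id = Integer.parseInt(idAndAddress[0]);
            String address = idAndAddress[1];
            int colon = address.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Missing host or port in: " + entry);
            }
            int port = Integer.parseInt(address.substring(colon + 1));
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range in: " + entry);
            }
            if (result.put(id, address) != null) {
                throw new IllegalArgumentException("Duplicate controller ID: " + id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Port 9093 is an arbitrary choice for this example.
        System.out.println(parse("1@host1:9093,2@host2:9093,3@host3:9093"));
    }
}
```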

...

There will be a new AdminClient RPC, unregisterBroker.

Code Block
UnregisterBrokerResult unregisterBroker(int brokerId, UnregisterBrokerOptions options)

public class UnregisterBrokerResult {
    public KafkaFuture<Void> all();
}

public class UnregisterBrokerOptions extends AbstractOptions<UnregisterBrokerOptions> {
}

RPCs

Obsoleting the Metadata Propagation RPCs

...

Code Block
{
  "apiKey": 57,
  "type": "request",
  "name": "BrokerRegistrationRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID." },
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "about": "The cluster id of the broker process." },
    { "name": "IncarnationId", "type": "uuid", "versions": "0+",
      "about": "The incarnation id of the broker process." },
    { "name": "CurrentMetadataOffset", "type": "int64", "versions": "0+",
      "about": "The highest metadata offset which the broker has reached." },
    { "name": "Listeners", "type": "[]Listener",
      "about": "The listeners of this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "uint16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
      ]
    },
    { "name": "Features", "type": "[]Feature",
      "about": "The features on this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The feature name." },
        { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The minimum supported feature level." },
        { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The maximum supported feature level." }
      ]
    },
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The rack which this broker is in." }
  ]
}

{
  "apiKey": 57,
  "type": "response",
  "name": "BrokerRegistrationResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker's assigned epoch, or -1 if none was assigned." }
  ]
}

...

Code Block
{
  "apiKey": 58,
  "type": "request",
  "name": "BrokerHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker epoch." },
    { "name": "CurrentMetadataOffset", "type": "int64", "versions": "0+",
      "about": "One more than the highest metadata offset which the broker has reached." },
    { "name": "WantFence", "type": "bool", "versions": "0+",
      "about": "True if the broker wants to be fenced, false otherwise." },
    { "name": "WantShutDown", "type": "bool", "versions": "0+",
      "about": "True if the broker wants to initiate controlled shutdown." }
  ]
}   

{
  "apiKey": 58,
  "type": "response",
  "name": "BrokerHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "IsCaughtUp", "type": "bool", "versions": "0+",
      "about": "True if the broker has approximately caught up with the latest metadata." },
    { "name": "IsFenced", "type": "bool", "versions": "0+",
      "about": "True if the broker is fenced." },
    { "name": "ShouldShutDown", "type": "bool", "versions": "0+",
      "about": "True if the broker should proceed with its shutdown." }
  ]
}

The controller will wait to unfence a broker until it sends a heartbeat where WantFence is false and CurrentMetadataOffset is caught up.
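The unfencing rule can be written as a single predicate. This is a sketch with hypothetical names, and it assumes that "caught up" means the broker's reported metadata offset has reached some offset chosen by the controller; the text does not spell out how that threshold is picked.

```java
final class UnfenceCheck {
    /**
     * Returns true if a broker may be unfenced after this heartbeat.
     * wantFence mirrors the fence flag in the heartbeat request; requiredOffset is
     * the (assumed) metadata offset the controller expects the broker to have reached.
     */
    static boolean canUnfence(boolean wantFence, long currentMetadataOffset, long requiredOffset) {
        return !wantFence && currentMetadataOffset >= requiredOffset;
    }

    public static void main(String[] args) {
        System.out.println(canUnfence(false, 120L, 100L)); // caught up and willing
        System.out.println(canUnfence(true, 120L, 100L));  // broker still asks to stay fenced
        System.out.println(canUnfence(false, 80L, 100L));  // broker is behind on metadata
    }
}
```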

...

The controller will return NOT_CONTROLLER if it is not active.  Brokers will always return NOT_CONTROLLER for these RPCs.

...

UnregisterBroker

Required ACLs: ALTER on CLUSTER

The UnregisterBrokerRequest asks the controller to unregister a broker from the cluster.

Code Block
{
  "apiKey": 59,
  "type": "request",
  "name": "UnregisterBrokerRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID to unregister." }
  ]
}

{
  "apiKey": 59,
  "type": "response",
  "name": "UnregisterBrokerResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." }
  ]
}

The valid response codes are:

  • NONE if the unregistration succeeded or if the broker was already unregistered.
  • NOT_CONTROLLER if the node that the request was sent to is not the controller
  • UNSUPPORTED_VERSION if KIP-500 mode is not enabled
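
The response codes above can be sketched as a small decision function. This is only an illustration: the function name, its parameters, and the order in which the checks run are assumptions, and error codes are shown by name rather than by numeric value.

```python
from typing import Set


def handle_unregister_broker(broker_id: int,
                             registered_brokers: Set[int],
                             is_active_controller: bool,
                             kip500_mode: bool) -> str:
    """Return the name of the error code for an unregistration request.

    The check order is an assumption; the text only lists the possible codes.
    """
    if not kip500_mode:
        return "UNSUPPORTED_VERSION"
    if not is_active_controller:
        return "NOT_CONTROLLER"
    # discard() is a no-op for an unknown ID, so a repeated request
    # for an already unregistered broker still yields NONE.
    registered_brokers.discard(broker_id)
    return "NONE"
```

Treating a repeated unregistration as success makes the request idempotent, so a client that retries after a timeout does not need special handling.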

...

Code Block
{         
  "apiKey": 4,
  "type": "metadata",
  "name": "ConfigRecord",
  "validVersions": "0",
  "fields": [
    { "name": "ResourceType", "type": "int8", "versions": "0+",
      "about": "The type of resource this configuration applies to." },
    { "name": "ResourceName", "type": "string", "versions": "0+",
      "about": "The name of the resource this configuration applies to." },         
    { "name": "Name", "type": "string", "versions": "0+",
      "about": "The name of the configuration key." },                  
    { "name": "Value", "type": "string", "versions": "0+",     
      "about": "The value of the configuration." }
  ]           
} 

...

PartitionChangeRecord

Code Block
{
  "apiKey": 5,
  "type": "metadata",
  "name": "PartitionChangeRecord",
  "validVersions": "0",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Isr", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 0,
      "about": "null if the ISR didn't change; the new in-sync replicas otherwise." },
    { "name": "Leader", "type": "int32", "default": "-2", "entityType": "brokerId",
      "versions": "0+", "taggedVersions": "0+", "tag": 1,
      "about": "-1 if there is now no leader; -2 if the leader didn't change; the new leader otherwise." },
    { "name": "Replicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 2,
      "about": "null if the replicas didn't change; the new replicas otherwise." },
    { "name": "RemovingReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 3,
      "about": "null if the removing replicas didn't change; the new removing replicas otherwise." },
    { "name": "AddingReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 4,
      "about": "null if the adding replicas didn't change; the new adding replicas otherwise." }
  ]
}
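
To illustrate the "unchanged" sentinels in the field descriptions above (null for the list fields, -2 for the leader), here is a minimal sketch of applying such a record to an in-memory partition state. The class and function names are hypothetical and are not taken from the Kafka code base.

```python
from dataclasses import dataclass
from typing import List, Optional

NO_LEADER = -1         # the partition now has no leader
LEADER_UNCHANGED = -2  # the record does not change the leader


@dataclass(frozen=True)
class PartitionState:
    replicas: List[int]
    isr: List[int]
    leader: int
    removing_replicas: List[int]
    adding_replicas: List[int]


@dataclass(frozen=True)
class PartitionChange:
    # Defaults mirror the schema defaults: None (null) or -2 mean "no change".
    isr: Optional[List[int]] = None
    leader: int = LEADER_UNCHANGED
    replicas: Optional[List[int]] = None
    removing_replicas: Optional[List[int]] = None
    adding_replicas: Optional[List[int]] = None


def _pick(old: List[int], new: Optional[List[int]]) -> List[int]:
    """Keep the old list when the record leaves the field as None."""
    return old if new is None else new


def apply_change(state: PartitionState, change: PartitionChange) -> PartitionState:
    """Return a new state with only the fields the record actually sets."""
    leader = state.leader if change.leader == LEADER_UNCHANGED else change.leader
    return PartitionState(
        replicas=_pick(state.replicas, change.replicas),
        isr=_pick(state.isr, change.isr),
        leader=leader,
        removing_replicas=_pick(state.removing_replicas, change.removing_replicas),
        adding_replicas=_pick(state.adding_replicas, change.adding_replicas),
    )
```

Because every optional field defaults to its "unchanged" value, a record that only shrinks the ISR needs to carry just the `Isr` field, which is presumably why the schema marks these fields as tagged.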

AccessControlRecord

Code Block
{
  "apiKey": 6,
  "type": "metadata",
  "name": "AccessControlRecord",
  "validVersions": "0",
  "fields": [
    { "name": "ResourceType", "type": "int8", "versions": "0+",
      "about": "The resource type" },
    { "name": "ResourceName", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The resource name, or null if this is for the default resource." },
    { "name": "PatternType", "type": "int8", "versions": "0+",
      "about": "The pattern type (literal, prefixed, etc.)" },
    { "name": "Principal", "type": "string", "versions": "0+",
      "about": "The principal name." },
    { "name": "Host", "type": "string", "versions": "0+",
      "about": "The host." },
    { "name": "Operation", "type": "int8", "versions": "0+",
      "about": "The operation type." },
    { "name": "PermissionType", "type": "int8", "versions": "0+",
      "about": "The permission type (allow, deny)." }
  ]
}

...

FenceBrokerRecord

Code Block
{
  "apiKey": 7,
  "type": "metadata",
  "name": "FenceBrokerRecord",
  "validVersions": "0",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID to fence. It will be removed from all ISRs." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+",
      "about": "The epoch of the broker to fence." }
  ]
}

...

UnfenceBrokerRecord

Code Block
{
  "apiKey": 8,
  "type": "metadata",
  "name": "UnfenceBrokerRecord",
  "validVersions": "0",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID to unfence." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+",
      "about": "The epoch of the broker to unfence." }
  ]
}

RemoveTopic

Code Block
{
  "apiKey": 9,
  "type": "metadata",
  "name": "RemoveTopicRecord",
  "validVersions": "0",
  "fields": [
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The topic to remove. All associated partitions will be removed as well." }
  ]
}

DelegationTokenRecord

Code Block
{
  "apiKey": 10,
  "type": "metadata",
  "name": "DelegationTokenRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Owner", "type": "string", "versions": "0+",
      "about": "The delegation token owner." },
    { "name": "Renewers", "type": "[]string", "versions": "0+",
      "about": "The principals which have renewed this token." },
    { "name": "IssueTimestamp", "type": "int64", "versions": "0+",
      "about": "The time at which this token was issued." },
    { "name": "MaxTimestamp", "type": "int64", "versions": "0+",
      "about": "The time at which this token cannot be renewed any more." },
    { "name": "ExpirationTimestamp", "type": "int64", "versions": "0+",
      "about": "The next time at which this token must be renewed." },
    { "name": "TokenId", "type": "string", "versions": "0+",
      "about": "The token id." }
  ]
}

UserScramCredentialRecord

Code Block
{
  "apiKey": 11,
  "type": "metadata",
  "name": "UserScramCredentialRecord",
  "validVersions": "0",
  "fields": [
    { "name": "UserName", "type": "string", "versions": "0+",
      "about": "The user name." },
    { "name": "CredentialInfos", "type": "[]CredentialInfo", "versions": "0+",
      "about": "The mechanism and related information associated with the user's SCRAM credential.", "fields": [
      { "name": "Mechanism", "type": "int8", "versions": "0+",
        "about": "The SCRAM mechanism." },
      { "name": "Salt", "type": "bytes", "versions": "0+",
        "about": "A random salt generated by the client." },
      { "name": "SaltedPassword", "type": "bytes", "versions": "0+",
        "about": "The salted password." },
      { "name": "Iterations", "type": "int32", "versions": "0+",
        "about": "The number of iterations used in the SCRAM credential." }
    ]}
  ]
}

...

FeatureLevelRecord

Code Block
{
  "apiKey": 12,
  "type": "metadata",
  "name": "FeatureLevelRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
      "about": "The feature name." },
    { "name": "MinFeatureLevel", "type": "int16", "versions": "0+",
      "about": "The current finalized minimum feature level of this feature for the cluster." },
    { "name": "MaxFeatureLevel", "type": "int16", "versions": "0+",
      "about": "The current finalized maximum feature level of this feature for the cluster." }
  ]
}

...

FailedReplicasRecord

Code Block
{
  "apiKey": 13,
  "type": "metadata",
  "name": "FailedReplicasRecord",
  "validVersions": "0",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker id." },
    { "name": "Topics", "type": "[]TopicWithFailures", "versions": "0+",
      "about": "The topics with failed replicas.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic UUID." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partition ids." }
    ]}
  ]
}

QuotaRecord

Code Block
{
  "apiKey": 14,
  "type": "metadata",
  "name": "QuotaRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Entity", "type": "[]EntityData", "versions": "0+",
      "about": "The quota entity to alter.", "fields": [
      { "name": "EntityType", "type": "string", "versions": "0+",
        "about": "The entity type." },
      { "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The name of the entity, or null if the default." }
    ]},
    { "name": "Key", "type": "string", "versions": "0+",
      "about": "The quota configuration key." },
    { "name": "Value", "type": "float64", "versions": "0+",
      "about": "The value to set, otherwise ignored if the value is to be removed." },
    { "name": "Remove", "type": "bool", "versions": "0+",
      "about": "Whether the quota configuration value should be removed, otherwise set." }
  ]
}

New Metrics

Full Name / Description

kafka.controller:type=KafkaController,name=MetadataLag

The offset delta between the latest metadata record this controller has replayed and the last stable offset of the metadata topic.

kafka.controller:type=KafkaServer,name=MetadataLag

The offset delta between the latest metadata record this broker has replayed and the last stable offset of the metadata topic.

kafka.controller:type=KafkaController,name=MetadataCommitLatencyMs

The latency of committing a message to the metadata topic.  Relevant on the active controller.

kafka.controller:type=KafkaController,name=MetadataCommitRatePerSec

The number of metadata messages per second committed to the metadata topic.

kafka.controller:type=KafkaController,name=MetadataSnapshotOffsetLag

New name for kafka.controller:type=KafkaController,name=SnapshotLag.  The offset delta between the latest stable offset of the metadata topic and the offset of the last snapshot (or the last stable offset itself, if there are no snapshots).
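
The two lag metrics are plain offset differences. The sketch below shows the arithmetic under the assumption that offsets are non-negative integers and that a missing snapshot is represented as `None`; the function names are illustrative and do not correspond to any Kafka API.

```python
from typing import Optional


def metadata_lag(last_stable_offset: int, last_replayed_offset: int) -> int:
    """Offset delta between the last stable offset and the last replayed record."""
    return last_stable_offset - last_replayed_offset


def metadata_snapshot_offset_lag(last_stable_offset: int,
                                 last_snapshot_offset: Optional[int]) -> int:
    """Offset delta between the last stable offset and the last snapshot.

    With no snapshot (None), the lag is the last stable offset itself,
    as stated in the metric description.
    """
    if last_snapshot_offset is None:
        return last_stable_offset
    return last_stable_offset - last_snapshot_offset
```

For example, with a last stable offset of 120, a node that has replayed up to offset 100 reports a metadata lag of 20, and a last snapshot at offset 90 gives a snapshot offset lag of 30.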

Unused Metrics in KIP-500 Mode

We will deprecate these metrics as soon as legacy mode is deprecated.  For now, they will be unused in KIP-500 mode.

Full Name / Description

kafka.server:type=SessionExpireListener,name=ZooKeeperExpiresPerSec

No longer needed when running in KIP-500 mode because we won't have any ZK sessions

Compatibility, Deprecation, and Migration Plan

As described above, this KIP outlines a new mode that the broker can run in, KIP-500 mode.  For now, this mode will be experimental, and there will be no way to migrate existing clusters from legacy mode to KIP-500 mode.  We plan on outlining how this upgrade process will work in a follow-on KIP.  We do plan on deprecating legacy mode eventually, but we are not quite ready to do it yet in this KIP.

Since KIP-500 mode is currently in a pre-alpha state, we do not yet guarantee that future versions will support upgrading from the current version.  Once it is more stable, we will have a more traditional binary compatibility regime.

Rejected Alternatives

Support Automatic Broker ID Assignment

This KIP proposes to drop support for automatic broker ID assignment.  What if we decided to continue to support it?

If we were willing to take a little bit more complexity on board, it would be relatively easy to support automatic broker ID assignment.  Brokers could simply ask the active controller to assign them a new ID when starting up, just as they previously obtained one from ZooKeeper.

However, automatic controller ID assignment is a much more difficult problem.  We never actually supported automatically assigning ZooKeeper IDs, so there is no pattern to follow here.  In general, Raft assumes that nodes know their IDs before the protocol begins.  We cannot rely on random assignment because the 31-bit space is not large enough.  We could perhaps create a separate protocol for assigning node IDs, but it might be complex.
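
One rough way to look at the random-assignment concern is the standard birthday approximation for the chance that at least two nodes draw the same ID from the 2^31 non-negative int32 values. This is a back-of-the-envelope sketch, not an analysis from the KIP, and it assumes IDs are drawn uniformly and independently.

```python
import math

ID_SPACE = 2 ** 31  # number of non-negative 32-bit signed integers


def collision_probability(num_nodes: int, id_space: int = ID_SPACE) -> float:
    """Approximate probability that at least two random node IDs collide.

    Uses the birthday approximation 1 - exp(-n(n-1) / (2N)).
    """
    pairs = num_nodes * (num_nodes - 1) / 2
    # -expm1(-x) equals 1 - exp(-x) and stays accurate for small x.
    return -math.expm1(-pairs / id_space)
```

The probability grows roughly with the square of the node count, so it rises quickly as clusters get larger or as IDs are drawn across many clusters.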

In general it's not clear how useful automatic broker ID assignment really is.  Configuration management software like Puppet, Chef, or Ansible can easily create a new ID for each node's configuration file.  Therefore, it's probably best to use this compatibility break to drop support for automatic broker ID assignment.

Combined Heartbeats and Fetch Requests

The brokers are always fetching new metadata from the controller.  Why not combine these fetch requests with the heartbeat requests, so that the brokers only have to send one request rather than two?

The main reason for making them separate requests is better separation of concerns.  Fetching metadata is logically a bit different from sending a heartbeat, and coupling them could result in a messy design and code.  We would have to add significant extra complexity to the FetchRequest schema.  Perhaps even worse, we would need to make the timing of fetch requests line up with the timing needed for broker heartbeats.

Shared IDs between Multiple Nodes

One possibility that we've discussed is whether we could have controller IDs and broker IDs share the same ID space.  For example, could there be both a broker 1 and a controller 1?  This is not a good idea because NetworkClient assumes a single ID space.  So if there is both a controller 1 and a broker 1, we don't have a way of picking the "right" one.  We would have to add a concept of node types.  That is a fair amount of extra work.  It also is a bit conceptually messy for the network layer to understand node types, rather than just treating them all as endpoints.

Another reason not to share node IDs is that it would make migrating from a combined broker+controller node to separate nodes more difficult.  For example, let's say you have three combined nodes but the controller load is getting too high and you now want a separate controller node.  If you also have to change the controller ID, the migration becomes more difficult.  Since controller node IDs make their way into the Raft log itself, changing the ID later on is non-trivial.

External metrics and management systems often make use of the concept of node ID.  For example, someone may want to aggregate all the socket server metrics from node 1.  But if there are actually two socket servers sharing this same node ID (controller and broker), this may create problems and require changes to the external system.

A related question is whether a broker and a controller could share the same port, if they are co-located in the same JVM.  We discussed this as part of KIP-590 and agreed that we would not share ports.  A shared port would make it impossible to have separate RPC handlers, complicating the code.  It would also make it impossible to have separate, stricter authentication for controllers.  Sharing a port with the broker opens us up to denial of service attacks, including unintentional ones.