...

Currently, any broker node can be elected as the controller.  As part of this KIP, the active controller will instead be selected from a small pool of nodes specifically configured to act as controllers.  Typically three or five nodes in the cluster will be selected to be controllers.

...

Note that as long as at least one of the provided controller addresses is valid, the broker will be able to learn about the current metadata quorum and start up.  Once the broker is in contact with the metadata quorum, the quorum bootstrap addresses will not be needed.  This makes it possible to reconfigure the metadata quorum over time.  For example, if we start with a metadata quorum of host1, host2, host3, we could replace host3 with host4 without disrupting any of the brokers.  Then we could roll the brokers to apply the new metadata quorum bootstrap configuration of host1, host2, host4 on each one.
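
For example (a minimal sketch; the host names and port are illustrative, and controller.quorum.voters is the quorum configuration described later in this document), the rolling reconfiguration described above would amount to changing the bootstrap configuration on each broker from the first value to the second:

Code Block
# before: the original metadata quorum
controller.quorum.voters=1@host1:9093,2@host2:9093,3@host3:9093

# after: host3 has been replaced by host4 (reusing controller ID 3 here is an assumption)
controller.quorum.voters=1@host1:9093,2@host2:9093,3@host4:9093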

Node IDs

Just like brokers, controller nodes will have non-negative integer node IDs.  There will be a single ID space.  In other words, no controller should share the same ID as a broker.  Even when a broker and a controller are co-located in the same JVM, they must have different node IDs.

We define a node here as a tuple consisting of a node ID and a process role. Roles are defined by the `process.roles` configuration. As explained above, in a co-located configuration, a single process may take both the "controller" and "broker" roles. The node ID for both of these roles will be defined by the `node.id` configuration. However, this is mainly for configuration convenience. Semantically, we view the co-located process as representing two distinct nodes. Each node has its own listeners and its own set of APIs which it exposes. The APIs exposed by a controller node will not be the same as those exposed by a broker node.

Automatic node ID assignment via ZooKeeper will no longer be supported in KIP-500 mode.  Node IDs must be set in the configuration file for brokers and controllers.
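
For example (a sketch; the IDs are arbitrary), a dedicated controller node and a broker node would each set node.id explicitly in their configuration files:

Code Block
# controller.properties (a dedicated controller node)
process.roles=controller
node.id=1000

# broker.properties (a broker node, configured with a different ID)
process.roles=broker
node.id=0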

Networking

Controller processes will listen on a separate endpoint from brokers.  This will be true even when the broker and controller are co-located in the same JVM.
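
For example (a sketch that reuses the listeners example from the Configurations section below; the addresses are illustrative), a process could expose a dedicated controller endpoint alongside its broker endpoints:

Code Block
listeners=INTERNAL://192.1.1.8:9092,EXTERNAL://10.1.1.5:9093,CONTROLLER://192.1.1.8:9094
controller.listener.names=CONTROLLER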

In a well-run Kafka deployment, controller ports, like ZooKeeper ports, should be firewalled off from clients.  This will prevent clients from disrupting the cluster by flooding the controller ports with requests.  In the realm of ACLs, this translates to controllers requiring CLUSTERACTION on CLUSTER for all operations.  (KIP-590 describes how users' administrative requests will be forwarded to the controller quorum as needed.)

The only time when clients should contact a controller node directly is when they are debugging system issues.  This is similar to ZooKeeper, where we have things like zk-shell, but only for debugging.

Note that controllers do not appear in the MetadataResponses given to clients.

Metadata

The Metadata Topic

As described in KIP-500, the controller will store its data in the internal __cluster_metadata topic.  This topic will contain a single partition which is managed by Raft, as described in KIP-595: A Raft Protocol for the Metadata Quorum.

...

Metadata changes need to be persisted to the __cluster_metadata log before we apply them on the other nodes in the cluster.  This means waiting for the metadata log's last stable offset to advance to the offset of the change.  After that point, we are guaranteed not to lose the change as long as we uphold the Raft invariants.

...

However, this "future state" may never be committed.  For example, the active controller might fail, truncating some of its future state.  Therefore, the active controller must not make this future state "visible" to the rest of the cluster until it has been made persistent – that is, until it becomes current state.  In the case of the @metadata topic__cluster_metadata topic, the replication protocol itself neatly takes care of this for us.  In the case of controller RPCs like AlterIsr, the controller handles this by not sending back a response until the designated change has been persisted.

...

The active controller makes changes to the metadata by appending records to the log. Each record has a null key, and the following format for its value:

  1. an unsigned varint specifying the frame type (currently 0)
  2. an unsigned varint specifying the record type.
  3. an unsigned varint specifying the record version
  4. the payload in Kafka RPC format

For example, if we wanted to encode a TopicRecord, we might have 0 encoded as a varint, 1 encoded as a varint, followed by 0 as the record version, followed by the serialized topic data.

The frame type, record type, and version will typically only take one byte each, for a total header size overhead of three bytes.
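
As a concrete sketch of this framing (using the frame type 0, record type 1, and record version 0 from the TopicRecord example above; each of these small values fits in a single varint byte):

Code Block
0x00    <- frame type (unsigned varint 0)
0x01    <- record type (unsigned varint 1; TopicRecord in the example above)
0x00    <- record version (unsigned varint 0)
....    <- the TopicRecord payload, serialized in the Kafka RPC format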

Record Format Versions

...

Broker Registration and State Management

Broker Startup

When starting up, the broker will send a fetch request to the controller, getting the state of the metadata log.  If the broker's local copy of the metadata log is too far behind, the broker will fetch a complete snapshot from the controller, as described in KIP-630.  Otherwise, the broker will fetch just what it needs.  Once it has caught up to the high water mark of the controller (or at least, what the controller's high water mark used to be), it will be able to remove the log directories which it no longer needs.

Broker Heartbeats

The Three Cluster Membership States

Currently, from the perspective of ZooKeeper, there are two states brokers can be in: registered, and not registered.  When brokers are registered, other brokers can find their network endpoints in order to communicate with them.  They are also part of the MetadataResponse communicated back to clients.  When they are not registered, neither of those things is true.

In the post-KIP-500 world there is no ZooKeeper and no ephemeral znodes.  Instead, each broker sends a BrokerHeartbeat request to the active controller every few seconds.

This heartbeat acts as a registration.  However, the controller has a choice about whether to accept it.  It will reject brokers whose metadata is too stale, or whose IDs have been claimed by another broker.  It will also reject brokers that do not support the minimum feature level of all KIP-584 features that are enabled.

When the controller accepts the registration, it grants or renews a broker ID lease associating the broker process with its ID.  Leases are time-bounded.

A broker cannot continue using a lease indefinitely after sending a single heartbeat.  When brokers are rejected by the controller, or otherwise unable to renew their lease before it expires, they enter the "fenced" state.

Fencing

Brokers that don't have a broker ID lease are said to be "fenced."  When a broker is fenced, it cannot process any client requests.  This prevents brokers which are not receiving metadata updates or that are not receiving and processing them fast enough from causing issues to clients.

Brokers start up in the fenced state, and can leave this state only by sending a heartbeat to the active controller and getting back a response that tells them they can become active.

Brokers will re-enter the fenced state if they are unable to communicate with the active controller within registration.lease.timeout.ms.

Controlled Shutdown

In the pre-KIP-500 world, brokers triggered a controlled shutdown by making an RPC to the controller.  When the controller returned a successful result from this RPC, the broker knew that it could shut down.

In the post-KIP-500 world, controlled shutdown is handled by the broker heartbeat system instead.  In its periodic heartbeats, the broker asks the controller if it can transition into the SHUTDOWN state.  This motivates the controller to move all of the leaders off of that broker.  Once they are all moved, the controller responds to the heartbeat with a nextState of SHUTDOWN.

Broker ID Conflicts

Clearly, in a correctly managed cluster, there should be no broker ID conflicts.  Each broker should be configured with a unique ID.  However, we want the system to be robust against misconfigurations.  Therefore, if there are two brokers that claim the same ID, the controller will choose only one and tell the other to fence itself.

When a broker first starts up, when it is in the INITIAL state, it will always "win" broker ID conflicts.  However, once it is granted a lease, it transitions out of the INITIAL state.  Thereafter, it may lose subsequent conflicts if its broker epoch is stale.  (See KIP-380 for some background on broker epoch.)  The reason for favoring new processes is to accommodate the common case where a process is killed with kill -9 and then restarted.  We want it to be able to reclaim its old ID quickly in this case.  The controller can generate a new broker epoch by using the latest log offset.

The Broker State Machine

This state machine ties together a few of the previous sections, such as broker registration and shutdown.

The broker heartbeat always contains the broker's epoch, unless the broker is in the INITIAL state.  When the broker is in the INITIAL state, it is waiting for the controller to assign it a new broker epoch.

Code Block
                        INITIAL
sent initial heartbeat,   | 
got back an epoch         |
                          V
                        ACTIVE -------------------------> SHUTDOWN
lease expired or revoked  |  ^                              ^
(stale metadata,          |  | lease restored               | controller granted
 id conflict,             |  | with new broker epoch        | controlled shutdown
 unable to communicate,   V  |                              |
 etc.)                  FENCED -----------------------------+

Public Interfaces

Command-Line Tools

There will be a new command-line tool, kafka-storage.sh.

Code Block
kafka-storage.sh: inspect and modify kafka storage directories
subcommands:
    info: print out information about kafka storage directories
        -c|--config [path]: the configuration file to use
    format: format the kafka storage directories
        -c|--config [path]: the configuration file to use
        -f|--force: format even if the directories are not empty.
        -d|--directory [path]: format only a single directory, rather than all directories

The info subcommand will display information about each directory, such as whether it exists, is formatted, etc.

The format subcommand will initialize the subdirectories.

A cluster configured in KIP-500 mode will contain the following line in the meta.properties file in each directory:

Code Block
kip.500.mode=enabled

If the storage directories in a node have not been properly formatted, the node will not be able to start up in KIP-500 mode.  On the other hand, formatting will continue to be optional for legacy mode.

Configurations

...

null

broker

controller

broker,controller

...

If this is null (absent) then we are in legacy mode.

Otherwise, we are in KIP-500 mode and this configuration determines what roles this process should play: broker, controller, or both.

...

If non-null, this must be a comma-separated list of listener names.

When communicating with the controller quorum, the broker will always use the first listener in this list.

...

A comma-separated list of the names of the listeners used by the KIP-500 controller. This is required if this process is a KIP-500 controller. The legacy controller will not use this configuration.

Despite the similar name, note that this is different from the "control plane listener" introduced by KIP-291.  The "control plane listener" is used on brokers, not on controllers.

...

If non-null, this must be a comma-separated list of all the controller voters, in the format:

{controller-id}@{controller-host}:{controller-port}

...

When in KIP-500 mode, each node must have this configuration, in order to find out how to communicate with the controller quorum.

Note that this replaces the "quorum.voters" config described in KIP-595.

...

a 32-bit ID

...

The controller id for this server.  Only required if this server is a controller.

...

controller.quorum.election.timeout.ms

...

, there will be three cluster membership states: unregistered, registered but fenced, and registered and active.  Just like today, unregistered means that there is no registration information and no way to reach the broker.  It is effectively not part of the cluster.  In the two registered states, in contrast, contact information is available.  However, in the "registered but fenced" state, the contact information might no longer be valid.  For example, if a broker crashes and is not restarted, it will end up in "registered but fenced" state.

A broker only appears in MetadataResponse if it is in the "registered and active" state.  If it is in the unregistered state, or the "registered and fenced" state, it will not appear in MetadataResponse.

Essentially, registration relates to the permanent state of the cluster: the number of brokers we expect to have, what racks we expect them to be in, and so on.  Fencing refers to the transitory state of the cluster: which brokers are currently available.

By separating the permanent state from the transitory state, we can more effectively handle transitory issues.  For example, if you have a 3 node cluster that is undergoing rolling upgrade, one of the nodes might be down because it is rolling.  However, we should still allow users to create new topics with replication factor 3. Currently, that is not possible, because the node's registration information gets wiped the moment its ZK registration goes away.  With KIP-631, the registration remains, although the node becomes fenced.  Another example is doing reassignment on a cluster where one or more nodes is down.  Currently, when a node is down, all of its ZK registration information is gone.  But  we need this information in order to understand things like whether the replicas of a particular partition are well-balanced across racks.

Broker Registration

Every distributed system needs a way of managing cluster membership.  Prior to KIP-500, Kafka brokers registered ephemeral znodes in order to register themselves as part of the cluster.  The Kafka controller passively consumed the registration information from Zookeeper.

In the post-KIP-500 world there is no ZooKeeper and no ephemeral znodes.  Instead, each broker registers itself with the active controller using a BrokerRegistrationRequest. The active controller assigns the broker a new broker epoch, based on the next available offset in the log.  The new epoch is guaranteed to be higher than any previous epoch that has been used for the given broker id.

Each registration request contains a UUID which identifies the process which sent it.  This ID is called the incarnation ID.  This ensures that if the response to the registration request is lost, the broker can simply re-send the registration RPC and get the same successful result as before.

Registration requests also have information about the feature flags which the broker software supports.  The controller will refuse to register brokers if they don't support the feature flags which are active in the cluster.  In this case, the sysadmin needs to upgrade the broker software before it can be added to the cluster.
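
For example (a sketch; every value shown is illustrative, the cluster ID is taken from the meta.properties example later in this document, and the listener and feature lists are elided), a registration exchange might carry fields like these, following the BrokerRegistrationRequest and BrokerRegistrationResponse schemas given below:

Code Block
BrokerRegistrationRequest:  { "BrokerId": 1, "ClusterId": "3Db5QLSqSZieL3rJBUUegA",
                              "IncarnationId": "<a random uuid chosen by this broker process>",
                              "CurrentMetadataOffset": 0, "Listeners": [...], "Features": [...], "Rack": null }
BrokerRegistrationResponse: { "ThrottleTimeMs": 0, "ErrorCode": 0, "BrokerEpoch": 42 }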

Handling Broker ID Conflicts

The controller only allows one broker process to be registered per broker ID.  Of course, broker processes go away occasionally-- for example, if a broker crashes.  A broker ID can be reused once a certain amount of time has gone past without any contact with the previous incarnation of the broker.

For the purpose of this section, handling a registration request or a broker heartbeat request are both considered forms of contact (even if the broker is fenced).

Broker Leases

When the broker first starts up, it doesn't want to be unfenced immediately.  The reason is that it needs time to perform log recovery and some other startup tasks.  It is good for the broker to be fenced during this time, so that clients do not try to contact it and fail.  The broker indicates that it is not ready to be unfenced by setting WantFence = true in the heartbeats it sends out during this period.

Once the broker is ready to be unfenced, it starts setting WantFence = false in the heartbeats it sends out.  This makes it eligible for unfencing.  However, the controller will not actually unfence the broker unless its metadata is reasonably current.  The controller determines this by examining the metadata offset in the heartbeat request.

As mentioned earlier, brokers which are fenced will not appear in MetadataResponses.  So clients that have up-to-date metadata will not try to contact fenced brokers.

Broker leases are time-bounded.  Once the period has elapsed, if the broker has not renewed its lease via a heartbeat, it will be fenced.

Controlled Shutdown

In the pre-KIP-500 world, brokers triggered a controlled shutdown by making an RPC to the controller.  When the controller returned a successful result from this RPC, the broker knew that it could shut down.

In the post-KIP-500 world, controlled shutdown is handled by the broker heartbeat system instead.  In its periodic heartbeats, the broker asks the controller if it can transition into the controlled shutdown state.  It does this by setting the WantShutDown boolean.  This motivates the controller to move all of the leaders off of that broker.  Once they are all moved, the controller responds to the heartbeat with ShouldShutDown = true.  At that point, the broker knows it's safe to begin the shutdown process proper.

The Broker State Machine

NOT_RUNNING

This is the state that the broker is in before it has started up.  When the broker starts up, it transitions to STARTING.

STARTING

While the broker is in this state, it is trying to catch up with the latest metadata.  It fetches the metadata from the controller quorum. Once it has finished catching up, it transitions to the RECOVERY state.

RECOVERY

The broker is in this state while it is starting the log manager.  If the shutdown was clean, the broker will leave this state very quickly.  If the shutdown was unclean, the broker will stay in this state until log recovery is complete.

Once log recovery is done, the broker will start listening on the socket server.  It will then ask the controller to unfence it.  Once the controller agrees, it will transition to the RUNNING state.

RUNNING

The broker is in this state when it's up and running.

PENDING_CONTROLLED_SHUTDOWN

The broker is in this state when it has received a SIGTERM and is trying to shut down.

SHUTTING_DOWN

The broker is in this state when controlled shutdown has finished and it is shutting down.

Changes in the Broker State Machine

The numeric constants exposed through the metrics API have not changed, and there are no new or removed states.

The main change in the broker state machine is that the RECOVERING_FROM_UNCLEAN_SHUTDOWN state has been renamed to RECOVERY.  Also, unlike previously, the broker will always pass through RECOVERY (although it may only stay in this state for a very short amount of time).

Partition Reassignment

Just like the ZooKeeper-based controller, the quorum controller will implement the KIP-455 partition reassignment API. This API specifies both alter and list operations.

The alter operation specifies a partition, plus a target replica list. If the target replica list is a subset of the partition's current replica list, and the new replica list contains at least one member of the current ISR, then the reassignment can be completed immediately. In this case, the controller issues a PartitionChangeRecord changing the replica set and ISR (if appropriate).

On the other hand, if the new replica list has additional replicas, or specifies a subset of replicas that doesn't intersect with the current ISR, then we add the additional replicas to the replica list, and wait for the ISR to catch up.

So, for example, if the old replica list was [1, 2, 3], and the new list was [3, 4, 5], we would proceed as follows:

  • change the replica list to [1, 2, 3, 4, 5], and set addingReplicas to [4, 5]
  • Wait for an alterIsr which adds both broker 4 and 5
  • Change the replica list to [3, 4, 5 ] and prune the ISR accordingly. Remove addingReplicas.
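
As a sketch of the partition state at each of the steps above (the field names follow PartitionRecord; the intermediate states shown are illustrative):

Code Block
replicas=[1,2,3]        isr=[1,2,3]        addingReplicas=[]      # before the reassignment
replicas=[1,2,3,4,5]    isr=[1,2,3]        addingReplicas=[4,5]   # reassignment recorded
replicas=[1,2,3,4,5]    isr=[1,2,3,4,5]    addingReplicas=[4,5]   # alterIsr has added brokers 4 and 5
replicas=[3,4,5]        isr=[3,4,5]        addingReplicas=[]      # reassignment complete; ISR pruned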

Just like with the ZooKeeper-based controller, the new controller stores reassignments as "hard state." Reassignments will continue even after a controller failover or broker shutdown and restart. Reassignment state appears in controller snapshots, as addingReplicas and removingReplicas.

Unlike the old controller, the quorum controller does not prevent topics that are undergoing reassignment from being deleted. If a topic is deleted, then any partition reassignments that it had are terminated.

Typically, reassignments which require ISR changes are completed by a leader adding some new replicas, via the alterIsr RPC. (Note that another possible way for a reassignment to be completed is via a broker being added to a partition ISR during unfencing). 

When a broker makes an alterIsr change that completes a reassignment, the resulting ISR will be different than requested. For example, if the leader is 1, the current ISR is 1, 2, 3, and the target replica set is 1, 2, 4, then when the leader tries to change the ISR to 1, 2, 3, 4, the controller will change it to 1, 2, 4 instead, completing the reassignment. Since the alterIsr response returns the ISR which was actually applied, the leader will apply this new ISR instead.

If the new ISR does not contain the leader which made the alterIsr call, the controller returns FENCED_LEADER_EPOCH. This will notify the broker that it should wait until it can replay the latest partition change from the log. That partition change record will make this broker a follower rather than a leader.

Public Interfaces

meta.properties

When a storage directory is in use by a cluster running in kip-500 mode, it will have a new version of the meta.properties file.  Since the current version is 0, the new version will be 1.   Just as in version 0, meta.properties will continue to be a Java Properties file.  This essentially means that it is a plain text file where each line has the format key=value.

In version 0 of meta.properties, the cluster.id field is optional.  In contrast, in version 1 it is mandatory.

In version 0 of meta.properties, the cluster.id field is serialized in hexadecimal.  In contrast, in version 1 it is serialized in base64.

Version 1 of meta.properties replaces the broker.id field with node.id.

For servers running in kip-500 mode, the `meta.properties` file must be present in every log directory. The process will raise an error during startup if the meta.properties file does not exist or if the node.id it contains does not match the value from the configuration file.

Here is an example of a version 1 meta.properties file:

Code Block
$ cat /tmp/kafka-logs/meta.properties
#
#Tue Dec 01 10:08:08 PST 2020
cluster.id=3Db5QLSqSZieL3rJBUUegA
version=1
node.id=0

kafka-storage.sh

There will be a new command-line tool, kafka-storage.sh.

Code Block
$ ./bin/kafka-storage.sh -h
usage: kafka-storage [-h] {info,format,random-uuid} ...

The Kafka storage tool.

positional arguments:
  {info,format,random-uuid}
    info                 Get information about the Kafka log directories on this node.
    format               Format the Kafka log directories on this node.
    random-uuid          Print a random UUID.

optional arguments:
  -h, --help             show this help message and exit

kafka-storage.sh will have three subcommands: info, format, and random-uuid.

info

Code Block
$ ./bin/kafka-storage.sh info -h
usage: kafka-storage info [-h] --config CONFIG

optional arguments:
  -h, --help             show this help message and exit
  --config CONFIG, -c CONFIG
                         The Kafka configuration file to use.

The info command will give information about the configured storage directories.  Example output:

Code Block
Found log directory:
  /tmp/kafka-logs

Found metadata: MetaProperties(clusterId=51380268-1036-410d-a8fc-fb3b55f48033)

format

Code Block
$ ./bin/kafka-storage.sh format -h
usage: kafka-storage format [-h] --config CONFIG --cluster-id CLUSTER_ID [--ignore-formatted]

optional arguments:
  -h, --help             show this help message and exit
  --config CONFIG, -c CONFIG
                         The Kafka configuration file to use.
  --cluster-id CLUSTER_ID, -t CLUSTER_ID
                         The cluster ID to use.
  --ignore-formatted, -g

When running in kip-500 mode, the storage directories must be formatted using this command prior to starting up the brokers and controllers.

If any of the storage directories are formatted, the command will normally fail.  This behavior can be changed by passing the --ignore-formatted option.  When this option is passed, the format command will skip over already formatted directories rather than failing.
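
For example (a sketch; the configuration file path is illustrative, and the cluster ID is generated with the random-uuid subcommand described below):

Code Block
$ ./bin/kafka-storage.sh random-uuid
8XUwXa9qSyi9tSOquGtauQ
$ ./bin/kafka-storage.sh format -c ./config/server.properties --cluster-id 8XUwXa9qSyi9tSOquGtauQ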

random-uuid

Code Block
$ ./bin/kafka-storage.sh random-uuid -h
usage: kafka-storage random-uuid [-h]

optional arguments:
  -h, --help             show this help message and exit

The random-uuid command prints out a random UUID to stdout.

Code Block
$ ./bin/kafka-storage.sh random-uuid
8XUwXa9qSyi9tSOquGtauQ

kafka-cluster.sh

There will also be a new command-line tool named kafka-cluster.sh.

Code Block
$ ./bin/kafka-cluster.sh -h
usage: kafka-cluster [-h] {cluster-id,unregister} ...

The Kafka cluster tool.

positional arguments:
  {cluster-id,unregister}
    cluster-id           Get information about the ID of a cluster.
    unregister           Unregister a broker ID.

optional arguments:
  -h, --help             show this help message and exit

kafka-cluster.sh will have two subcommands: cluster-id and unregister.

cluster-id

Code Block
$ ./bin/kafka-cluster.sh cluster-id -h
usage: kafka-cluster cluster-id [-b,-c,-h]

optional arguments:
  -b, --bootstrap-server a list of host/port pairs to use for establishing the connection to the kafka cluster.
  -c, --config           a property file containing configs to be passed to Admin Client.
  -h, --help             show this help message and exit

The cluster-id command prints out the cluster ID.

unregister

Code Block
$ ./bin/kafka-cluster.sh unregister -h
usage: kafka-cluster unregister [-h] [--bootstrap-server BOOTSTRAP_SERVER] [--config CONFIG] [--id ID]

optional arguments:
  -h, --help             show this help message and exit
  --bootstrap-server BOOTSTRAP_SERVER, -b BOOTSTRAP_SERVER
                         A list of host/port pairs to use for establishing the connection to the kafka cluster.
  --config CONFIG, -c CONFIG
                         A property file containing configs to be passed to AdminClient.
  --id ID, -i ID         The ID of the broker to unregister.

The unregister command removes the registration of a specific broker ID.  It will make an UnregisterBrokerRequest in order to do this.
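
For example (a sketch; the bootstrap server address and the broker ID are illustrative):

Code Block
$ ./bin/kafka-cluster.sh unregister --bootstrap-server localhost:9092 --id 1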

Changes to kafka-dump-log-segments.sh

kafka-dump-log-segments.sh will have two new flags: --cluster-metadata-decoder, and --skip-record-metadata.

The --cluster-metadata-decoder flag will tell the DumpLogSegments tool to decode the records as KIP-500 metadata.  Each record will be output in the following JSON format:

Code Block
payload: {"type":<record type enum name>, "version":<record version number>, "data":<record JSON>}

Example output:

Code Block
 payload: {"type":"TOPIC_RECORD","version":0,"data":{"name":"bar","topicId":"GU_rXds2FGppL1JqXYpx2g"}}
 payload: {"type":"PARTITION_RECORD","version":0,"data":{"partitionId":0,"topicId":"GU_rXds2FGppL1JqXYpx2g","replicas":[1],"isr":[1],"removingReplicas":null,"addingReplicas":null,"leader":1,"leaderEpoch":0,"partitionEpoch":0}}
 payload: {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":0,"topicId":"GU_rXds2FGppL1JqXYpx2g","leader":-1}}
 payload: {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":0,"topicId":"WCnrza5uWKeerYa7HCNpOg","leader":-1}}
 payload: {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":0,"topicId":"GU_rXds2FGppL1JqXYpx2g","leader":-1}}
 payload: {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":0,"topicId":"WCnrza5uWKeerYa7HCNpOg","leader":-1}}
 payload: {"type":"FENCE_BROKER_RECORD","version":0,"data":{"id":1,"epoch":0}}

The --skip-record-metadata flag will skip printing metadata for each record.  However, metadata for each record batch will still be printed when this flag is present.

kafka-metadata-shell.sh

The Kafka Metadata shell is a new command which allows users to interactively examine the metadata stored in a KIP-500 cluster.

It can read the metadata from the controllers directly, by connecting to them, or from a metadata snapshot on disk.  In the former case, the quorum voters must be specified by passing the --controllers flag; in the latter case, the snapshot file should be specified via --snapshot.
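
For example (a sketch; the controller IDs, host names, port, and snapshot path are illustrative, and the --controllers value uses the same {controller-id}@{controller-host}:{controller-port} format as controller.quorum.voters):

Code Block
# read the metadata by connecting to the controller quorum
$ ./bin/kafka-metadata-shell.sh --controllers 1@host1:9093,2@host2:9093,3@host3:9093

# or read the metadata from a snapshot file on disk
$ ./bin/kafka-metadata-shell.sh --snapshot /path/to/metadata.snapshot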

Code Block
$ ./bin/kafka-metadata-shell.sh -h
usage: metadata-tool [-h] [--controllers CONTROLLERS] [--config CONFIG] [--snapshot SNAPSHOT] [command [command ...]]

The Apache Kafka metadata tool

positional arguments:
  command                The command to run.

optional arguments:
  -h, --help             show this help message and exit
  --controllers CONTROLLERS, -C CONTROLLERS
                         The quorum voter connection string to use.
  --config CONFIG, -c CONFIG
                         The configuration file to use.
  --snapshot SNAPSHOT, -s SNAPSHOT
                         The snapshot file to read.

The metadata tool works by replaying the log and storing the state into in-memory nodes.  These nodes are presented in a fashion similar to filesystem directories.  For browsing the nodes, several commands are supported:

Code Block
>> help
Welcome to the Apache Kafka metadata shell.

usage:  {cat,cd,exit,find,help,history,ls,man,pwd} ...

positional arguments:
  {cat,cd,exit,find,help,history,ls,man,pwd}
    cat                  Show the contents of metadata nodes.
    cd                   Set the current working directory.
    exit                 Exit the metadata shell.
    find                 Search for nodes in the directory hierarchy.
    help                 Display this help message.
    history              Print command history.
    ls                   List metadata nodes.
    man                  Show the help text for a specific command.
    pwd                  Print the current working directory.

The interface of the metadata tool is currently considered unstable and may change when KIP-500 becomes production-ready.

Configurations

Configuration Name: process.roles
Possible Values: null, broker, controller, "broker,controller"
Notes: If this is null (absent) then we are in legacy mode.  Otherwise, we are in KIP-500 mode and this configuration determines what roles this process should play: broker, controller, or both.

Configuration Name: controller.listener.names
Possible Values: If non-null, this must be a comma-separated list of listener names.  When communicating with the controller quorum, the broker will always use the first listener in this list.
Notes: A comma-separated list of the names of the listeners used by the KIP-500 controller.  This configuration is required if this process is a KIP-500 controller.  The legacy controller will not use this configuration.  Despite the similar name, note that this is different from the "control plane listener" introduced by KIP-291.

Configuration Name: listeners
Possible Values: A comma-separated list of the configured listeners.  For example: INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094
Notes: This configuration is now required.

Configuration Name: sasl.mechanism.controller.protocol
Possible Values: SASL mechanism used for communication with controllers.  Default is GSSAPI.
Notes: This is analogous to sasl.mechanism.inter.broker.protocol, but for communication with the controllers.

Configuration Name: controller.quorum.voters
Possible Values: If non-null, this must be a comma-separated list of all the controller voters, in the format: {controller-id}@{controller-host}:{controller-port}
Notes: When in KIP-500 mode, each node must have this configuration, in order to find out how to communicate with the controller quorum.  Note that this replaces the "quorum.voters" config described in KIP-595.  This configuration is required for both brokers and controllers.

Configuration Name: node.id
Possible Values: a 32-bit ID
Notes: This configuration replaces `broker.id` for zk-based Kafka processes in order to reflect its more general usage.  It serves as the ID associated with each role that the process is acting as.  For example, a configuration with `node.id=0` and `process.roles=broker,controller` defines two nodes: `broker-0` and `controller-0`.

Configuration Name: initial.broker.registration.timeout.ms
Possible Values: 60000
Notes: When initially registering with the controller quorum, the number of milliseconds to wait before declaring failure and exiting the broker process.

Configuration Name: broker.heartbeat.interval.ms
Possible Values: 3000
Notes: The length of time between broker heartbeats.

Configuration Name: broker.session.timeout.ms
Possible Values: 18000
Notes: The length of time that a broker lease lasts if no heartbeats are made.

Configuration Name: metadata.log.dir
Possible Values: If set, this must be a path to a log directory.
Notes: This configuration determines where we put the metadata log.  If it is not set, the metadata log is placed in the first log directory from log.dirs.

Configuration Name: controller.quorum.fetch.timeout.ms
Possible Values: Maximum time without a successful fetch from the current leader before a new election is started.
Notes: New name for quorum.fetch.timeout.ms

Configuration Name: controller.quorum.election.timeout.ms
Possible Values: Maximum time without collecting a majority of votes during the candidate state before a new election is retried.
Notes: New name for quorum.election.timeout.ms

Configuration Name: controller.quorum.election.backoff.max.ms
Possible Values: Maximum exponential backoff time (based on the number of retries) after an election timeout, before a new election is triggered.
Notes: New name for quorum.election.backoff.max.ms

Configuration Name: controller.quorum.request.timeout.ms
Possible Values: Maximum time before a pending request is considered failed and the connection is dropped.
Notes: New name for quorum.request.timeout.ms

Configuration Name: controller.quorum.retry.backoff.ms
Possible Values: Initial delay between request retries.  This config and the one below are used for retriable request errors or lost connectivity, and are different from the election.backoff configs above.
Notes: New name for quorum.retry.backoff.ms

Configuration Name: controller.quorum.retry.backoff.max.ms
Possible Values: Max delay between requests.  Backoff will increase exponentially beginning from quorum.retry.backoff.ms.
Notes: New name for quorum.retry.backoff.max.ms

Deprecated Configurations

Configuration Name: control.plane.listener.name
Reason: We no longer need to maintain a separate listener for messages from the controller, since the controller does not send messages out any more (it receives them).

Configuration Name: broker.id.generation.enable
Reason: Automatic broker ID generation is no longer supported.

Configuration Name: zookeeper.*
Reason: We no longer need configurations for ZooKeeper.

New Error Codes

DUPLICATE_BROKER_REGISTRATION

There will be a new error code, DUPLICATE_BROKER_REGISTRATION, that the active controller will return when a broker tries to register with an ID that is currently in use.

INVALID_CLUSTER_ID

There will be a new error code, INVALID_CLUSTER_ID, that the controller will return if the broker tries to register with the wrong cluster ID.

AdminClient

There will be a new AdminClient RPC, unregisterBroker.

Code Block
UnregisterBrokerResult unregisterBroker(int brokerId, UnregisterBrokerOptions options)

public class UnregisterBrokerResult {
    public KafkaFuture<Void> all();
}

public class UnregisterBrokerOptions extends AbstractOptions<UnregisterBrokerOptions> {
}
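
A minimal sketch of how this method might be invoked (assuming an already-constructed Admin client named admin; checked exceptions are not handled here):

Code Block
// Unregister broker 1 using the API defined above (sketch only).
UnregisterBrokerResult result = admin.unregisterBroker(1, new UnregisterBrokerOptions());
result.all().get();  // wait for the result of the UnregisterBrokerRequest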

RPCs

Obsoleting the Metadata Propagation RPCs

As discussed earlier, the new controller will use FetchRequest to fetch metadata from the active controller.  The details of how Raft fetching will work are spelled out in KIP-595: A Raft Protocol for the Metadata Quorum.

Since we propagate the metadata via Raft, we will no longer need to send out LeaderAndIsrRequest, UpdateMetadataRequest, and StopReplicaRequest.  These requests will be sent out only when we're in legacy mode, not when we're in KIP-500 mode.  Eventually we will add some support for these requests to the new controller, in order to support rolling upgrade from a pre-KIP-500 release. However, for the purpose of this KIP, the new controller will not use these requests.

Obsoleting the Controlled Shutdown RPC

The broker heartbeat mechanism replaces the controlled shutdown RPC.  Therefore, we will not need to support this RPC any more in the controller-- except for compatibility during upgrades, which will be described further in a follow-on KIP.

Topic Identifiers

This KIP builds on top of the work done in KIP-516 to improve how partitions are tracked.

Because each topic is identified by a unique topic UUID, we can implement topic deletion with a single record, RemoveTopicRecord.  Upon replaying this record, each broker will delete the associated topic if it is present.

Of course, some brokers may be down when the topic is deleted.  In fact, some brokers may never see the RemoveTopicRecord.  This record may get collapsed into one of the periodic metadata snapshots.  If this happens, the topic's removal will be reflected in the snapshot through the absence of a topic record, not its presence.  Therefore, during the startup process, brokers must compare the log directories that they have with the ones contained in the latest metadata.  The appropriate time to do this is at the start of the RECOVERY phase.  At this point, the broker has the latest metadata.

BrokerRegistration

Required ACLs: CLUSTERACTION on CLUSTER

Code Block
{
  "apiKey": 57,
  "type": "request",
  "name": "BrokerRegistrationRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID." },
	{ "name": "ClusterId", "type": "string", "versions": "0+",
	  "about": "The cluster id of the broker process." },
	{ "name": "IncarnationId", "type": "uuid", "versions": "0+",
	  "about": "The incarnation id of the broker process." },
    { "name": "CurrentMetadataOffset", "type": "int64", "versions": "0+",
      "about": "The highest metadata offset which the broker has reached." },
    { "name": "Listeners", "type": "[]Listener",
      "about": "The listeners of this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "uint16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
      ]
    },
    { "name": "Features", "type": "[]Feature",
      "about": "The features on this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The feature name." }
        { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The minimum supported feature level." },
        { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The maximum supported feature level." }
      ]
    },
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The rack which this broker is in." }
  ]
}

{
  "apiKey": 57,
  "type": "response",
  "name": "BrokerRegistrationResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker's assigned epoch, or -1 if none was assigned." }
  ]
}

BrokerHeartbeat

Required ACLs: CLUSTERACTION on CLUSTER

As described earlier, the broker periodically sends out a heartbeat request to the active controller.

Code Block
{
  "apiKey": 58,
  "type": "request",
  "name": "BrokerHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker epoch." },
    { "name": "CurrentMetadataOffset", "type": "int64", "versions": "0+",
      "about": "One more than the highest metadata offset which the broker has reached." },
    { "name": "WantFence", "type": "bool", "versions": "0+",
      "about": "True if the broker wants to be fenced, false otherwise." }
    { "name": "WantShutDown", "type": "bool", "versions": "0+",
      "about": "True if the broker wants to initiate controlled shutdown." }
  ]
}   

{
  "apiKey": 58,
  "type": "response",
  "name": "BrokerHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "IsCaughtUp", "type": "bool", "versions": "0+",
      "about": "True if the broker has approximately caught up with the latest metadata." },
    { "name": "IsFenced", "type": "bool", "versions": "0+",
      "about": "True if the broker is fenced." },
    { "name": "ShouldShutDown", "type": "bool", "versions": "0+",
      "about": "True if the broker should proceed with its shutdown." }
  ]
}

The controller will wait to unfence a broker until it sends a heartbeat where WantFence is false and CurrentMetadataOffset is caught up.

If the heartbeat request has WantShutDown set, the controller will try to move all the leaders off of the broker.

The controller will set ShouldShutDown in the response if the broker is cleared to execute a controlled shutdown.  In other words, if it has no leaderships.

The controller will return NOT_CONTROLLER if it is not active.  Brokers will always return NOT_CONTROLLER for these RPCs.
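
For example (a sketch; the field values are illustrative), a heartbeat from a broker that is ready to be unfenced, and the controller's reply, might look like this:

Code Block
BrokerHeartbeatRequest:  { "BrokerId": 1, "BrokerEpoch": 42, "CurrentMetadataOffset": 1000,
                           "WantFence": false, "WantShutDown": false }
BrokerHeartbeatResponse: { "ThrottleTimeMs": 0, "ErrorCode": 0, "IsCaughtUp": true,
                           "IsFenced": false, "ShouldShutDown": false }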

UnregisterBroker

Required ACLs: ALTER on CLUSTER

The UnregisterBrokerRequest asks the controller to unregister a broker from the cluster.

Code Block
{
  "apiKey": 59,
  "type": "request",
  "name": "UnregisterBrokerRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID to unregister." }
  ]
}   

{
  "apiKey": 59,
  "type": "response",
  "name": "UnregisterBrokerResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
  ]
}

The valid response codes are:

  • NONE if the unregistration succeeded or if the broker was already unregistered.
  • NOT_CONTROLLER if the node that the request was sent to is not the controller
  • UNSUPPORTED_VERSION if KIP-500 mode is not enabled

Record Formats

RegisterBrokerRecord

Code Block
{
  "apiKey": 0,
  "type": "metadata",
  "name": "RegisterBrokerRecord",
  "validVersions": "0",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker id." },
    { "name": "IncarnationId", "type": "uuid", "versions": "0+",
      "about": "The incarnation id of the broker process." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+",
      "about": "The broker epoch assigned by the controller." },
    { "name": "EndPoints", "type": "[]BrokerEndpoint", "versions": "0+", "nullableVersions": "0+",
      "about": "The endpoints that can be used to communicate with this broker.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "uint16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
      ]},
    { "name": "Features", "type": "[]BrokerFeature", "versions": "0+", "nullableVersions": "0+",
      "about": "The features that this broker supports.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the feature." },
        { "name": "MinVersion", "type": "int16", "versions": "0+",
          "about": "The minimum feature level that this broker supports." },
        { "name": "MaxVersion", "type": "int16", "versions": "0+",
          "about": "The maximum feature level that this broker supports." }
      ]},
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The broker rack." }
  ]
}



UnregisterBrokerRecord

Code Block
{
  "apiKey": 1,
  "type": "metadata",
  "name": "UnregisterBrokerRecord",
  "validVersions": "0",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker id." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+",
      "about": "The broker epoch." }
  ]
}

TopicRecord

Code Block
{
  "apiKey": 2,
  "type": "metadata",
  "name": "TopicRecord",
  "validVersions": "0",
  "fields": [
    { "name": "TopicName", "type": "string", "versions": "0+",
      "about": "The topic name." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." }
  ]
}

PartitionRecord

Code Block
{
  "apiKey": 3,
  "type": "metadata",
  "name": "PartitionRecord",
  "validVersions": "0",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Replicas", "type": "[]int32", "versions": "0+",
      "about": "The replicas of this partition, sorted by preferred order." },
    { "name": "Isr", "type": "[]int32", "versions": "0+",
      "about": "The in-sync replicas of this partition" },
    { "name": "RemovingReplicas", "type": "[]int32", "versions": "0+",
      "about": "The replicas that we are in the process of removing." },
    { "name": "AddingReplicas", "type": "[]int32", "versions": "0+",
      "about": "The replicas that we are in the process of adding." },
    { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The lead replica, or -1 if there is no leader." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "An epoch that gets incremented each time we change the leader." }
  ]
}

ConfigRecord

Code Block
{
  "apiKey": 4,
  "type": "metadata",
  "name": "ConfigRecord",
  "validVersions": "0",
  "fields": [
    { "name": "ResourceType", "type": "int8", "versions": "0+",
      "about": "The type of resource this configuration applies to." },
    { "name": "ResourceName", "type": "string", "versions": "0+",
      "about": "The name of the resource this configuration applies to." },
    { "name": "Name", "type": "string", "versions": "0+",
      "about": "The name of the configuration key." },
    { "name": "Value", "type": "string", "versions": "0+",
      "about": "The value of the configuration." }
  ]
}

PartitionChangeRecord

Code Block
{
  "apiKey": 5,
  "type": "metadata",
  "name": "PartitionChangeRecord",
  "validVersions": "0",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Isr", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 0,
      "about": "null if the ISR didn't change; the new in-sync replicas otherwise." },
    { "name": "Leader", "type": "int32", "default": "-2", "entityType": "brokerId",
      "versions": "0+", "taggedVersions": "0+", "tag": 1,
      "about": "-1 if there is now no leader; -2 if the leader didn't change; the new leader otherwise." },
    { "name": "Replicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 2,
      "about": "null if the replicas didn't change; the new replicas otherwise." },
    { "name": "RemovingReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 3,
      "about": "null if the removing replicas didn't change; the new removing replicas otherwise." },
    { "name": "AddingReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 4,
      "about": "null if the adding replicas didn't change; the new adding replicas otherwise." }
  ]
}

AccessControlRecord

Code Block
{
 { "nameapiKey": "Isr"6,
  "type":  "[]int32metadata",
  "versionsname":  "0+AccessControlRecord",
      "aboutvalidVersions": "0"The,
 in-sync replicas of this partition" }, "fields": [
    { "name": "RemovingReplicasResourceType", "type":  "[]int32int8", "versions":  "0+",
      "about": "The replicas that we are in the process of removing.resource type" },
    { "name": "AddingReplicasResourceName", "type":  "[]int32string", "versions": "0+", "nullableVersions": "0+",
      "about": "The replicas that we are in the process of adding resource name, or null if this is for the default resource." },
    { "name": "LeaderPatternType", "type": "int32int8", "versions": "0+", "default": "-1",
      "about": "The pattern leadtype replica(literal, or -1 if there is no leader.prefixed, etc.)" },
    { "name": "LeaderEpochPrincipal", "type": "int32string", "versions": "0+", "default": "-1",
      "about": "AnThe epoch that gets incremented each time we change the leaderprincipal name." }
  ]
}

ConfigRecord

Code Block
{,
         
  "apiKey{ "name": 3"Host",
  "type": "metadatastring",
  "nameversions": "ConfigRecord0+",
   "validVersions": "0",
  "fieldsabout": [ "The host." },
    { "name": "ResourceTypeOperation", "type": "int8", "versions": "0+",
      "about": "The operation type of resource this configuration applies to." },
    { "name": "ResourceNamePermissionType", "type": "stringint8", "versions": "0+",
      "about": "The namepermission oftype the resource this configuration applies to(allow, deny)." },
  ]
} 
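
For illustration, one ACL entry might be pictured as below.  The numeric codes shown for ResourceType, PatternType, Operation, and PermissionType are placeholders rather than normative values (the real values are the int8 codes already used by Kafka's ACL enums), and this JSON rendering is not the wire format.

Code Block
{
  "ResourceType": 2,
  "ResourceName": "payments",
  "PatternType": 3,
  "Principal": "User:alice",
  "Host": "*",
  "Operation": 3,
  "PermissionType": 3
}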

FenceBrokerRecord

Code Block
{
  "apiKey": 7,
  "type": "metadata",
  "name": "FenceBrokerRecord",
  "validVersions": "0",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID to fence. It will be removed from all ISRs." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+",
      "about": "The epoch of the broker to fence." }
  ]
}

...

UnfenceBrokerRecord

Code Block
{
  "apiKey": 48,
  "type": "metadata",
  "name": "IsrChangeRecordUnfenceBrokerRecord",
  "validVersions": "0",
  "fields": [
    { "name": "0",
  "fields": [ "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID to unfence." }
    { "name": "PartitionIdBrokerEpoch", "type": "int32int64", "versions": "0+", "default": "-1",
      "about": "The partition idepoch of the broker to unfence." },
  ]
} 
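
As a sketch of how these two record types pair up (the broker ID and epoch are hypothetical, and this JSON rendering is not the wire format), fencing broker 1 and later unfencing it would append one record of each type to the metadata log:

Code Block
FenceBrokerRecord:   { "BrokerId": 1, "BrokerEpoch": 42 }
UnfenceBrokerRecord: { "BrokerId": 1, "BrokerEpoch": 42 }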

RemoveTopic

Code Block
{
 { "nameapiKey": "TopicId"9,
  "type": "uuidmetadata",
  "versionsname": "0+RemoveTopicRecord",
      "aboutvalidVersions": "0"The,
 unique ID of this topic." }, "fields": [
    { "name": "IsrTopicId", "type":  "[]int32uuid", "versions":  "0+",
      "about": "The in-sync replicas of this partition topic to remove. All associated partitions will be removed as well." },
  ]
} 

DelegationTokenRecord

Code Block
{
 { "nameapiKey": "Leader"10,
  "type": "int32metadata",
  "versionsname": "0+DelegationTokenRecord",
  "defaultvalidVersions": "-10",
      "aboutfields": "The lead replica, or -1 if there is no leader." },[
    { "name": "LeaderEpochOwner", "type": "int32string", "versions": "0+", "default": "-1",
      "about": "AnThe epochdelegation that gets incremented each time we change the leadertoken owner." },
   ]
} 

AccessControlRecord

Code Block
{
  "apiKeyname": 5"Renewers",
  "type": "metadata[]string",
  "nameversions": "AccessControlRecord0+",
      "validVersionsabout": "0",
  "fields": [The principals which have renewed this token." },
    { "name": "ResourceTypeIssueTimestamp", "type": "int8int64", "versions": "0+",
      "about": "The resource type time at which this timestamp was issued." },
    { "name": "ResourceNameMaxTimestamp", "type": "stringint64", "versions": "0+", "nullableVersions": "0+",
      "about": "The resourcetime name,at orwhich nullthis iftoken thiscannot isbe forrenewed theany default resourcemore." },
    { "name": "PatternTypeExpirationTimestamp", "type": "int8int64", "versions": "0+",
      "about": "The pattern type (literal, prefixed, etc.) next time at which this token must be renewed." },
    { "name": "PrincipalTokenId", "type": "string", "versions": "0+",
      "about": "The principaltoken nameid." },
  ]
} 
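
An illustrative token entry is shown below.  All values are hypothetical, the timestamps are shown here as milliseconds since the Unix epoch, and this JSON rendering is not the wire format.

Code Block
{
  "Owner": "User:alice",
  "Renewers": ["User:alice", "User:bob"],
  "IssueTimestamp": 1614556800000,
  "MaxTimestamp": 1615161600000,
  "ExpirationTimestamp": 1614643200000,
  "TokenId": "token-1234"
}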

UserScramCredentialRecord

Code Block
{
 { "nameapiKey": "Host"11,
  "type": "stringmetadata",
  "versionsname": "0+UserScramCredentialRecord",
      "aboutvalidVersions": "0"The host." },,
  "fields": [
    { "name": "OperationUserName", "type": "int8string", "versions": "0+",
      "about": "The operationuser typename." },
    { "name": "PermissionTypeCredentialInfos", "type": "int8[]CredentialInfo", "versions": "0+",
      "about": "The permissionmechanism typeand (allow, deny)." }
  ]
} 

FenceBroker

Code Block
{
  "apiKey": 6,
  "type": "metadata",
  "name": "FenceBrokerRecord",
  "validVersions": "0",
 related information associated with the user's SCRAM credential.", "fields": [
      { "name": "IdMechanism", "type": "int32int8", "versions": "0+",
        "about": "The broker ID to fence. It will be removed from all ISRs.SCRAM mechanism." },
      { "name": "EpochSalt", "type": "int64bytes", "versions": "0+",
        "about": "TheA epochrandom ofsalt thegenerated brokerby tothe fenceclient." },
     ]
} 

RemoveTopic

Code Block
{
  "apiKeyname": 7"SaltedPassword",
  "type": "metadatabytes",
  "nameversions": "RemoveTopicRecord0+",
        "validVersionsabout": "0",
The salted password."fields": [},
      { "name": "IdIterations", "type": "uuidint32", "versions": "0+",
        "about": "The topicnumber toof remove.iterations Allused associatedin partitionsthe will be removed as wellSCRAM credential." }]}
  ]
} 
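
For illustration, a single SCRAM credential for one user might be pictured as follows.  The mechanism code and the base64 rendering of the byte fields are placeholders rather than a normative encoding, and this JSON rendering is not the wire format.

Code Block
{
  "UserName": "alice",
  "CredentialInfos": [
    { "Mechanism": 1,
      "Salt": "c2FsdA==",
      "SaltedPassword": "c2FsdGVkLXBhc3N3b3Jk",
      "Iterations": 4096 }
  ]
}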

...

FeatureLevelRecord

Code Block
{
  "apiKey": 812,
  "type": "metadata",
  "name": "DelegationTokenRecordFeatureLevelRecord",
  "validVersions": "0",
  "fields": [
    { "name": "OwnerName", "type": "string", "versions": "0+",
      "aboutmapKey": "The delegation token owner." }true,
    { "name": "Renewers", "type": "[]string", "versions": "0+",
      "about": "The principals which have renewed this tokenfeature name." },
    { "name": "IssueTimestampMinFeatureLevel", "type": "int64int16", "versions": "0+",
      "about": "The time at which this timestamp was issuedThe current finalized minimum feature level of this feature for the cluster." },
    { "name": "MaxTimestampMaxFeatureLevel", "type": "int64int16", "versions": "0+",
      "about": "The current timefinalized atmaximum whichfeature thislevel tokenof cannotthis befeature renewedfor anythe morecluster." },
  ]
} 
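
As a sketch (the feature name and levels are made up, and this JSON rendering is not the wire format), finalizing a feature at levels 1 through 2 for the whole cluster would produce a record like:

Code Block
{
  "Name": "example.feature",
  "MinFeatureLevel": 1,
  "MaxFeatureLevel": 2
}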

FailedReplicasRecord

Code Block
{
 { "nameapiKey": "ExpirationTimestamp"13,
  "type": "int64metadata",
  "versionsname": "0+FailedReplicasRecord",
      "aboutvalidVersions": "The next time at which this token must be renewed." },"0",
  "fields": [
    { "name": "TokenIdBrokerId", "type": "stringint32", "versions": "0+",
      "about": "The tokenbroker id." },
  ]
} 

ScramUser

Code Block
{
   { "apiKeyname": 9"Topics",
  "type": "metadata[]TopicWithFailures",
  "nameversions": "DelegationTokenRecord0+",
      "validVersionsabout": "0The topics with failed replicas.",
  "fields": [
      { "name": "NameTopicId", "type": "stringuuid", "versions": "0+",
        "about": "The usertopic nameUUID." },
      { "name": "CredentialInfosPartitions", "type": "[]CredentialInfoint32", "versions": "0+",
        "about": "The mechanismpartition ids." }    
    ]}
  ]
} 
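
An illustrative record reporting failed replicas on broker 1, covering two partitions of a single topic, is sketched below; the topic ID and partition numbers are made up, and this JSON rendering is not the wire format.

Code Block
{
  "BrokerId": 1,
  "Topics": [
    { "TopicId": "h3sC4Ys-Rw2RQDMaIqhhzQ",
      "Partitions": [0, 2] }
  ]
}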

QuotaRecord

Code Block
{
  "apiKey": 14,
 and related information associated with the user's SCRAM credential.", "fields": [
      { "name": "Mechanism", "type": "int8metadata",
  "versionsname": "0+QuotaRecord",
        "about": "The SCRAM mechanism." },
  validVersions": "0",
  "fields": [
    { "name": "SaltEntity", "type": "bytes[]EntityData", "versions": "0+",
        "about": "AThe randomquota saltentity generated by the client." },to alter.", "fields": [
      { "name": "SaltedPasswordEntityType", "type": "bytesstring", "versions": "0+",
        "about": "The saltedentity passwordtype." },
      { "name": "IterationsEntityName", "type": "int32string", "versions": "0+",
        "about": "The number of iterations used in the SCRAM credential." }]}
  ]
} 

FeatureLevel

Code Block
{
  "apiKey": 10,
  "typenullableVersions": "metadata0+",
        "nameabout": "FeatureLevelRecord",
  "validVersions": "0",
  "fields": [The name of the entity, or null if the default." }
    ]},
    { "name": "NameKey", "type": "string", "versions": "0+", "mapKey": true,
      "about": "The featurequota configuration namekey." },
    { "name": "MinFeatureLevelValue", "type": "int16float64", "versions": "0+",
      "about": "The value currentto finalizedset, minimumotherwise featureignored levelif ofthe thisvalue featureis forto thebe clusterremoved." },
    { "name": "MaxFeatureLevelRemove", "type": "int16bool", "versions": "0+",
      "about": "TheWhether currentthe finalizedquota maximumconfiguration featurevalue levelshould ofbe thisremoved, feature for the clusterotherwise set." }
  ]
} 
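
For example, setting a produce-rate quota for one user could be pictured as below.  The entity type and configuration key are chosen for illustration (any of Kafka's existing quota entity types and keys could appear here), and this JSON rendering is not the wire format.  A later record with "Remove": true for the same entity and key would delete the quota again.

Code Block
{
  "Entity": [
    { "EntityType": "user", "EntityName": "alice" }
  ],
  "Key": "producer_byte_rate",
  "Value": 1048576.0,
  "Remove": false
}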

New Metrics

Full Name: kafka.controller:type=KafkaController,name=MetadataLag
Description: The offset delta between the latest metadata record this controller has replayed and the last stable offset of the metadata topic.

Full Name: kafka.controller:type=KafkaServer,name=MetadataLag
Description: The offset delta between the latest metadata record this broker has replayed and the last stable offset of the metadata topic.

Full Name: kafka.controller:type=KafkaController,name=MetadataCommitLatencyMs
Description: The latency of committing a message to the metadata topic.  Relevant on the active controller.

Full Name: kafka.controller:type=KafkaController,name=MetadataCommitRatePerSec
Description: The number of metadata messages per second committed to the metadata topic.

Full Name: kafka.controller:type=KafkaController,name=MetadataSnapshotOffsetLag
Description: New name for kafka.controller:type=KafkaController,name=SnapshotLag.  The offset delta between the latest stable offset of the metadata topic and the offset of the last snapshot (or the last stable offset itself, if there are no snapshots).

...

Since KIP-500 mode is currently in a pre-alpha state, we do not yet guarantee that future versions will support upgrading from the current version.  Once it is more stable, we will have a more traditional binary compatibility regime.

Rejected Alternatives

...

Support Automatic Broker ID Assignment

This KIP proposes to drop support for automatic broker ID assignment.  What if we decided to continue to support it?

...