Status

Current state: under discussion

Discussion thread: here

JIRA:

Released: TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Apache Kafka is in the process of moving from storing metadata in Apache ZooKeeper to storing metadata in an internal Raft topic.  KIP-500 described the overall architecture and plan.  The purpose of this KIP is to go into detail about how the Kafka Controller will change during this transition.

Proposed Changes

Deployment

KIP-500 Mode

Once this KIP is implemented, system administrators will have the option of running in KIP-500 mode.  In this mode, we will not use ZooKeeper.  The alternative mode, where KIP-500 support is not enabled, will be referred to as legacy mode.

KIP-500 mode must be enabled for the entire cluster, not just for specific nodes.  Initially, this mode will be considered experimental and not ready for production.  As we do more testing and gain more confidence, we will remove the experimental label.  Eventually, in a future release, KIP-500 mode will be the only supported mode.  Since dropping support for legacy mode is an incompatible change, it will need to happen in a major release, of course.

Initially, we will not support upgrading a cluster from legacy mode to KIP-500 mode.  This is in keeping with the experimental nature of KIP-500 mode.  A future KIP will describe and implement an upgrade process from legacy mode to KIP-500 mode.

Before being used in KIP-500 mode, the storage directories on a node must be formatted.  This requirement prevents system administrators from accidentally enabling KIP-500 mode by simply making a configuration change.  Requiring formatting also prevents mistakes, since Kafka no longer has to guess whether an empty storage directory is a new directory or one where a system error prevented any data from showing up.

Nodes

Currently, a ZooKeeper cluster must be deployed when running Kafka.  This KIP will eliminate that requirement, as well as the requirement to configure the addresses of the ZooKeeper nodes on each broker.

Currently, any broker node can be elected as the controller.  As part of this KIP, the active controller will instead be selected among a small pool of nodes specifically configured to act as controllers.  Typically three or five nodes in the cluster will be selected to be controllers.

System administrators will be able to choose whether to run separate controller nodes, or whether to run controller nodes which are co-located with broker nodes.  Kafka will provide support for running a controller in the same JVM as a broker, in order to save memory and enable single-process test deployments.

The addresses and ports of the controller nodes must be configured on each broker, so that the broker can contact the controller quorum when starting up.  This is similar to how we configure the ZooKeeper quorum on each node today.

Note that as long as at least one of the provided controller addresses is valid, the broker will be able to learn about the current metadata quorum and start up.  Once the broker is in contact with the metadata quorum, the quorum bootstrap addresses will not be needed.  This makes it possible to reconfigure the metadata quorum over time.  For example, if we start with a metadata quorum of host1, host2, host3, we could replace host3 with host4 without disrupting any of the brokers.  Then we could roll the brokers to apply the new metadata quorum bootstrap configuration of host1, host2, host4 on each one.

Node IDs

Just like brokers, controller nodes will have non-negative integer node IDs.  There will be a single ID space.  In other words, no controller should share the same ID as a broker.  Even when a broker and a controller are co-located in the same JVM, they must have different node IDs.

Automatic node ID assignment via ZooKeeper will no longer be supported in KIP-500 mode.  Node IDs must be set in the configuration file for brokers and controllers.
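The single ID space rule can be checked mechanically before a cluster is started. The following is a minimal sketch, not part of Kafka: the function name validate_node_ids and the example IDs are hypothetical, and the check only covers the two rules stated above (non-negative IDs, no ID shared between a broker and a controller).

```python
def validate_node_ids(broker_ids, controller_ids):
    """Return a list of problems; an empty list means the IDs are acceptable."""
    problems = []
    # Rule 1: node IDs are non-negative integers.
    for node_id in list(broker_ids) + list(controller_ids):
        if node_id < 0:
            problems.append(f"node ID {node_id} is negative")
    # Rule 2: brokers and controllers share one ID space, so no overlap is allowed.
    for node_id in sorted(set(broker_ids) & set(controller_ids)):
        problems.append(
            f"node ID {node_id} is used by both a broker and a controller")
    return problems


# Hypothetical layout: brokers 0-2, controllers 3000-3002.
print(validate_node_ids([0, 1, 2], [3000, 3001, 3002]))
print(validate_node_ids([0, 1], [1, 2]))
```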

Networking

Controller processes will listen on a separate endpoint from brokers.  This will be true even when the broker and controller are co-located in the same JVM.

In a well-run Kafka deployment, controller ports, like ZooKeeper ports, should be firewalled off from clients.  This will prevent clients from disrupting the cluster by flooding the controller ports with requests.  In the realm of ACLs, this translates to controllers requiring CLUSTERACTION on CLUSTER for all operations.

The only time when clients should contact a controller node directly is when they are debugging system issues.  This is similar to ZooKeeper, where we have things like zk-shell, but only for debugging.

Metadata

The Metadata Topic

As described in KIP-500, the controller will store its data in the internal @metadata topic.  This topic will contain a single partition which is managed by Raft, as described in KIP-595: A Raft Protocol for the Metadata Quorum.

The leader of the controller quorum will be the active controller.  The followers will function as hot standbys, ready to take over when the active leader fails or resigns.  The metadata will be stored in memory on all of the controllers.

Persistence and Visibility

Metadata changes need to be persisted to the @metadata log before we apply them on the other nodes in the cluster.  This means waiting for the metadata log's last stable offset to advance to the offset of the change.  After that point, we are guaranteed not to lose the change as long as we uphold the Raft invariants.

Changes that we haven't yet persisted are referred to as "uncommitted."  The active controller may have several of these uncommitted changes in flight at any given time.  In essence, the controller's in-memory state is always a little bit in the future compared to the current state.  This allows the active controller to continue doing things while it waits for the previous changes to be committed to the Raft log.

However, this "future state" may never be committed.  For example, the active controller might fail, truncating some of its future state.  Therefore, the active controller must not make this future state "visible" to the rest of the cluster until it has been made persistent – that is, until it becomes current state.  In the case of the @metadata topic, the replication protocol itself neatly takes care of this for us.  In the case of controller RPCs like AlterIsr, the controller handles this by not sending back a response until the designated change has been persisted.

                      earlier offsets
                          +---+
     visible change ----> |   |
                          +---+       last stable offset
     visible change ----> |   | <---- standby controllers'
                          +---+       in-memory state
     pending change ----> |   |
                          +---+
     pending change ----> |   |
                          +---+
     pending change ----> |   |
                          +---+
     pending change ----> |   | <---- active controller's
                          +---+       in-memory state
                      latest offset
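
The relationship between pending and visible changes can be illustrated with a small model. This is a simplified sketch under stated assumptions, not the controller implementation: the class ActiveControllerSketch and its methods are hypothetical, the log is a plain list, and the last stable offset is treated as the inclusive offset of the newest committed record.

```python
class ActiveControllerSketch:
    """Toy model: responses are held back until their change is committed."""

    def __init__(self):
        self.log = []                 # every appended record, committed or not
        self.last_stable_offset = -1  # newest committed offset (inclusive, assumed)
        self.pending = {}             # offset -> response being held back

    def append(self, record, response):
        """Apply a change in memory and park its RPC response."""
        offset = len(self.log)
        self.log.append(record)
        self.pending[offset] = response
        return offset

    def advance_last_stable_offset(self, new_offset):
        """Release the responses for every change at or below new_offset."""
        self.last_stable_offset = max(self.last_stable_offset, new_offset)
        released = []
        for offset in sorted(self.pending):
            if offset <= self.last_stable_offset:
                released.append(self.pending.pop(offset))
        return released

    def visible_records(self):
        """The state that standbys and brokers are allowed to observe."""
        return self.log[: self.last_stable_offset + 1]


controller = ActiveControllerSketch()
controller.append("change-0", "response-0")
controller.append("change-1", "response-1")
controller.append("change-2", "response-2")
print(controller.visible_records())                   # nothing committed yet
print(controller.advance_last_stable_offset(1))       # first two responses released
print(controller.visible_records())
```

In this model the active controller keeps accepting new changes while earlier ones are still pending, which mirrors the "a little bit in the future" behavior described above.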

Record Formats

The active controller makes changes to the metadata by appending records to the log. Each record has a null key, and this format for its value:

  1. an unsigned varint specifying the record type
  2. an unsigned varint specifying the record version
  3. the payload in Kafka RPC format

For example, if we wanted to encode a TopicRecord, we might have 1 encoded as a varint, followed by 0 as the version, followed by the serialized topic data.

The record type and version will typically only take one byte each, for a total header size of two bytes.
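
To make the two-byte header concrete, here is a minimal sketch of the framing. The helper names are hypothetical, and the payload is passed in as opaque bytes because serializing it in Kafka RPC format is out of scope for the example. The varint encoding is the usual base-128 scheme: seven bits per byte, with the high bit set on every byte except the last.

```python
def encode_unsigned_varint(value):
    """Encode a non-negative integer as a base-128 unsigned varint."""
    if value < 0:
        raise ValueError("unsigned varints cannot encode negative numbers")
    out = bytearray()
    while value >= 0x80:
        out.append((value & 0x7F) | 0x80)  # low 7 bits, continuation bit set
        value >>= 7
    out.append(value)                      # final byte, continuation bit clear
    return bytes(out)


def encode_record_value(record_type, version, payload):
    """Frame a metadata record value: type varint, version varint, payload."""
    return (encode_unsigned_varint(record_type)
            + encode_unsigned_varint(version)
            + payload)


# Type 1 and version 0, as in the TopicRecord example above; the payload
# bytes here are a placeholder, not real serialized topic data.
framed = encode_record_value(1, 0, b"payload")
print(framed[:2].hex())  # the two header bytes
```

Any type or version below 128 fits in a single byte, which is why the header is typically two bytes long.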

Record Format Versions

There are two ways to evolve the format of a KIP-500 record.  One is to add KIP-482 optional tagged fields.  These will be ignored by older software, but can contain additional data for new software to handle.  The other choice is to bump the version of the record.

In the pre-KIP-500 world, we had the inter-broker protocol (IBP) setting to control what RPC versions the controller used to communicate with the brokers.  This allowed us to evolve the inter-broker RPC format over time.  We also used it to gate many other features, such as metadata format changes.  In the post-KIP-500 world, the analogous setting is the metadata.format KIP-584 feature flag. This setting controls the snapshot and delta formats which the controller will use.

Snapshot Implementation

As time goes on, the number of records will grow and grow, even if the total size of the metadata stays constant.  Therefore, periodically, we need to consolidate all the metadata deltas into a snapshot.

Like the metadata log, the snapshot is made up of records.  However, unlike the log, in which there may be multiple records describing a single entity, the snapshot will only contain the minimum number of records needed to describe all the entities.

Snapshots are local to each replica.  For example, replica A may have a snapshot at offset 100, and deltas up to offset 150, whereas replica B may have a snapshot at 125 and deltas up to offset 150.  Any snapshot must be usable as a starting point for loading the entire state of metadata.  In other words, a new controller node must be able to load a snapshot, and then apply all the edits which follow it, and come up-to-date.
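
The requirement that a snapshot plus the deltas after it must reproduce the full state can be stated as a small invariant. The sketch below is illustrative only: records are modeled as hypothetical (key, value) pairs where a value of None deletes the entity, which is not the actual record format.

```python
def apply_records(state, records):
    """Apply delta records to a state dictionary and return it."""
    for key, value in records:
        if value is None:
            state.pop(key, None)   # deletion record
        else:
            state[key] = value     # create or update
    return state


def make_snapshot(state):
    """A snapshot holds exactly one record per live entity."""
    return sorted(state.items())


log = [
    ("topic:a", 1),
    ("topic:a", 2),      # supersedes the first record
    ("topic:b", 1),
    ("topic:a", None),   # topic:a is deleted
    ("topic:c", 5),
]

# Replaying the whole log from offset 0.
full_replay = apply_records({}, log)

# A replica that snapshotted after the first three records.
snapshot = make_snapshot(apply_records({}, log[:3]))
from_snapshot = apply_records(dict(snapshot), log[3:])

print(len(log[:3]), "log records became", len(snapshot), "snapshot records")
print(full_replay == from_snapshot)
```

Two replicas that snapshot at different offsets hold different snapshot files, but both arrive at the same state once they have applied the deltas up to the same offset.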

The currently active controller will monitor the offset of the latest snapshot made by all replicas, including itself.  The snapshotting state of each node is considered soft state: it is not persisted anywhere in the log, but purely communicated by heartbeats and stored in memory by the active controller.

Broker Registration and State Management

The Three Cluster Membership States

Currently, from the perspective of ZooKeeper, there are two states brokers can be in: registered, and not registered.  When brokers are registered, other brokers can find their network endpoints in order to communicate with them.  They are also part of the MetadataResponse communicated back to clients.  When they are not registered, neither of those is true.

In the post-KIP-500 world, there will be three cluster membership states: unregistered, registered but fenced, and registered and active.  Just like today, unregistered means that there is no registration information and no way to reach the broker.  It is effectively not part of the cluster.  In the two registered states, in contrast, contact information is available.  However, in the "registered but fenced" state, the contact information might no longer be valid.  For example, if a broker crashes and is not restarted, it will end up in "registered but fenced" state.

A broker only appears in MetadataResponse if it is in the "registered and active" state.  If it is in the unregistered state, or the "registered but fenced" state, it will not appear in MetadataResponse.
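
As a compact restatement of the three states and the MetadataResponse rule, consider the following sketch. The enum and function names are hypothetical and exist only for illustration.

```python
from enum import Enum


class Membership(Enum):
    UNREGISTERED = "unregistered"
    REGISTERED_FENCED = "registered but fenced"
    REGISTERED_ACTIVE = "registered and active"


def brokers_in_metadata_response(states):
    """Only brokers that are registered and active are shown to clients."""
    return sorted(broker_id for broker_id, state in states.items()
                  if state is Membership.REGISTERED_ACTIVE)


cluster = {
    0: Membership.REGISTERED_ACTIVE,
    1: Membership.REGISTERED_FENCED,   # e.g. crashed and not yet restarted
    2: Membership.REGISTERED_ACTIVE,
    3: Membership.UNREGISTERED,
}
print(brokers_in_metadata_response(cluster))
```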

Broker Registration

Every distributed system needs a way of managing cluster membership.  Prior to KIP-500, Kafka brokers registered ephemeral znodes in order to register themselves as part of the cluster.  The Kafka controller passively consumed the registration information from ZooKeeper.

In the post-KIP-500 world there is no ZooKeeper and no ephemeral znodes.  Instead, each broker registers itself with the active controller using a BrokerRegistrationRequest. The active controller assigns the broker a new broker epoch, based on the next available offset in the log.  The new epoch is guaranteed to be higher than any previous epoch that has been used for the given broker id.

Each registration request contains a UUID which identifies the process which sent it.  This ID is called the incarnation ID.  This ensures that if the response to the registration request is lost, the broker can simply re-send the registration RPC and get the same successful result as before.

Registration requests also have information about the feature flags which the broker software supports.  The controller will refuse to register brokers if they don't support the feature flags which are active in the cluster.  In this case, the sysadmin needs to upgrade the broker software before it can be added to the cluster.

Handling Broker ID Conflicts

The controller only allows one broker process to be registered per broker ID.  Of course, broker processes go away occasionally, for example when a broker crashes.  A broker ID can be reused once a certain amount of time has gone past without any contact with the previous incarnation of the broker.

For the purpose of this section, handling a registration request or a broker heartbeat request are both considered forms of contact (even if the broker is fenced).
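
The registration rules above (epochs derived from log offsets, idempotent retries keyed on the incarnation ID, and one process per broker ID) can be sketched as follows. This is a toy model under simplifying assumptions: RegistrationSketch and its methods are hypothetical, each registration is assumed to consume exactly one log offset, and expiry of a stale registration is triggered explicitly rather than by a timer.

```python
import uuid


class RegistrationSketch:
    """Toy model of broker registration on the active controller."""

    def __init__(self):
        self.next_offset = 0      # next available offset in the metadata log
        self.registrations = {}   # broker_id -> (incarnation_id, broker_epoch)

    def register(self, broker_id, incarnation_id):
        existing = self.registrations.get(broker_id)
        if existing is not None:
            if existing[0] == incarnation_id:
                # Same process retrying after a lost response: same result.
                return ("NONE", existing[1])
            # A different process already holds this broker ID.
            return ("DUPLICATE_BROKER_REGISTRATION", -1)
        epoch = self.next_offset  # epoch is taken from the log offset
        self.next_offset += 1     # assume one registration record is appended
        self.registrations[broker_id] = (incarnation_id, epoch)
        return ("NONE", epoch)

    def expire(self, broker_id):
        """Forget a registration after a period with no contact."""
        self.registrations.pop(broker_id, None)


controller = RegistrationSketch()
old_process, new_process = uuid.uuid4(), uuid.uuid4()
first = controller.register(1, old_process)
print(first == controller.register(1, old_process))  # a retry is idempotent
print(controller.register(1, new_process)[0])        # conflicting incarnation
controller.expire(1)
print(controller.register(1, new_process)[1] > first[1])
```

Because log offsets only grow, an epoch handed out after a broker ID is reused is always higher than the epoch of the previous incarnation.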

Broker Leases

Once a broker is registered, its next goal is to transition from registered+fenced to registered+active.  It can do this by sending a broker heartbeat request to the active controller.  When it receives the heartbeat request, the active controller must decide whether to give the broker a lease, or whether it should deny the lease.  Brokers that don't have a broker ID lease are said to be "fenced."

As mentioned earlier, brokers which are fenced will not appear in MetadataResponses.  So clients that have up-to-date metadata will not try to contact fenced brokers.

Broker leases are time-bounded.  The amount of time they last is specified by the controller in its response.  Once the period has elapsed, if the broker has not renewed its lease via a heartbeat, it will be fenced.
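
The lease mechanics can be modeled with an explicit clock. The sketch below is an illustration, not the controller's data structure: LeaseTable is a hypothetical name, time is passed in as a millisecond value so the behavior is deterministic, and the 18000 ms duration is simply the value listed for broker.registration.timeout.ms in this KIP.

```python
class LeaseTable:
    """Toy model of time-bounded broker leases."""

    def __init__(self, lease_ms):
        self.lease_ms = lease_ms
        self.expiry = {}  # broker_id -> time (ms) at which the lease ends

    def heartbeat(self, broker_id, now_ms):
        """Grant or renew a lease; return its duration to the broker."""
        self.expiry[broker_id] = now_ms + self.lease_ms
        return self.lease_ms

    def is_fenced(self, broker_id, now_ms):
        """A broker without an unexpired lease is fenced."""
        deadline = self.expiry.get(broker_id)
        return deadline is None or now_ms >= deadline


leases = LeaseTable(lease_ms=18000)
leases.heartbeat(1, now_ms=0)
print(leases.is_fenced(1, now_ms=3000))    # within the lease
leases.heartbeat(1, now_ms=3000)           # renewed by the next heartbeat
print(leases.is_fenced(1, now_ms=20000))   # still covered by the renewal
print(leases.is_fenced(1, now_ms=21000))   # renewal has expired
```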

Controlled Shutdown

In the pre-KIP-500 world, brokers triggered a controlled shutdown by making an RPC to the controller.  When the controller returned a successful result from this RPC, the broker knew that it could shut down.

In the post-KIP-500 world, controlled shutdown is handled by the broker heartbeat system instead.  In its periodic heartbeats, the broker asks the controller if it can transition into the controlled shutdown state.  This motivates the controller to move all of the leaders off of that broker.  Once they are all moved, the controller responds to the heartbeat with a nextState of SHUTDOWN.

The Broker State Machine

NOT_RUNNING

This is the state that the broker is in before it has started up.  When the broker starts up, it transitions to STARTING.

STARTING

While the broker is in this state, it is trying to catch up with the latest metadata.  It fetches the metadata from the controller quorum. Once it has finished catching up, it transitions to the RECOVERY state.

RECOVERY

The broker is in this state while it is starting the log manager.  If the shutdown was clean, the broker will leave this state very quickly.  If the shutdown was unclean, the broker will stay in this state until log recovery is complete.

Once log recovery is done, the broker will start listening on the socket server.  It will then ask the controller to unfence it.  Once the controller agrees, it will transition to the RUNNING state.

RUNNING

The broker is in this state when it's up and running.

PENDING_CONTROLLED_SHUTDOWN

The broker is in this state when it has received a SIGTERM and is trying to shut down.

SHUTTING_DOWN

The broker is in this state when controlled shutdown has finished and it is shutting down.

Changes in the Broker State Machine

The numeric constants exposed through the metrics API have not changed, and there are no new or removed states.

The main change in the broker state machine is that the RECOVERING_FROM_UNCLEAN_SHUTDOWN state has been renamed to RECOVERY.  Also, unlike previously, the broker will always pass through RECOVERY (although it may only stay in this state for a very short amount of time).
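
The happy-path transitions described in this section can be written down as a transition table. This is a sketch under assumptions: the enum values are arbitrary labels rather than the numeric constants exposed through the metrics API, and only the transitions named in the text are modeled (for example, shutting down directly from STARTING is not covered).

```python
from enum import Enum


class BrokerState(Enum):
    NOT_RUNNING = "NOT_RUNNING"
    STARTING = "STARTING"
    RECOVERY = "RECOVERY"
    RUNNING = "RUNNING"
    PENDING_CONTROLLED_SHUTDOWN = "PENDING_CONTROLLED_SHUTDOWN"
    SHUTTING_DOWN = "SHUTTING_DOWN"


# Transitions taken from the state descriptions above (assumed complete
# only for the normal startup and controlled shutdown path).
ALLOWED = {
    BrokerState.NOT_RUNNING: {BrokerState.STARTING},
    BrokerState.STARTING: {BrokerState.RECOVERY},
    BrokerState.RECOVERY: {BrokerState.RUNNING},
    BrokerState.RUNNING: {BrokerState.PENDING_CONTROLLED_SHUTDOWN},
    BrokerState.PENDING_CONTROLLED_SHUTDOWN: {BrokerState.SHUTTING_DOWN},
    BrokerState.SHUTTING_DOWN: set(),
}


class BrokerStateMachine:
    def __init__(self):
        self.state = BrokerState.NOT_RUNNING

    def transition(self, new_state):
        if new_state not in ALLOWED[self.state]:
            raise ValueError(f"illegal transition {self.state.name} -> {new_state.name}")
        self.state = new_state


machine = BrokerStateMachine()
for step in (BrokerState.STARTING, BrokerState.RECOVERY, BrokerState.RUNNING):
    machine.transition(step)
print(machine.state.name)
```

Note that the table has no edge from STARTING to RUNNING, reflecting the point that the broker always passes through RECOVERY.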

Public Interfaces

meta.properties

When a storage directory is in use by a cluster running in KIP-500 mode, it will have a new version of the meta.properties file.  Since the current version is 0, the new version will be 1.  Just as in version 0, meta.properties will continue to be a Java Properties file.  This essentially means that it is a plain text file where each line has the format key=value.

In version 0 of meta.properties, the cluster.id field is optional.  In contrast, in version 1 it is mandatory.

In version 0 of meta.properties, the cluster.id field is serialized in hexadecimal.  In contrast, in version 1 it is serialized in base64.

Version 1 of meta.properties adds the controller.id field.  Nodes which are acting as controllers will have this field.  If the node is also a broker, it will have the broker.id field as well.

During broker startup, a broker.id entry containing the current broker id will be added to each directory that does not already have one.  Similarly, during controller startup, a controller.id entry containing the current controller id will be added to each directory that does not already have one.

If the broker id and/or controller id is present but does not match the configured id, we will throw an exception and abort the startup process.

Here is an example of a version 1 meta.properties file:

$ cat /tmp/kafka-logs/meta.properties
#
#Tue Dec 01 10:08:08 PST 2020
cluster.id=3Db5QLSqSZieL3rJBUUegA
version=1
broker.id=0
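
The startup checks described above can be sketched against the example file. This is an illustration only: the function names are hypothetical, the parser handles just the simple key=value and comment lines shown here rather than the full Java Properties syntax, and only the broker.id check is modeled (the controller.id check would be analogous).

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_meta_properties(props, configured_broker_id):
    """Validate a version 1 file against the configured broker id."""
    if props.get("version") != "1":
        raise ValueError("expected version=1 in KIP-500 mode")
    if "cluster.id" not in props:
        raise ValueError("cluster.id is mandatory in version 1")
    stored = props.get("broker.id")
    if stored is None:
        props["broker.id"] = str(configured_broker_id)  # add the missing id
    elif int(stored) != configured_broker_id:
        raise ValueError("stored broker.id does not match the configured id")
    return props


example = """#
#Tue Dec 01 10:08:08 PST 2020
cluster.id=3Db5QLSqSZieL3rJBUUegA
version=1
broker.id=0
"""
print(check_meta_properties(parse_properties(example), configured_broker_id=0))
```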

Command-Line Tools

There will be a new command-line tool, kafka-storage.sh.

$ ./bin/kafka-storage.sh -h
usage: kafka-storage [-h] {info,format,random-uuid} ...

The Kafka storage tool.

positional arguments:
  {info,format,random-uuid}
    info                 Get information about the Kafka log directories on this node.
    format               Format the Kafka log directories on this node.
    random-uuid          Print a random UUID.

optional arguments:
  -h, --help             show this help message and exit

kafka-storage.sh will have three subcommands: info, format, and random-uuid.

info

$ ./bin/kafka-storage.sh info -h
usage: kafka-storage info [-h] --config CONFIG

optional arguments:
  -h, --help             show this help message and exit
  --config CONFIG, -c CONFIG
                         The Kafka configuration file to use.

The info command will give information about the configured storage directories.  Example output:

Found log directory:
  /tmp/kafka-logs

Found metadata: MetaProperties(clusterId=51380268-1036-410d-a8fc-fb3b55f48033)

format

$ ./bin/kafka-storage.sh format -h
usage: kafka-storage format [-h] --config CONFIG --cluster-id CLUSTER_ID [--ignore-formatted]

optional arguments:
  -h, --help             show this help message and exit
  --config CONFIG, -c CONFIG
                         The Kafka configuration file to use.
  --cluster-id CLUSTER_ID, -t CLUSTER_ID
                         The cluster ID to use.
  --ignore-formatted, -g

When running in KIP-500 mode, the storage directories must be formatted using this command prior to starting up the brokers and controllers.

If any of the storage directories are already formatted, the command will normally fail.  This behavior can be changed by passing the --ignore-formatted option.  When this option is passed, the format command will skip over already formatted directories rather than failing.

random-uuid

$ ./bin/kafka-storage.sh random-uuid -h
usage: kafka-storage random-uuid [-h]

optional arguments:
  -h, --help             show this help message and exit

The random-uuid command prints out a random UUID to stdout.

$ ./bin/kafka-storage.sh random-uuid
51380268-1036-410d-a8fc-fb3b55f48033

Configurations

| Configuration Name | Possible Values | Notes |
| --- | --- | --- |
| process.roles | null; broker; controller; broker,controller | If this is null (absent) then we are in legacy mode. Otherwise, we are in KIP-500 mode and this configuration determines what roles this process should play: broker, controller, or both. |
| controller.listener.names | If non-null, this must be a comma-separated list of listener names. When communicating with the controller quorum, the broker will always use the first listener in this list. | A comma-separated list of the names of the listeners used by the KIP-500 controller. This configuration is required if this process is a KIP-500 controller. The legacy controller will not use this configuration. Despite the similar name, note that this is different from the "control plane listener" introduced by KIP-291. |
| listeners | A comma-separated list of the configured listeners. For example, INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094 | This configuration is now required. |
| controller.connect | If non-null, this must be a comma-separated list of all the controller voters, in the format: {controller-id}@{controller-host}:{controller-port} | When in KIP-500 mode, each node must have this configuration, in order to find out how to communicate with the controller quorum. Note that this replaces the "quorum.voters" config described in KIP-595. This configuration is required for both brokers and controllers. |
| controller.id | a 32-bit ID | The controller id for this server. Only required if this server is a controller. |
| broker.id | a 32-bit ID | The broker id for this server. Only required if this server is a broker. |
| initial.broker.registration.timeout.ms | 90000 | When initially registering with the controller quorum, the number of milliseconds to wait before declaring failure and exiting the broker process. |
| broker.heartbeat.interval.ms | 3000 | The length of time between broker heartbeats. |
| broker.registration.timeout.ms | 18000 | The length of time that a broker lease lasts if no heartbeats are made. |
| metadata.log.dir | If set, this must be a path to a log directory. | This configuration determines where we put the metadata log. If it is not set, the metadata log is placed in the first log directory from log.dirs. |
| controller.quorum.fetch.timeout.ms | Maximum time without a successful fetch from the current leader before a new election is started. | New name for quorum.fetch.timeout.ms |
| controller.quorum.election.timeout.ms | Maximum time without collecting a majority of votes during the candidate state before a new election is retried. | New name for quorum.election.timeout.ms |
| controller.quorum.election.backoff.max.ms | Maximum exponential backoff time (based on the number of retries) after an election timeout, before a new election is triggered. | New name for quorum.election.backoff.max.ms |
| controller.quorum.request.timeout.ms | Maximum time before a pending request is considered failed and the connection is dropped. | New name for quorum.request.timeout.ms |
| controller.quorum.retry.backoff.ms | Initial delay between request retries. This config and the one below are used for retriable request errors or lost connectivity, and are different from the election backoff configs above. | New name for quorum.retry.backoff.ms |
| controller.quorum.retry.backoff.max.ms | Max delay between requests. Backoff will increase exponentially beginning from controller.quorum.retry.backoff.ms. | New name for quorum.retry.backoff.max.ms |
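
As an illustration of the controller.connect format, the following sketch parses a value into a map of controller IDs to addresses. The function name and the host names are hypothetical, and the parser is deliberately strict about the id@host:port shape; it is not the validation that Kafka itself performs.

```python
def parse_controller_connect(value):
    """Parse 'id@host:port,id@host:port,...' into {id: (host, port)}."""
    voters = {}
    for entry in value.split(","):
        entry = entry.strip()
        node_id, at_sign, address = entry.partition("@")
        host, colon, port = address.rpartition(":")
        if not at_sign or not colon or not host:
            raise ValueError(f"expected id@host:port but got {entry!r}")
        controller_id = int(node_id)
        if controller_id in voters:
            raise ValueError(f"controller id {controller_id} listed twice")
        voters[controller_id] = (host, int(port))
    return voters


# Hypothetical three-node controller quorum.
print(parse_controller_connect("3000@host1:9093,3001@host2:9093,3002@host3:9093"))
```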

Deprecated Configurations

| Configuration Name | Reason |
| --- | --- |
| control.plane.listener.name | We no longer need to maintain a separate listener for messages from the controller, since the controller does not send messages out any more (it receives them). |
| broker.id.generation.enable | Automatic broker ID generation is no longer supported. |
| zookeeper.* | We no longer need configurations for ZooKeeper. |

New Error Codes

DUPLICATE_BROKER_REGISTRATION

There will be a new error code, DUPLICATE_BROKER_REGISTRATION, that the active controller will return when a broker tries to register with an ID that is currently in use.

INVALID_CLUSTER_ID

There will be a new error code, INVALID_CLUSTER_ID, that the controller will return if the broker tries to register with the wrong cluster ID.

RPCs

Obsoleting the Metadata Propagation RPCs

As discussed earlier, the new controller will use FetchRequest to fetch metadata from the active controller.  The details of how Raft fetching will work are spelled out in KIP-595: A Raft Protocol for the Metadata Quorum.

Since we propagate the metadata via Raft, we will no longer need to send out LeaderAndIsrRequest, UpdateMetadataRequest, and StopReplicaRequest.  These requests will be sent out only when we're in legacy mode, not when we're in KIP-500 mode.  Eventually we will add some support for these requests to the new controller, in order to support rolling upgrade from a pre-KIP-500 release. However, for the purpose of this KIP, the new controller will not use these requests.

Obsoleting the Controlled Shutdown RPC

The broker heartbeat mechanism replaces the controlled shutdown RPC.  Therefore, we will not need to support this RPC any more in the controller, except for compatibility during upgrades, which will be described further in a follow-on KIP.

BrokerRegistration

{
  "apiKey": 57,
  "type": "request",
  "name": "BrokerRegistrationRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID." },
    { "name": "ClusterId", "type": "uuid", "versions": "0+",
      "about": "The cluster id of the broker process." },
    { "name": "IncarnationId", "type": "uuid", "versions": "0+",
      "about": "The incarnation id of the broker process." },
    { "name": "CurMetadataOffset", "type": "int64", "versions": "0+",
      "about": "The highest metadata offset which the broker has reached." },
    { "name": "Listeners", "type": "[]Listener",
      "about": "The listeners of this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "int16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
      ]
    },
    { "name": "Features", "type": "[]Feature",
      "about": "The features on this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The feature name." },
        { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The minimum supported feature level." },
        { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The maximum supported feature level." }
      ]
    },
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The rack which this broker is in." }
  ]
}

{
  "apiKey": 57,
  "type": "response",
  "name": "BrokerRegistrationResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker's assigned epoch, or -1 if none was assigned." }
  ]
}

BrokerHeartbeat

As described earlier, the broker periodically sends out a heartbeat request to the active controller.

{
  "apiKey": 58,
  "type": "request",
  "name": "BrokerHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker epoch." },
    { "name": "CurrentMetadataOffset", "type": "int64", "versions": "0+",
      "about": "One more than the highest metadata offset which the broker has reached." },
    { "name": "ShouldFence", "type": "bool", "versions": "0+",
      "about": "True if the broker wants to be fenced, false otherwise." },
    { "name": "ShouldShutDown", "type": "bool", "versions": "0+",
      "about": "True if the broker wants to initiate controlled shutdown." }
  ]
}

{
  "apiKey": 58,
  "type": "response",
  "name": "BrokerHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "IsCaughtUp", "type": "bool", "versions": "0+",
      "about": "True if the broker has approximately caught up with the latest metadata." },
    { "name": "IsFenced", "type": "bool", "versions": "0+",
      "about": "True if the broker is fenced." },
    { "name": "ControlledShutdownOk", "type": "bool", "versions": "0+",
      "about": "True if the broker can execute a controlled shutdown now." }
  ]
}

The controller will wait to unfence a broker until the broker sends a heartbeat where ShouldFence is false and CurrentMetadataOffset is caught up.

If the heartbeat request has ShouldShutDown set, the controller will try to move all the leaders off of the broker.

The controller will set ControlledShutdownOk if the broker is cleared to execute a controlled shutdown.  In other words, if it has no leaderships.
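
The three rules above can be combined into a single decision function. The sketch below is an assumption-laden illustration rather than the controller's logic: handle_heartbeat, log_end_offset, leaderships and max_lag are hypothetical names, requests and responses are plain dictionaries keyed by the schema field names, and "caught up" is modeled as the broker's CurrentMetadataOffset being within max_lag of the controller's log end offset.

```python
def handle_heartbeat(request, log_end_offset, leaderships, max_lag=0):
    """Decide fencing and controlled shutdown for one heartbeat.

    request        -- dict with BrokerHeartbeatRequest field names
    log_end_offset -- one more than the controller's highest metadata offset
    leaderships    -- number of partitions the broker currently leads
    max_lag        -- how far behind still counts as caught up (assumed knob)
    """
    caught_up = request["CurrentMetadataOffset"] >= log_end_offset - max_lag
    # Unfence only when the broker asks to be unfenced and is caught up.
    is_fenced = request["ShouldFence"] or not caught_up
    # Shutdown is cleared once the broker leads no partitions.
    shutdown_ok = request["ShouldShutDown"] and leaderships == 0
    return {
        "ErrorCode": 0,
        "IsCaughtUp": caught_up,
        "IsFenced": is_fenced,
        "ControlledShutdownOk": shutdown_ok,
    }


request = {"BrokerId": 1, "BrokerEpoch": 5, "CurrentMetadataOffset": 100,
           "ShouldFence": False, "ShouldShutDown": True}
print(handle_heartbeat(request, log_end_offset=100, leaderships=3))
print(handle_heartbeat(request, log_end_offset=100, leaderships=0))
```

In the first call the broker still leads three partitions, so shutdown is not yet cleared; a later heartbeat sent after the leaders have moved would see ControlledShutdownOk set.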

The controller will return NOT_CONTROLLER if it is not active.  Brokers will always return NOT_CONTROLLER for these RPCs.

Record Formats

RegisterBrokerRecord

{
  "apiKey": 0,
  "type": "metadata",
  "name": "RegisterBrokerRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Id", "type": "int32", "versions": "0+",
      "about": "The broker id." },
    { "name": "IncarnationId", "type": "uuid", "versions": "0+",
      "about": "The incarnation id of the broker process." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+",
      "about": "The broker epoch assigned by the controller." },
    { "name": "EndPoints", "type": "[]BrokerEndpoint", "versions": "0+", "nullableVersions": "0+",
      "about": "The endpoints that can be used to communicate with this broker.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "int16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
    ]},
    { "name": "Features", "type": "[]BrokerFeature", "versions": "0+", "nullableVersions": "0+",
      "about": "The features that this broker supports.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the feature." },
        { "name": "MinVersion", "type": "int16", "versions": "0+",
          "about": "The minimum feature level that this broker supports." },
        { "name": "MaxVersion", "type": "int16", "versions": "0+",
          "about": "The maximum feature level that this broker supports." }
    ]},
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The broker rack." }
  ]
}

UnregisterBrokerRecord

{
  "apiKey": 1,
  "type": "metadata",
  "name": "UnregisterBrokerRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Id", "type": "int32", "versions": "0+",
      "about": "The broker id." },
    { "name": "Epoch", "type": "int64", "versions": "0+",
      "about": "The broker epoch." }
  ]
}

TopicRecord

{
  "apiKey": 2,
  "type": "metadata",
  "name": "TopicRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Name", "type": "string", "versions": "0+",
      "about": "The topic name." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Deleting", "type": "bool", "versions": "0+",
      "about": "True if this topic is in the process of being deleted." }
  ]
}

PartitionRecord

{
  "apiKey": 3,
  "type": "metadata",
  "name": "PartitionRecord",
  "validVersions": "0",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Replicas", "type":  "[]int32", "versions":  "0+",
      "about": "The replicas of this partition, sorted by preferred order." },
    { "name": "Isr", "type":  "[]int32", "versions":  "0+",
      "about": "The in-sync replicas of this partition." },
    { "name": "RemovingReplicas", "type":  "[]int32", "versions":  "0+",
      "about": "The replicas that we are in the process of removing." },
    { "name": "AddingReplicas", "type":  "[]int32", "versions":  "0+",
      "about": "The replicas that we are in the process of adding." },
    { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The lead replica, or -1 if there is no leader." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "An epoch that gets incremented each time we change the leader." }
  ]
}

ConfigRecord

{
  "apiKey": 4,
  "type": "metadata",
  "name": "ConfigRecord",
  "validVersions": "0",
  "fields": [
    { "name": "ResourceType", "type": "int8", "versions": "0+",
      "about": "The type of resource this configuration applies to." },
    { "name": "ResourceName", "type": "string", "versions": "0+",
      "about": "The name of the resource this configuration applies to." },
    { "name": "Name", "type": "string", "versions": "0+",
      "about": "The name of the configuration key." },
    { "name": "Value", "type": "string", "versions": "0+",
      "about": "The value of the configuration." }
  ]
}

IsrChangeRecord

{
  "apiKey": 5,
  "type": "metadata",
  "name": "IsrChangeRecord",
  "validVersions": "0",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Isr", "type":  "[]int32", "versions":  "0+",
      "about": "The in-sync replicas of this partition." },
    { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The lead replica, or -1 if there is no leader." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "An epoch that gets incremented each time we change the leader." }
  ]
} 
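
As a rough illustration of how PartitionRecord and IsrChangeRecord relate, the sketch below replays both into an in-memory map keyed by topic ID and partition ID. It is not the controller's actual implementation: the `PartitionState` class, the function names, and the use of plain dictionaries for records are all assumptions made for this example. Only the field names come from the schemas above.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple

# Hypothetical in-memory view of one partition; not part of the KIP.
@dataclass
class PartitionState:
    replicas: List[int]
    isr: List[int]
    leader: int = -1
    leader_epoch: int = -1

PartitionKey = Tuple[str, int]  # (TopicId, PartitionId)

def replay_partition_record(state: Dict[PartitionKey, PartitionState], record: dict) -> None:
    """Create or overwrite the full state of a partition."""
    key = (record["TopicId"], record["PartitionId"])
    state[key] = PartitionState(
        replicas=list(record["Replicas"]),
        isr=list(record["Isr"]),
        leader=record.get("Leader", -1),            # schema default is -1
        leader_epoch=record.get("LeaderEpoch", -1),  # schema default is -1
    )

def replay_isr_change_record(state: Dict[PartitionKey, PartitionState], record: dict) -> None:
    """Update only the ISR, leader, and leader epoch of an existing partition."""
    key = (record["TopicId"], record["PartitionId"])
    partition = state[key]
    partition.isr = list(record["Isr"])
    partition.leader = record.get("Leader", -1)
    partition.leader_epoch = record.get("LeaderEpoch", -1)
```

In this sketch the replica assignment is left untouched by an IsrChangeRecord, which matches the fact that the record carries no Replicas field.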

AccessControlRecord

{
  "apiKey": 6,
  "type": "metadata",
  "name": "AccessControlRecord",
  "validVersions": "0",
  "fields": [
    { "name": "ResourceType", "type": "int8", "versions": "0+",
      "about": "The resource type" },
    { "name": "ResourceName", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The resource name, or null if this is for the default resource." },
    { "name": "PatternType", "type": "int8", "versions": "0+",
      "about": "The pattern type (literal, prefixed, etc.)" },
    { "name": "Principal", "type": "string", "versions": "0+",
      "about": "The principal name." },
    { "name": "Host", "type": "string", "versions": "0+",
      "about": "The host." },
    { "name": "Operation", "type": "int8", "versions": "0+",
      "about": "The operation type." },
    { "name": "PermissionType", "type": "int8", "versions": "0+",
      "about": "The permission type (allow, deny)." }
  ]
} 

FenceBrokerRecord

{
  "apiKey": 7,
  "type": "metadata",
  "name": "FenceBrokerRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Id", "type": "int32", "versions": "0+",
      "about": "The broker ID to fence. It will be removed from all ISRs." },
    { "name": "Epoch", "type": "int64", "versions": "0+",
      "about": "The epoch of the broker to fence." }
  ]
} 

UnfenceBrokerRecord

{
  "apiKey": 8,
  "type": "metadata",
  "name": "UnfenceBrokerRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Id", "type": "int32", "versions": "0+",
      "about": "The broker ID to unfence." },
    { "name": "Epoch", "type": "int64", "versions": "0+",
      "about": "The epoch of the broker to unfence." }
  ]
} 
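
The following is a minimal sketch of how a node replaying the log might apply FenceBrokerRecord and UnfenceBrokerRecord. The `BrokerFencingState` class and its attributes are hypothetical. Ignoring a record whose epoch does not match the registered broker epoch is an assumption of this sketch, not something the schemas specify.

```python
from typing import Dict, List, Set

class BrokerFencingState:
    """Hypothetical in-memory state; names are illustrative only."""

    def __init__(self) -> None:
        self.broker_epochs: Dict[int, int] = {}  # broker id -> registered epoch
        self.fenced: Set[int] = set()            # ids of fenced brokers
        self.isrs: Dict[str, List[int]] = {}     # partition name -> ISR

    def replay_fence(self, record: dict) -> bool:
        broker_id = record["Id"]
        # Assumption: a record for a stale or unknown epoch is ignored.
        if self.broker_epochs.get(broker_id) != record["Epoch"]:
            return False
        self.fenced.add(broker_id)
        # Per the schema, a fenced broker is removed from all ISRs.
        for partition in list(self.isrs):
            self.isrs[partition] = [b for b in self.isrs[partition] if b != broker_id]
        return True

    def replay_unfence(self, record: dict) -> bool:
        broker_id = record["Id"]
        if self.broker_epochs.get(broker_id) != record["Epoch"]:
            return False
        # Unfencing does not re-add the broker to any ISR in this sketch.
        self.fenced.discard(broker_id)
        return True
```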

RemoveTopicRecord

{
  "apiKey": 9,
  "type": "metadata",
  "name": "RemoveTopicRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Id", "type": "uuid", "versions": "0+",
      "about": "The topic to remove. All associated partitions will be removed as well." }
  ]
} 
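
Because this record removes a topic together with all of its partitions, replaying it touches two collections. A minimal sketch, assuming hypothetical `topics` and `partitions` dictionaries (the latter keyed by topic ID and partition ID), might look like this:

```python
from typing import Dict, Tuple

def replay_remove_topic(topics: Dict[str, str],
                        partitions: Dict[Tuple[str, int], dict],
                        record: dict) -> int:
    """Remove a topic and every partition that belongs to it.

    Returns the number of partitions removed. The container shapes are
    assumptions of this example, not part of the KIP.
    """
    topic_id = record["Id"]
    topics.pop(topic_id, None)
    # Collect keys first so the dictionary is not resized while iterating.
    doomed = [key for key in partitions if key[0] == topic_id]
    for key in doomed:
        del partitions[key]
    return len(doomed)
```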

DelegationTokenRecord

{
  "apiKey": 10,
  "type": "metadata",
  "name": "DelegationTokenRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Owner", "type": "string", "versions": "0+",
      "about": "The delegation token owner." },
    { "name": "Renewers", "type": "[]string", "versions": "0+",
      "about": "The principals which have renewed this token." },
    { "name": "IssueTimestamp", "type": "int64", "versions": "0+",
      "about": "The time at which this token was issued." },
    { "name": "MaxTimestamp", "type": "int64", "versions": "0+",
      "about": "The time at which this token cannot be renewed any more." },
    { "name": "ExpirationTimestamp", "type": "int64", "versions": "0+",
      "about": "The next time at which this token must be renewed." },
    { "name": "TokenId", "type": "string", "versions": "0+",
      "about": "The token id." }
  ]
} 
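
The three timestamps in this record interact as follows: a token must be renewed before ExpirationTimestamp, and no renewal can push the expiration past MaxTimestamp. The sketch below illustrates that relationship only. The function names and the `renew_period_ms` parameter are hypothetical, and the exact renewal policy shown is an assumption rather than something this KIP defines.

```python
def is_expired(token: dict, now_ms: int) -> bool:
    """A token is unusable once its expiration timestamp has passed."""
    return now_ms >= token["ExpirationTimestamp"]

def renew(token: dict, now_ms: int, renew_period_ms: int) -> dict:
    """Return a copy of the token with a later expiration timestamp.

    Assumption: the new expiration is capped at MaxTimestamp, after which
    the token cannot be renewed any more.
    """
    if is_expired(token, now_ms):
        raise ValueError("token has already expired")
    new_expiration = min(now_ms + renew_period_ms, token["MaxTimestamp"])
    return {**token, "ExpirationTimestamp": new_expiration}
```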

UserScramCredentialRecord

{
  "apiKey": 11,
  "type": "metadata",
  "name": "UserScramCredentialRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Name", "type": "string", "versions": "0+",
      "about": "The user name." },
    { "name": "CredentialInfos", "type": "[]CredentialInfo", "versions": "0+",
      "about": "The mechanism and related information associated with the user's SCRAM credential.", "fields": [
      { "name": "Mechanism", "type": "int8", "versions": "0+",
        "about": "The SCRAM mechanism." },
      { "name": "Salt", "type": "bytes", "versions": "0+",
        "about": "A random salt generated by the client." },
      { "name": "SaltedPassword", "type": "bytes", "versions": "0+",
        "about": "The salted password." },
      { "name": "Iterations", "type": "int32", "versions": "0+",
        "about": "The number of iterations used in the SCRAM credential." }]}
  ]
} 
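
For readers unfamiliar with SCRAM, the sketch below shows how the Salt, Iterations, and SaltedPassword fields relate. In SCRAM (RFC 5802), the salted password is PBKDF2 with HMAC over the chosen hash function. The mapping from the int8 Mechanism value to a hash name is an assumption of this example, as is the `make_credential_info` helper. SASLprep normalization of the password is omitted for brevity.

```python
import hashlib
import os
from typing import Optional

# Assumed mapping from the int8 Mechanism field to a hash function name.
HASH_BY_MECHANISM = {1: "sha256", 2: "sha512"}

def make_credential_info(password: str, mechanism: int,
                         iterations: int = 4096,
                         salt: Optional[bytes] = None) -> dict:
    """Build a dictionary shaped like one CredentialInfos entry."""
    if salt is None:
        salt = os.urandom(16)  # random salt
    hash_name = HASH_BY_MECHANISM[mechanism]
    # SaltedPassword := Hi(password, salt, iterations), i.e. PBKDF2-HMAC
    # with an output length equal to the hash's digest size.
    salted_password = hashlib.pbkdf2_hmac(
        hash_name, password.encode("utf-8"), salt, iterations)
    return {
        "Mechanism": mechanism,
        "Salt": salt,
        "SaltedPassword": salted_password,
        "Iterations": iterations,
    }
```

Note that only the salted password is stored, never the plain-text password.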

FeatureLevelRecord

{
  "apiKey": 12,
  "type": "metadata",
  "name": "FeatureLevelRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
      "about": "The feature name." },
    { "name": "MinFeatureLevel", "type": "int16", "versions": "0+",
      "about": "The current finalized minimum feature level of this feature for the cluster." },
    { "name": "MaxFeatureLevel", "type": "int16", "versions": "0+",
      "about": "The current finalized maximum feature level of this feature for the cluster." }
  ]
} 
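
One use of the finalized feature levels is to compare them against the Features that a broker advertises when it registers. The sketch below assumes a simple containment rule: a broker is compatible if, for every finalized feature, its supported range covers the finalized range. That rule, the function names, and the dictionary shapes are assumptions made for illustration.

```python
from typing import Dict

def range_is_supported(finalized: dict, supported: dict) -> bool:
    """True if the broker's [MinVersion, MaxVersion] covers the finalized range."""
    return (supported["MinVersion"] <= finalized["MinFeatureLevel"]
            and finalized["MaxFeatureLevel"] <= supported["MaxVersion"])

def broker_is_compatible(finalized_features: Dict[str, dict],
                         broker_features: Dict[str, dict]) -> bool:
    """Check every finalized feature against the broker's advertised features.

    Both arguments are keyed by feature name. A finalized feature that the
    broker does not advertise at all is treated as incompatible (assumption).
    """
    for name, finalized in finalized_features.items():
        supported = broker_features.get(name)
        if supported is None or not range_is_supported(finalized, supported):
            return False
    return True
```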

FailedReplicasRecord

{
  "apiKey": 13,
  "type": "metadata",
  "name": "FailedReplicasRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Broker", "type": "int32", "versions": "0+",
      "about": "The broker id." },
    { "name": "Topic", "type": "uuid", "versions": "0+",
      "about": "The topic UUID." },
    { "name": "Partitions", "type": "[]int32", "versions": "0+",
      "about": "The partition ids." }
  ]
} 

New Metrics

| Full Name | Description |
|---|---|
| kafka.controller:type=KafkaController,name=MetadataLag | The offset delta between the latest metadata record this controller has replayed and the last stable offset of the metadata topic. |
| kafka.controller:type=KafkaServer,name=MetadataLag | The offset delta between the latest metadata record this broker has replayed and the last stable offset of the metadata topic. |
| kafka.controller:type=KafkaController,name=MetadataCommitLatencyMs | The latency of committing a message to the metadata topic.  Relevant on the active controller. |
| kafka.controller:type=KafkaController,name=MetadataCommitRate | The number of metadata messages per second committed to the metadata topic. |
| kafka.controller:type=KafkaController,name=MetadataSnapshotLag | New name for kafka.controller:type=KafkaController,name=SnapshotLag.  The offset delta between the latest stable offset of the metadata topic and the offset of the last snapshot (or the last stable offset itself, if there are no snapshots). |
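
The lag metrics described above are plain offset deltas. As a small sketch of the arithmetic (the function and parameter names are hypothetical, and offsets are assumed to be non-negative integers):

```python
from typing import Optional

def metadata_lag(last_stable_offset: int, last_replayed_offset: int) -> int:
    """Offset delta between the last stable offset and the latest replayed record."""
    return last_stable_offset - last_replayed_offset

def metadata_snapshot_lag(last_stable_offset: int,
                          last_snapshot_offset: Optional[int] = None) -> int:
    """Offset delta between the last stable offset and the last snapshot.

    If no snapshot exists yet, the lag is the last stable offset itself.
    """
    if last_snapshot_offset is None:
        return last_stable_offset
    return last_stable_offset - last_snapshot_offset
```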

Unused Metrics in KIP-500 Mode

We will deprecate these metrics as soon as legacy mode is deprecated.  For now, they will be unused in KIP-500 mode.

| Full Name | Description |
|---|---|
| kafka.server:type=SessionExpireListener,name=ZooKeeperExpiresPerSec | No longer needed when running in KIP-500 mode because we won't have any ZK sessions. |

Compatibility, Deprecation, and Migration Plan

As described above, this KIP outlines a new mode that the broker can run in, KIP-500 mode.  For now, this mode will be experimental, and there will be no way to migrate existing clusters from legacy mode to KIP-500 mode.  We plan on outlining how this upgrade process will work in a follow-on KIP.  We do plan on deprecating legacy mode eventually, but we are not quite ready to do it yet in this KIP.

Since KIP-500 mode is currently in a pre-alpha state, we do not yet guarantee that future versions will support upgrading from the current version of it.  Once it is more stable, we will adopt a more traditional binary compatibility regime.

Rejected Alternatives

Support Automatic Broker ID Assignment

This KIP proposes to drop support for automatic broker ID assignment.  What if we decided to continue to support it?

If we were willing to take a little bit more complexity on board, it would be relatively easy to support automatic broker ID assignment.  Brokers could simply ask the active controller to assign them a new ID when starting up, just as they previously obtained one from ZooKeeper.

However, automatic controller ID assignment is a much more difficult problem.  We never actually supported automatically assigning ZooKeeper IDs, so there is no pattern to follow here.  In general, Raft assumes that nodes know their IDs before the protocol begins.  We cannot rely on random assignment because the 31-bit space is not large enough to make collisions negligible.  We could perhaps create a separate protocol for assigning node IDs, but it might be complex.
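
To make the point about the 31-bit ID space concrete, the sketch below computes the probability that at least two nodes pick the same ID when each chooses uniformly at random (the classic birthday problem). The function name is hypothetical, and uniform independent choice is an assumption of the example.

```python
def collision_probability(num_nodes: int, id_bits: int = 31) -> float:
    """Probability that at least two of num_nodes random IDs collide."""
    space = 2 ** id_bits
    p_all_unique = 1.0
    for already_taken in range(num_nodes):
        # The next node must avoid every ID that is already taken.
        p_all_unique *= (space - already_taken) / space
    return 1.0 - p_all_unique
```

Under these assumptions, a cluster of 10,000 nodes would see a collision with a probability of roughly 2 percent, which is far too high for an identifier that must be unique.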

In general it's not clear how useful automatic broker ID assignment really is.  Configuration management software like Puppet, Chef, or Ansible can easily create a new ID for each node's configuration file.  Therefore, it's probably best to use this compatibility break to drop support for automatic broker ID assignment.

Combined Heartbeats and Fetch Requests

The brokers are always fetching new metadata from the controller.  Why not combine these fetch requests with the heartbeat requests, so that the brokers only have to send one request rather than two?

The main reason for making them separate requests is to have better separation of concerns.  Fetching metadata is logically a bit different than sending a heartbeat, and coupling them could result in a messy design and code.  We would have to add significant extra complexity to the FetchRequest schema.  Perhaps even worse, we would need to make the timing of fetch requests line up with the timing needed for broker heartbeats.

Shared IDs between Multiple Nodes

One possibility that we've discussed is whether we could have controller IDs and broker IDs share the same ID space.  For example, could there be both a broker 1 and a controller 1?  This is not a good idea because NetworkClient assumes a single ID space.  So if there is both a controller 1 and a broker 1, we don't have a way of picking the "right" one.  We would have to add a concept of node types.  That is a fair amount of extra work.  It also is a bit conceptually messy for the network layer to understand node types, rather than just treating them all as endpoints.

A related question is whether a broker and a controller could share the same port, if they are co-located in the same JVM.  We discussed this as part of KIP-590 and agreed that we would not share ports.  A shared port would make it impossible to have separate RPC handlers, complicating the code.  It would also make it impossible to have separate, stricter authentication for controllers.  Sharing a port with the broker opens us up to denial of service attacks, including unintentional ones.
