
Status

Current state: under discussion

Discussion thread: here

JIRA:

Released: TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Apache Kafka is in the process of moving from storing metadata in Apache ZooKeeper to storing metadata in an internal Raft topic.  KIP-500 described the overall architecture and plan.  The purpose of this KIP is to go into detail about how the Kafka controller will change during this transition.

Proposed Changes

Deployment

KIP-500 Mode

Once this KIP is implemented, system administrators will have the option of running in KIP-500 mode.  In this mode, we will not use ZooKeeper.  The alternative mode, where KIP-500 support is not enabled, will be referred to as legacy mode.

KIP-500 mode must be enabled for the entire cluster, not just for specific nodes.  Initially, this mode will be considered experimental and not ready for production.  As we do more testing and gain more confidence, we will remove the experimental label.  Eventually, in a future release, KIP-500 mode will be the only supported mode.  Since dropping support for legacy mode is an incompatible change, it will need to happen in a major release.

Initially, we will not support upgrading a cluster from legacy mode to KIP-500 mode.  This is in keeping with the experimental nature of KIP-500 mode.  A future KIP will describe and implement an upgrade process from legacy mode to KIP-500 mode.

Before being used in KIP-500 mode, the storage directories on a node must be formatted.  This requirement prevents system administrators from accidentally enabling KIP-500 mode by simply making a configuration change.  Requiring formatting also prevents mistakes, since Kafka no longer has to guess whether an empty storage directory is a newly created directory or one where a system error prevented any data from showing up.

Nodes

Currently, a ZooKeeper cluster must be deployed when running Kafka.  This KIP will eliminate that requirement, as well as the requirement to configure the addresses of the ZooKeeper nodes on each broker.

Currently, any broker node can be elected as the controller.  As part of this KIP, the active controller will instead be selected among a small pool of nodes specifically configured to act as controllers.  Typically three or five nodes in the cluster will be selected to be controllers.

System administrators will be able to choose whether to run separate controller nodes, or whether to run controller nodes which are co-located with broker nodes.  Kafka will provide support for running a controller in the same JVM as a broker, in order to save memory and enable single-process test deployments.

The addresses and ports of the controller nodes must be configured on each broker, so that the broker can contact the controller quorum when starting up.  This is similar to how we configure the ZooKeeper quorum on each node today.

Note that as long as at least one of the provided controller addresses is valid, the broker will be able to learn about the current metadata quorum and start up.  Once the broker is in contact with the metadata quorum, the quorum bootstrap addresses will not be needed.  This makes it possible to reconfigure the metadata quorum over time.  For example, if we start with a metadata quorum of host1, host2, host3, we could replace host3 with host4 without disrupting any of the brokers.  Then we could roll the brokers to apply the new metadata quorum bootstrap configuration of host1, host2, host4 on each one.

Node IDs

Just like brokers, controller nodes will have non-negative integer node IDs.  There will be a single ID space.  In other words, no controller should share the same ID as a broker.  Even when a broker and a controller are co-located in the same JVM, they must have different node IDs.

Automatic node ID assignment via ZooKeeper will no longer be supported in KIP-500 mode.  Node IDs must be set in the configuration file for brokers and controllers.

Networking

Controller processes will listen on a separate endpoint from brokers.  This will be true even when the broker and controller are co-located in the same JVM.

In a well-run Kafka deployment, controller ports, like ZooKeeper ports, should be firewalled off from clients.  This will prevent clients from disrupting the cluster by flooding the controller ports with requests.  In the realm of ACLs, this translates to controllers requiring CLUSTER_ACTION on the CLUSTER resource for all operations.

The only time when clients should contact a controller node directly is when they are debugging system issues.  This is similar to ZooKeeper, where we have things like zk-shell, but only for debugging.

Metadata

The Metadata Topic

As described in KIP-500, the controller will store its data in the internal @metadata topic.  This topic will contain a single partition which is managed by Raft, as described in KIP-595: A Raft Protocol for the Metadata Quorum.

The leader of the controller quorum will be the active controller.  The followers will function as hot standbys, ready to take over when the active leader fails or resigns.  The metadata will be stored in memory on all of the controllers.

Persistence and Visibility

Metadata changes need to be persisted to the @metadata log before we apply them on the other nodes in the cluster.  This means waiting for the metadata log's last stable offset to advance to the offset of the change.  After that point, we are guaranteed not to lose the change as long as we uphold the Raft invariants.

Changes that we haven't yet persisted are referred to as "uncommitted."  The active controller may have several of these uncommitted changes in flight at any given time.  In essence, the controller's in-memory state is always a little bit in the future compared to the current state.  This allows the active controller to continue doing things while it waits for the previous changes to be committed to the Raft log.

However, this "future state" may never be committed.  For example, the active controller might fail, truncating some of its future state.  Therefore, the active controller must not make this future state "visible" to the rest of the cluster until it has been made persistent – that is, until it becomes current state.  In the case of the @metadata topic, the replication protocol itself neatly takes care of this for us.  In the case of controller RPCs like AlterIsr, the controller handles this by not sending back a response until the designated change has been persisted.

                      earlier offsets
                          +---+
     visible change ----> |   |
                          +---+       last stable offset
     visible change ----> |   | <---- standby controllers'
                          +---+       in-memory state
     pending change ----> |   |
                          +---+
     pending change ----> |   |
                          +---+
     pending change ----> |   |
                          +---+
     pending change ----> |   | <---- active controller's
                          +---+       in-memory state
                      latest offset
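
To make this concrete, here is a minimal sketch (hypothetical class, not actual controller code) of how an active controller could hold back responses for changes like AlterIsr until the last stable offset of the @metadata log has passed the offset of the change.

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;

// A minimal sketch of the "visible only once persistent" rule described above.
public class PendingChangeTracker {
    // Offset of an uncommitted change -> future completed once it is committed.
    private final ConcurrentSkipListMap<Long, CompletableFuture<Void>> pending =
        new ConcurrentSkipListMap<>();

    // Called after a change has been appended to the log at the given offset.
    // The RPC response is sent only once the returned future completes.
    public CompletableFuture<Void> awaitCommit(long offset) {
        return pending.computeIfAbsent(offset, o -> new CompletableFuture<>());
    }

    // Called whenever the Raft layer advances the last stable offset.
    public void onLastStableOffsetAdvanced(long lastStableOffset) {
        // Everything at or below the last stable offset is now durable, so the
        // corresponding responses may finally be sent.
        Map<Long, CompletableFuture<Void>> committed = pending.headMap(lastStableOffset, true);
        committed.values().forEach(future -> future.complete(null));
        committed.clear();
    }
}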

Record Formats

The active controller makes changes to the metadata by appending records to the log.  Each record has a null key and the following format for its value:

  1. an unsigned varint specifying the record type
  2. an unsigned varint specifying the record version
  3. the payload in Kafka RPC format

For example, if we wanted to encode a TopicRecord, we might have 1 encoded as a varint, followed by 0 as the version, followed by the serialized topic data.

The record type and version will typically only take one byte each, for a total header size of two bytes.
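
As an illustration, the following sketch (hypothetical class, not part of Kafka) frames a serialized payload with its record type and version, using the standard 7-bits-per-byte unsigned varint encoding.  Serializing the payload itself in Kafka RPC format is out of scope here.

import java.io.ByteArrayOutputStream;
import java.io.IOException;

// A minimal sketch of the record value layout described above.
public class MetadataRecordEnvelope {
    // Standard unsigned varint encoding: 7 bits per byte, high bit set on all but the last byte.
    static void writeUnsignedVarint(int value, ByteArrayOutputStream out) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Frame a serialized payload with its record type and version.
    static byte[] encode(int recordType, int recordVersion, byte[] payload) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarint(recordType, out);     // e.g. 1 for TopicRecord
        writeUnsignedVarint(recordVersion, out);  // e.g. 0 for the first version
        out.write(payload);                       // the payload in Kafka RPC format
        return out.toByteArray();
    }
}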

Record Format Versions

There are two ways to evolve the format of a KIP-500 record.  One is to add KIP-482 optional tagged fields.  These will be ignored by older software, but can contain additional data for new software to handle.  The other choice is to bump the version of the record.

In the pre-KIP-500 world, we had the inter-broker protocol (IBP) setting to control what RPC versions the controller used to communicate with the brokers.  This allowed us to evolve the inter-broker RPC format over time.  We also used it to gate many other features, such as metadata format changes.  In the post-KIP-500 world, the analogous setting is the metadata.format KIP-584 feature flag. This setting controls the snapshot and delta formats which the controller will use.

Snapshot Implementation

As time goes on, the number of records will grow and grow, even if the total size of the metadata stays constant.  Therefore, periodically, we need to consolidate all the metadata deltas into a snapshot.

Like the metadata log, the snapshot is made up of records.  However, unlike the log, in which there may be multiple records describing a single entity, the snapshot will only contain the minimum number of records needed to describe all the entities.

Snapshots are local to each replica.  For example, replica A may have a snapshot at offset 100 and deltas up to offset 150, whereas replica B may have a snapshot at offset 125 and deltas up to offset 150.  Any snapshot must be usable as a starting point for loading the entire state of the metadata.  In other words, a new controller node must be able to load a snapshot, apply all the edits which follow it, and come up to date.

The currently active controller will monitor the offset of the latest snapshot made by all replicas, including itself.  The snapshotting state of each node is considered soft state: it is not persisted anywhere in the log, but purely communicated by heartbeats and stored in memory by the active controller.
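
The invariant above can be sketched as follows (Snapshot, LogRecord, and MetadataImage are placeholder interfaces, not real Kafka classes): rebuild the in-memory image by applying the snapshot, then replaying every record whose offset comes after the snapshot.

import java.util.Iterator;

// Placeholder interfaces for the sake of the sketch.
interface LogRecord {
    long offset();
}

interface Snapshot {
    long lastIncludedOffset();
    Iterator<LogRecord> records();
}

interface MetadataImage {
    void replay(LogRecord record);
}

class MetadataLoader {
    MetadataImage rebuild(MetadataImage image, Snapshot snapshot, Iterator<LogRecord> log) {
        // 1. Apply the snapshot: the minimum set of records describing every entity.
        snapshot.records().forEachRemaining(image::replay);
        // 2. Replay the deltas which follow the snapshot, in offset order.
        long startOffset = snapshot.lastIncludedOffset();
        while (log.hasNext()) {
            LogRecord record = log.next();
            if (record.offset() > startOffset) {
                image.replay(record);
            }
        }
        return image;
    }
}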

Broker Registration and State Management

Broker Startup

When starting up, the broker will send a fetch request to the controller, getting the state of the metadata log.  If the broker's local copy of the metadata log is too far behind, the broker will fetch a complete snapshot from the controller, as described in KIP-630.  Otherwise, the broker will fetch just what it needs.  Once it has caught up to the high water mark of the controller (or at least, what the controller's high water mark used to be), it will be able to remove the log directories which it no longer needs.
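
A minimal sketch of that decision, with a hypothetical lag threshold (neither the constant nor its value comes from this KIP):

// Decide whether to fetch a full snapshot (KIP-630) or just the missing records.
class MetadataCatchUp {
    // Hypothetical tuning knob; not a real Kafka configuration.
    private static final long SNAPSHOT_FETCH_THRESHOLD = 50_000;

    enum FetchMode { FULL_SNAPSHOT, INCREMENTAL }

    static FetchMode chooseFetchMode(long localMetadataOffset, long controllerHighWaterMark) {
        long lag = controllerHighWaterMark - localMetadataOffset;
        return lag > SNAPSHOT_FETCH_THRESHOLD ? FetchMode.FULL_SNAPSHOT : FetchMode.INCREMENTAL;
    }
}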

Broker Registration and Heartbeats

Every distributed system needs a way of managing cluster membership.  Prior to KIP-500, Kafka brokers created ephemeral znodes in ZooKeeper in order to register themselves as part of the cluster.  The Kafka controller passively consumed the registration information from ZooKeeper.

In the post-KIP-500 world there is no ZooKeeper and no ephemeral znodes.  Instead, each broker registers itself with the active controller using a BrokerRegistrationRequest. The active controller assigns the broker a new broker epoch, based on the latest committed offset in the log.  Subsequently, the broker sends a heartbeat request to the active controller every few seconds to keep the registration active.

Broker registrations are time-bounded: they only last for a period specified by the controller in its response.  Once that period has elapsed, if the broker has not renewed its registration via a heartbeat, it must re-register.

Unlike ZooKeeper, the active controller may choose not to accept a broker's registration.  For example, it will do this if the ID that the broker is trying to claim has already been claimed.  In that case, the broker should shut down rather than trying to claim a broker ID which is already in use.  Another reason the controller can reject a registration is that the broker doesn't support all of the KIP-584 feature levels that are currently enabled in the cluster.

Finally, the broker registration and heartbeat mechanism gives the broker a chance to specify its target state, and the controller to tell the broker what its actual state should be.
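
The broker side of this mechanism might look roughly like the following sketch.  ControllerChannel is a placeholder for whatever client actually sends the BrokerHeartbeatRequest; the interval corresponds to registration.heartbeat.interval.ms, and the broker epoch is the one assigned in the BrokerRegistrationResponse.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// A minimal sketch of the broker-side heartbeat loop.
class BrokerHeartbeatSender {
    interface ControllerChannel {
        // Sends a heartbeat carrying the broker's target state; returns the controller's nextState.
        byte sendHeartbeat(int brokerId, long brokerEpoch, long currentMetadataOffset, byte targetState);
    }

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    void start(ControllerChannel channel, int brokerId, long brokerEpoch,
               LongSupplier currentMetadataOffset, byte targetState, long heartbeatIntervalMs) {
        scheduler.scheduleAtFixedRate(() -> {
            // The broker states the state it wants to reach; the controller replies
            // with the state the broker should actually be in.
            byte nextState = channel.sendHeartbeat(
                brokerId, brokerEpoch, currentMetadataOffset.getAsLong(), targetState);
            applyNextState(nextState);
        }, 0, heartbeatIntervalMs, TimeUnit.MILLISECONDS);
    }

    private void applyNextState(byte nextState) {
        // Transition the local broker state machine (see "The Broker State Machine" below).
    }
}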

Fencing

Brokers that don't have a broker ID lease are said to be "fenced."  When a broker is fenced, it cannot process any client requests.  This prevents brokers which are not receiving metadata updates, or which are not receiving and processing them fast enough, from causing issues for clients.

Brokers start up in the fenced state, and can leave this state only by sending a heartbeat to the active controller and getting back a response that tells them they can become active.

Brokers will re-enter the fenced state if they are unable to communicate with the active controller within registration.lease.timeout.ms.
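
A minimal sketch of that broker-side check, assuming the broker tracks the time of its last successful exchange with the active controller:

// Decide whether the broker must consider its lease lost and re-enter the FENCED state.
class LeaseWatcher {
    private final long leaseTimeoutMs;          // registration.lease.timeout.ms
    private volatile long lastSuccessfulHeartbeatMs;

    LeaseWatcher(long leaseTimeoutMs, long nowMs) {
        this.leaseTimeoutMs = leaseTimeoutMs;
        this.lastSuccessfulHeartbeatMs = nowMs;
    }

    void onHeartbeatSuccess(long nowMs) {
        lastSuccessfulHeartbeatMs = nowMs;
    }

    boolean shouldFence(long nowMs) {
        return nowMs - lastSuccessfulHeartbeatMs > leaseTimeoutMs;
    }
}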

Controlled Shutdown

In the pre-KIP-500 world, brokers triggered a controlled shutdown by making an RPC to the controller.  When the controller returned a successful result from this RPC, the broker knew that it could shut down.

In the post-KIP-500 world, controlled shutdown is handled by the broker heartbeat system instead.  In its periodic heartbeats, the broker asks the controller if it can transition into the SHUTDOWN state.  This motivates the controller to move all of the leaders off of that broker.  Once they are all moved, the controller responds to the heartbeat with a nextState of SHUTDOWN.

Broker ID Conflicts

Clearly, in a correctly managed cluster, there should be no broker ID conflicts.  Each broker should be configured with a unique ID.  However, we want the system to be robust against misconfigurations.  Therefore, if there are two brokers that claim the same ID, the controller will choose only one and tell the other to fence itself.

When a broker first starts up, when it is in the INITIAL state, it will always "win" broker ID conflicts.  However, once it is granted a lease, it transitions out of the INITIAL state.  Thereafter, it may lose subsequent conflicts if its broker epoch is stale.  (See KIP-380 for some background on broker epoch.)  The reason for favoring new processes is to accommodate the common case where a process is killed with kill -9 and then restarted.  We want it to be able to reclaim its old ID quickly in this case.  The controller can generate a new broker epoch by using the latest log offset.
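
A minimal sketch of this tie-breaking rule, under the assumption that the controller compares broker epochs when the new claimant is not in the INITIAL state:

// Decide whether a new registration for an already-claimed broker ID should win.
class BrokerIdConflictResolver {
    static boolean acceptNewRegistration(boolean newClaimantIsInitial,
                                         long existingEpoch,
                                         long newEpoch) {
        if (newClaimantIsInitial) {
            return true;  // a freshly started process always wins the conflict
        }
        // Otherwise favor the claim with the newer broker epoch, which the
        // controller generates from the latest log offset.
        return newEpoch > existingEpoch;
    }
}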

The Broker State Machine

This state machine ties together a few of the previous sections, such as broker registration and shutdown.

NOT_RUNNING(0)

The broker starts up in this state.

If it needs to recover log directories, it transitions to RECOVERING_FROM_UNCLEAN_SHUTDOWN.

Otherwise, it transitions to REGISTERING

On a control-C in this state, the broker process shuts down.

REGISTERING(1)

While in this state, the broker tries to register with the active controller.

If the active controller refuses the registration, the broker process shuts down.

Otherwise, the broker moves into the FENCED state.

On a control-C in this state, the broker process shuts down.

RECOVERING_FROM_UNCLEAN_SHUTDOWN(2)

While in this state, the broker is recovering log directories.

Once it is done, it transitions to REGISTERING.

On a control-C in this state, the broker process shuts down.

RUNNING(3)

While in this state, the broker is accepting client requests and responding.

If the broker is unable to heartbeat to the active controller, it transitions to the FENCED state.

On a control-C in this state, the broker transitions into PENDING_CONTROLLED_SHUTDOWN.

FENCED(4)

While in this state, the broker does not respond to client requests.

It will try to re-register itself with the active controller.  If it succeeds, it will go into the RUNNING state.

On a control-C in this state, the broker process shuts down.

PENDING_CONTROLLED_SHUTDOWN(6)

While in this state, the broker is trying to shut down gracefully.

The controller will move the leadership of partitions off of this broker.

Once the controller tells the broker it can shut down (in a heartbeat response), it will shut down.
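
For reference, the states and numeric codes listed above could be captured in an enum like the following sketch (the actual class in the Kafka code base may be named and organized differently):

// Broker states and their numeric codes, mirroring the values listed in this section.
public enum BrokerState {
    NOT_RUNNING(0),
    REGISTERING(1),
    RECOVERING_FROM_UNCLEAN_SHUTDOWN(2),
    RUNNING(3),
    FENCED(4),
    PENDING_CONTROLLED_SHUTDOWN(6);

    private final byte value;

    BrokerState(int value) {
        this.value = (byte) value;
    }

    public byte value() {
        return value;
    }
}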

Public Interfaces

Command-Line Tools

There will be a new command-line tool, kafka-storage.sh.

kafka-storage.sh: inspect and modify kafka storage directories
subcommands:
    info: print out information about kafka storage directories
        -c|--config [path]: the configuration file to use
    format: format the kafka storage directories
        -c|--config [path]: the configuration file to use
        -f|--force: format even if the directories are not empty.
        -d|--directory [path]: format only a single directory, rather than all directories

The info subcommand will display information about each directory, such as whether it exists, is formatted, etc.

The format subcommand will initialize the subdirectories.

A cluster configured in KIP-500 mode will contain the following line in the meta.properties file in each directory:

kip.500.mode=enabled

If the storage directories in a node have not been properly formatted, the node will not be able to start up in KIP-500 mode.  On the other hand, formatting will continue to be optional for legacy mode.

Configurations

process.roles
  Possible values: null, "broker", "controller", "broker,controller"
  Description: If this is null (absent) then we are in legacy mode.  Otherwise, we are in KIP-500 mode and this configuration determines what roles this process should play: broker, controller, or both.

controller.listener.names
  Possible values: If non-null, this must be a comma-separated list of listener names.  When communicating with the controller quorum, the broker will always use the first listener in this list.
  Description: A comma-separated list of the names of the listeners used by the KIP-500 controller.  This is required if this process is a KIP-500 controller.  The legacy controller will not use this configuration.  Despite the similar name, note that this is different from the "control plane listener" introduced by KIP-291.  The "control plane listener" is used on brokers, not on controllers.

controller.connect
  Possible values: If non-null, this must be a comma-separated list of all the controller voters, in the format {controller-id}@{controller-host}:{controller-port}
  Description: When in KIP-500 mode, each node must have this configuration, in order to find out how to communicate with the controller quorum.  Note that this replaces the "quorum.voters" config described in KIP-595.

controller.id
  Possible values: a 32-bit ID
  Description: The controller ID for this server.  Only required if this server is a controller.

broker.id
  Possible values: a 32-bit ID
  Description: The broker ID for this server.  Only required if this server is a broker.

registration.heartbeat.interval.ms
  Default: 2000
  Description: The length of time between broker heartbeats.

registration.lease.timeout.ms
  Default: 18000
  Description: The length of time that a broker lease lasts if no heartbeats are made.

metadata.log.dir
  Possible values: If set, this must be a path to a log directory.
  Description: This configuration determines where we put the metadata log.  If it is not set, the metadata log is placed in the first log directory from log.dirs.

controller.quorum.fetch.timeout.ms
  Description: Maximum time without a successful fetch from the current leader before a new election is started.  New name for quorum.fetch.timeout.ms.

controller.quorum.election.timeout.ms
  Description: Maximum time without collecting a majority of votes during the candidate state before a new election is retried.  New name for quorum.election.timeout.ms.

controller.quorum.election.backoff.max.ms
  Description: Maximum exponential backoff time (based on the number of retries) after an election timeout, before a new election is triggered.  New name for quorum.election.backoff.max.ms.

controller.quorum.request.timeout.ms
  Description: Maximum time before a pending request is considered failed and the connection is dropped.  New name for quorum.request.timeout.ms.

controller.quorum.retry.backoff.ms
  Description: Initial delay between request retries.  This config and controller.quorum.retry.backoff.max.ms are used for retriable request errors or lost connectivity, and are different from the election.backoff configs above.  New name for quorum.retry.backoff.ms.

controller.quorum.retry.backoff.max.ms
  Description: Max delay between requests.  Backoff will increase exponentially beginning from quorum.retry.backoff.ms.  New name for quorum.retry.backoff.max.ms.
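
For illustration only, a hypothetical configuration for a node running both roles might look like the fragment below.  The host names, ports, IDs, and paths are made up; only the configuration names come from the table above.

# Hypothetical server.properties fragment for a combined broker and controller node.
process.roles=broker,controller
broker.id=1
controller.id=1001
controller.listener.names=CONTROLLER
controller.connect=1001@controller1.example.com:9093,1002@controller2.example.com:9093,1003@controller3.example.com:9093
registration.heartbeat.interval.ms=2000
registration.lease.timeout.ms=18000
metadata.log.dir=/var/kafka/metadata

Note that, per this KIP, a co-located broker and controller must use different node IDs, which is why broker.id and controller.id differ in the example.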

New Error Codes

DUPLICATE_BROKER_REGISTRATION

There will be a new error code, DUPLICATE_BROKER_REGISTRATION, that the active controller will return when a broker tries to register with an ID that is currently in use.

RPCs

Obsoleting the Metadata Propagation RPCs

As discussed earlier, the new controller will use FetchRequest to fetch metadata from the active controller.  The details of how Raft fetching will work are spelled out in KIP-595: A Raft Protocol for the Metadata Quorum.

Since we propagate the metadata via Raft, we will no longer need to send out LeaderAndIsrRequest, UpdateMetadataRequest, and StopReplicaRequest.  These requests will be sent out only when we're in legacy mode, not when we're in KIP-500 mode.  Eventually we will add some support for these requests to the new controller, in order to support rolling upgrade from a pre-KIP-500 release. However, for the purpose of this KIP, the new controller will not use these requests.

Obsoleting the Controlled Shutdown RPC

The broker heartbeat mechanism replaces the controlled shutdown RPC.  Therefore, we will no longer need to support this RPC in the controller, except for compatibility during upgrades, which will be described further in a follow-on KIP.

BrokerRegistration

{
  "apiKey": 57,
  "type": "request",
  "name": "BrokerRegistrationRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker epoch, or -1 if one has not yet been assigned." },
    { "name": "CurMetadataOffset", "type": "int64", "versions": "0+",
      "about": "The highest metadata offset which the broker has reached." },
    { "name": "Listeners", "type": "[]Listener",
      "about": "The listeners of this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "int16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
      ]
    },
    { "name": "Features", "type": "[]Feature",
      "about": "The features on this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The feature name." },
        { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The minimum supported feature level." },
        { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
          "about": "The maximum supported feature level." }
      ]
    },
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The rack which this broker is in." }
  ]
}

{
  "apiKey": 57,
  "type": "response",
  "name": "BrokerRegistrationResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "ActiveControllerId", "type": "int32", "versions": "0+",
      "about": "The ID of the active controller, or -1 if the controller doesn't know." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker's assigned epoch, or -1 if none was assigned." },
    { "name": "LeaseDurationMs", "type": "int64", "versions": "0+",
      "about": "If BrokerEpoch is not -1, the number of milliseconds that we want the lease to last." }
  ]
}

BrokerHeartbeat

As described earlier, the broker periodically sends out a heartbeat request to the active controller.

{
  "apiKey": 58,
  "type": "request",
  "name": "BrokerHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "TargetState", "type": "int8", "versions": "0+",
      "about": "The state that the broker wants to reach." },
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker epoch." },
    { "name": "CurrentMetadataOffset", "type": "int64", "versions": "0+",
      "about": "The highest metadata offset which the broker has reached." }
  ]
}   

{
  "apiKey": 58,
  "type": "response",
  "name": "BrokerHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "ActiveControllerId", "type": "int32", "versions": "0+",
      "about": "The ID of the active controller, or -1 if the controller doesn't know." },
    { "name": "NextState", "type": "int8", "versions": "0+",
      "about": "The state to which the broker should transition." },
    { "name": "LeaseDurationMs", "type": "int64", "versions": "0+",
      "about": "If BrokerEpoch is not -1, the number of milliseconds that we want the lease to last." }
  ]
}

The controller will wait to unfence a broker until the broker has sent at least one heartbeat where its currentState is active.  So a typical transition will look like this:

  • broker sends a BrokerHeartbeatRequest with currentState = fenced, targetState = active.  The controller responds with nextState = active
  • broker sends a BrokerHeartbeatRequest with currentState = active, targetState = active.  The controller responds with nextState = active
  • controller unfences the broker

The second step informs the controller that the broker has received its response to the first step.  In order to speed this process along, the broker will send the second heartbeat immediately after receiving the first, rather than waiting for the heartbeat timeout.

As always with enums, the UNKNOWN state is used only to translate values that our software is too old to understand.

The controller will return NOT_CONTROLLER if it is not active.  Brokers will always return NOT_CONTROLLER for these RPCs.

Heartbeat requests that give a CurMetadataOffset which is lower than the previous one recorded for that broker will be ignored.
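
A minimal sketch of that rule on the controller side, assuming the controller keeps an in-memory map of the highest metadata offset each broker has reported:

import java.util.concurrent.ConcurrentHashMap;

// Track the highest metadata offset reported by each broker and ignore stale reports.
class BrokerMetadataOffsets {
    private final ConcurrentHashMap<Integer, Long> highestReported = new ConcurrentHashMap<>();

    // Returns true if the reported offset should be applied, false if the heartbeat's
    // offset is lower than a previous report and must be ignored.
    boolean update(int brokerId, long reportedOffset) {
        long newHighest = highestReported.merge(brokerId, reportedOffset, Math::max);
        return newHighest == reportedOffset;
    }
}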

Record Formats

BrokerRecord

{
  "apiKey": 0,
  "type": "metadata",
  "name": "BrokerRecord",
  "validVersions": "0",
  "fields": [
	{ "name": "Id", "type": "int32", "versions": "0+",
	  "about": "The broker id." },
	{ "name": "Epoch", "type": "int64", "versions": "0+",
	  "about": "The broker epoch." },
	{ "name": "EndPoints", "type": "[]BrokerEndpoint", "versions": "0+", "nullableVersions": "0+",
	  "about": "The endpoints that can be used to communicate with this broker.", "fields": [
		{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
		  "about": "The name of the endpoint." },
		{ "name": "Host", "type": "string", "versions": "0+",
		  "about": "The hostname." },
		{ "name": "Port", "type": "int16", "versions": "0+",
		  "about": "The port." },
		{ "name": "SecurityProtocol", "type": "int16", "versions": "0+",
		  "about": "The security protocol." }
	]},
	{ "name": "Features", "type": "[]BrokerFeature", "versions": "0+", "nullableVersions": "0+",
	  "about": "The features that this broker supports.", "fields": [
		{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
		  "about": "The name of the feature." },
		{ "name": "MinVersion", "type": "int16", "versions": "0+",
		  "about": "The minimum feature level that this broker supports." },
		{ "name": "MaxVersion", "type": "int16", "versions": "0+",
		  "about": "The maximum feature level that this broker supports." }
	]},
	{ "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
	  "about": "The broker rack." }
  ]
}

TopicRecord

{
  "apiKey": 1,
  "type": "metadata",
  "name": "TopicRecord",
  "validVersions": "0",
  "fields": [
        { "name": "Name", "type": "string", "versions": "0+",
          "about": "The topic name." },
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The unique ID of this topic." },
        { "name": "Deleting", "type": "boolean", "versions": "0+",
          "about": "True if this topic is in the process of being deleted." }
  ]
}

PartitionRecord

{
  "apiKey": 2,
  "type": "metadata",
  "name": "PartitionRecord",
  "validVersions": "0",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Replicas", "type":  "[]int32", "versions":  "0+",
      "about": "The replicas of this partition, sorted by preferred order." },
    { "name": "Isr", "type":  "[]int32", "versions":  "0+",
      "about": "The in-sync replicas of this partition" },
    { "name": "RemovingReplicas", "type":  "[]int32", "versions":  "0+",
      "about": "The replicas that we are in the process of removing." },
    { "name": "AddingReplicas", "type":  "[]int32", "versions":  "0+",
      "about": "The replicas that we are in the process of adding." },
    { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The lead replica, or -1 if there is no leader." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "An epoch that gets incremented each time we change the leader." }
  ]
}

ConfigRecord

{         
  "apiKey": 3,
  "type": "metadata",
  "name": "ConfigRecord",
  "validVersions": "0",
  "fields": [
    { "name": "ResourceType", "type": "int8", "versions": "0+",
      "about": "The type of resource this configuration applies to." },
    { "name": "ResourceName", "type": "string", "versions": "0+",
      "about": "The name of the resource this configuration applies to." },         
    { "name": "Name", "type": "string", "versions": "0+",
      "about": "The name of the configuration key." },                  
    { "name": "Value", "type": "string", "versions": "0+",     
      "about": "The value of the configuration." }
  ]           
} 

IsrChange

{
  "apiKey": 4,
  "type": "metadata",
  "name": "IsrChangeRecord",
  "validVersions": "0",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Isr", "type":  "[]int32", "versions":  "0+",
      "about": "The in-sync replicas of this partition" },
    { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The lead replica, or -1 if there is no leader." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "An epoch that gets incremented each time we change the leader." }
  ]
} 

AccessControlRecord

{
  "apiKey": 5,
  "type": "metadata",
  "name": "AccessControlRecord",
  "validVersions": "0",
  "fields": [
    { "name": "ResourceType", "type": "int8", "versions": "0+",
      "about": "The resource type" },
    { "name": "ResourceName", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The resource name, or null if this is for the default resource." },
    { "name": "PatternType", "type": "int8", "versions": "0+",
      "about": "The pattern type (literal, prefixed, etc.)" },
    { "name": "Principal", "type": "string", "versions": "0+",
      "about": "The principal name." },
    { "name": "Host", "type": "string", "versions": "0+",
      "about": "The host." },
    { "name": "Operation", "type": "int8", "versions": "0+",
      "about": "The operation type." },
    { "name": "PermissionType", "type": "int8", "versions": "0+",
      "about": "The permission type (allow, deny)." }
  ]
} 

FenceBroker

{
  "apiKey": 6,
  "type": "metadata",
  "name": "FenceBrokerRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Id", "type": "int32", "versions": "0+",
      "about": "The broker ID to fence. It will be removed from all ISRs." }
    { "name": "Epoch", "type": "int64", "versions": "0+",
      "about": "The epoch of the broker to fence." }
  ]
} 

RemoveTopic

{
  "apiKey": 7,
  "type": "metadata",
  "name": "RemoveTopicRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Id", "type": "uuid", "versions": "0+",
      "about": "The topic to remove. All associated partitions will be removed as well." }
  ]
} 

DelegationToken

{
  "apiKey": 8,
  "type": "metadata",
  "name": "DelegationTokenRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Owner", "type": "string", "versions": "0+",
      "about": "The delegation token owner." },
    { "name": "Renewers", "type": "[]string", "versions": "0+",
      "about": "The principals which have renewed this token." },
    { "name": "IssueTimestamp", "type": "int64", "versions": "0+",
      "about": "The time at which this timestamp was issued." },
    { "name": "MaxTimestamp", "type": "int64", "versions": "0+",
      "about": "The time at which this token cannot be renewed any more." },
    { "name": "ExpirationTimestamp", "type": "int64", "versions": "0+",
      "about": "The next time at which this token must be renewed." },
    { "name": "TokenId", "type": "string", "versions": "0+",
      "about": "The token id." },
  ]
} 

ScramUser

{
  "apiKey": 9,
  "type": "metadata",
  "name": "DelegationTokenRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Name", "type": "string", "versions": "0+",
      "about": "The user name." },
    { "name": "CredentialInfos", "type": "[]CredentialInfo", "versions": "0+",
      "about": "The mechanism and related information associated with the user's SCRAM credential.", "fields": [
      { "name": "Mechanism", "type": "int8", "versions": "0+",
        "about": "The SCRAM mechanism." },
      { "name": "Salt", "type": "bytes", "versions": "0+",
        "about": "A random salt generated by the client." },
      { "name": "SaltedPassword", "type": "bytes", "versions": "0+",
        "about": "The salted password." },
      { "name": "Iterations", "type": "int32", "versions": "0+",
        "about": "The number of iterations used in the SCRAM credential." }]}
  ]
} 

FeatureLevel

{
  "apiKey": 10,
  "type": "metadata",
  "name": "FeatureLevelRecord",
  "validVersions": "0",
  "fields": [
    { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
      "about": "The feature name." },
    { "name": "MinFeatureLevel", "type": "int16", "versions": "0+",
      "about": "The current finalized minimum feature level of this feature for the cluster." },
    { "name": "MaxFeatureLevel", "type": "int16", "versions": "0+",
      "about": "The current finalized maximum feature level of this feature for the cluster." }
  ]
} 

New Metrics

kafka.controller:type=KafkaController,name=MetadataLag
  The offset delta between the latest metadata record this controller has replayed and the last stable offset of the metadata topic.

kafka.controller:type=KafkaServer,name=MetadataLag
  The offset delta between the latest metadata record this broker has replayed and the last stable offset of the metadata topic.

kafka.controller:type=KafkaController,name=MetadataCommitLatencyMs
  The latency of committing a message to the metadata topic.  Relevant on the active controller.

kafka.controller:type=KafkaController,name=MetadataCommitRate
  The number of metadata messages per second committed to the metadata topic.

kafka.controller:type=KafkaController,name=MetadataSnapshotLag
  The offset delta between the latest stable offset of the metadata topic and the offset of the last snapshot (or the last stable offset itself, if there are no snapshots).  New name for kafka.controller:type=KafkaController,name=SnapshotLag.

Unused Metrics in KIP-500 Mode

We will deprecate these metrics as soon as legacy mode is deprecated.  For now, they will be unused in KIP-500 mode.

kafka.server:type=SessionExpireListener,name=ZooKeeperExpiresPerSec
  No longer needed when running in KIP-500 mode because we won't have any ZK sessions.

Compatibility, Deprecation, and Migration Plan

As described above, this KIP outlines a new mode that the broker can run in, KIP-500 mode.  For now, this mode will be experimental, and there will be no way to migrate existing clusters from legacy mode to KIP-500 mode.  We plan on outlining how this upgrade process will work in a follow-on KIP.  We do plan on deprecating legacy mode eventually, but we are not quite ready to do it yet in this KIP.

Since KIP-500 mode is currently in a pre-alpha state, we do not guarantee that future versions will support upgrading from the current version of it yet.  Once it is more stable, we will have a more traditional binary compatibility regime.

Rejected Alternatives

Support Automatic Broker ID Assignment

This KIP proposes to drop support for automatic broker ID assignment.  What if we decided to continue to support it?

If we were willing to take a little bit more complexity on board, it would be relatively easy to support automatic broker ID assignment.  Brokers could simply ask the active controller to assign them a new ID when starting up, just as they previously obtained one from ZooKeeper.

However, automatic controller ID assignment is a much more difficult problem.  We never actually supported automatically assigning ZooKeeper IDs, so there is no pattern to follow here.  In general, Raft assumes that nodes know their IDs before the protocol begins. We cannot rely on random assignment because the 31 bit space is not large enough. We could perhaps create a separate protocol for assigning node IDs, but it might be complex.  

In general it's not clear how useful automatic broker ID assignment really is.  Configuration management software like Puppet, Chef, or Ansible can easily create a new ID for each node's configuration file.  Therefore, it's probably best to use this compatibility break to drop support for automatic broker ID assignment.

Combined Heartbeats and Fetch Requests

The brokers are always fetching new metadata from the controller.  Why not combine these fetch requests with the heartbeat requests, so that the brokers only have to send one request rather than two?

The main reason for making them separate requests is to have better separation of concerns.  Fetching metadata is logically a bit different than sending a heartbeat, and coupling them could result in a messy design and code.  We would have to add significant extra complexity to the FetchRequest schema.  Perhaps even worse, we would need to make the timing of fetch requests line up with the timing needed for broker heartbeats.
