Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Apache Kafka is in the process of moving from storing metadata in Apache Zookeeper, to storing metadata in an internal Raft topic.  KIP-500 described the overall architecture and plan.  The purpose of this KIP is to go into detail about how the Kafka Controller will change during this transition.

...

Once this KIP is implemented, system administrators will have the option of running in KIP-500 mode.  In this mode, we will not use ZooKeeper.  ZooKeeper  The alternative mode where KIP-500 support is not enabled will be referred to as legacy mode.

...

Currently, any broker node can be elected as the controller.  As part of this KIP, the active controller nodes will become special nodes identified by a separate configurationwill instead be selected among a small pool of nodes specifically configured to act as controllers.  Typically three or five nodes in the cluster will be selected to be controllers.

System administrators will be able to choose whether to run separate controller nodes, or whether to run controller nodes which are co-located with broker nodes.  Kafka will provide support for running a controller in the same JVM as a broker, in order to save memory and enable single-process test deployments.

The addresses and ports of the controllers controller nodes must be configured on each broker, so that the broker can contact the controller quorum when starting up.  This is similar to how we configure the ZooKeeper quorum on each node today.

Note that as long as at least one of the provided controller addresses is valid, the broker will be able to learn about the current metadata quorum and start up.  Once the broker is in contact with the metadata quorum, the quorum bootstrap addresses will not be needed.  This makes it possible to reconfigure the metadata quorum over time.  For example, if we start with a metadata quorum of host1, host2, host3, we could replace host3 with host4 without disrupting any of the brokers.  Then we could roll the brokers to apply the new metadata quorum bootstrap configuration of host1, host2, host4 on each one.

System administrators will be able to choose whether to run separate controller nodes, or whether to run controller nodes which are co-located with broker nodes.  Kafka will provide support for running a controller in the same JVM as a broker, in order to save memory and enable single-process test deployments.

Node IDs

Just like brokers, controller nodes will have non-negative integer node IDs.  There will be a single ID space.  In other words, no controller should share the same ID as a broker.  Even when a broker and a controller are co-located in the same JVM, they must have different node IDs.

...

When a broker is fenced, it cannot process any client requests.

There are a few reasons why we might want the broker to be in this state.  The first is that the broker is truly isolated from the controller quorum, and therefore not up-to-date on its metadata.  The second is if the broker is attempting to use the ID of another broker which has already been registered.

Metadata Edits

Each change that the controller makes will generate a metadata edit log entry, also known as an"edit."  These edits will be persisted to the metadata partition.

The format of each edit will be:

  1. a varint specifying the edit type
  2. a varint specifying the edit version
  3. the payload in Kafka RPC format

If the controller tries to read an entry whose type or version is unknown, it will fail and abort the process.  The types and versions which the controller uses to write will be controlled by the KIP-584 feature flags which we have enabled.

  This prevents brokers which are not receiving metadata updates or that are not receiving and processing them fast enough from causing issues to clients.

Another reason why a controller might ask a broker to fence itself is to prevent the broker from claiming a broker ID that is already in use by a different broker.  In general, our goal with broker ID fencing is to avoid a "ping/pong" situation where control of a broker ID passes back and forth between brokers.  We do want to allow a broker to rapidly reclaim its ID after being stopped with kill -9 or a similar method.

The Controller Quorum

As described in KIP-500, the controller will store its data in the internal __metadata topic.  This topic will be managed by Raft on the controller nodes.  The leader of the controller quorum will be the active controller.  The followers will function as hot standbys, ready to take over when the active leader fails or resigns.  The metadata will be stored in memory on all the active controllers.

Metadata

The __metadata topic will have a single partition which contains all the metadata in the cluster.  This topic is managed by Raft, and has periodic KIP-630 snapshots.

The format of the data in the snapshots is different from the format in the log entries. The log entries are essentially deltas that must be applied to the preceding snapshot.  We call the log entries "edits."  Using edits makes the metadata much smaller than it would otherwise be.  For example, we can represent a change in the ISR of a partition without restating all the other details about the partitionIt's important to note that in many cases, it will be better to add a tagged field than to bump the version number of an edit type.  A tagged field is appropriate any time older brokers can simply ignore a field that newer brokers might want to see.

Snapshots

Clearly, as we keep creating metadata edits, the metadata partition will grow without bound, even if the size of the actual metadata stays the same.  To avoid this, we will periodically write out our metadata state into a snapshot.

The implementation of Raft snapshots is described at the protocol level in KIP-XYZ630.  For the purpose of this KIP, the important thing is to understand the format of what the controller writes out, as well as how it is generated.

...

The controller will also snapshot less frequently when too many members of the quorum have fallen behind.  Specifically, if losing a node would probably impact availability, we will use a separate set of configurations for determining when to snapshot.

Public Interfaces

Configuration

Configuration NamePossible ValuesDescription
process.roles

null

broker

controller

broker,controller

If this is null (absent) then we are in legacy mode.

Otherwise, we are in KIP-500 mode and this configuration determines what roles this process should play: broker, controller, or both.

controller.listeners

null

a listener name

A comma-separated list of the names of the listeners used by the KIP-500 controller. This is required if this process is a KIP-500 controller. The legacy controller will not use this configuration

Despite the similar name, note that this is different from the "control plane listener" introduced by KIP-291.  The "control plane listener" is used on brokers, not on controllers.

controller.connect

null

a comma-separated list of controller URIs.

A comma-separated list of controller URIs that KIP-500 brokers should connect to on startup.  Required for brokers running in KIP-500 mode.
controller.id

a 32-bit ID

The controller id for this server. This must be set to a non-negative number when running as a KIP-500 controller. Controller IDs should not overlap with broker IDs.

RPCs

ControllerHeartbeat

Brokers periodically send heartbeats to the active controller.

These heartbeats serve to register the broker with the cluster.  They allow the broker to publish its listeners as well.

Code Block
languagejs
{
  "apiKey": 50,
  "type": "request",
  "name": "ControllerHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "TargetState", "type": "int8", "versions": "0+",
      "about": "The state that the broker wants to get into." },
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker epoch, or -1 if one has not yet been assigned." },
    { "name": "CurTimeNs", "type": "int64", "versions": "0+",
      "about": "The current monotonic time in nanoseconds." },
    { "name": "Listeners", "type": "[]Listener",
      "about": "The listeners of this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "int16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
      ]
    }
  ]
}

{
  "apiKey": 50,
  "type": "response",
  "name": "ControllerHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "NextState", "type": "int8", "versions": "0+",
      "about": "The state which the broker should transition to." },
    { "name": "LeaseEndNs", "type": "int64", "versions": "0+",
      "about": "The monotonic time in nanoseconds at which the lease should end." }
  ]
}

enum BrokerState {
    UNKNOWN(0),
    FENCED(1),
    ACTIVE(2);
}

The controller will return NOT_CONTROLLER if it is not active.

The initial heartbeat is treated specially... .(restart issue)

(controller heartbeat)

(detailed fencing descrip

In the snapshots, the data is stored in a straightforward way as a set of records containing key / value pairs.

Snapshot Format

Each change that the controller makes will generate a metadata edit log entry, also known as an"edit."  These edits will be persisted to the metadata partition.

The format of each edit will be:

  1. a varint specifying the edit type
  2. a varint specifying the edit version
  3. the payload in Kafka RPC format

If the controller tries to read an entry whose type or version is unknown, it will fail and abort the process.  The types and versions which the controller uses to write will be controlled by the KIP-584 feature flags which we have enabled.

It's important to note that in many cases, it will be better to add a tagged field than to bump the version number of an edit type.  A tagged field is appropriate any time older brokers can simply ignore a field that newer brokers might want to see.

Compatibility, Deprecation, and Migration Plan

...