...

The only time when clients should contact a controller node directly is when they are debugging system issues.  This is similar to ZooKeeper, where we have things like zk-shell, but only for debugging.

The Controller

...

Each broker will periodically send heartbeat messages to the controller.  These heartbeats identify the broker as still being part of the cluster.  The controller can also use these heartbeats to communicate information back to each node.

These messages are separate from the fetch requests which the brokers send to retrieve new metadata.  However, controller heartbeats do include the offset of the latest metadata update which the broker has applied.

The Controller Quorum

As described in KIP-500, the controller will store its data in the internal __metadata topic.  This topic will be managed by Raft on the controller nodes.  The leader of the controller quorum will be the active controller.  The followers will function as hot standbys, ready to take over when the active leader fails or resigns.  The metadata will be stored in memory on all of the controller nodes.

Metadata

The __metadata topic will have a single partition which contains all the metadata in the cluster.  This topic is managed by Raft, and has periodic KIP-630 snapshots.

The format of the data in the snapshots is different from the format in the log entries. The log entries are essentially deltas that must be applied to the preceding snapshot.  We call the log entries "edits."  Using edits makes the metadata much smaller than it would otherwise be.  For example, we can represent a change in the ISR of a partition without restating all the other details about the partition.
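
To make that concrete, here is a small illustrative sketch in Java.  The PartitionState and IsrChangeEdit classes below are hypothetical stand-ins, not the actual record schemas defined by this KIP; the point is only that an edit restates the ISR and nothing else, while the rest of the partition's state carries over from the preceding snapshot and earlier edits.

Code Block
languagejava
import java.util.List;
import java.util.Map;

// Hypothetical in-memory view of one partition's metadata.
class PartitionState {
    int leader;
    List<Integer> replicas;
    List<Integer> isr;
}

// Hypothetical edit: a delta that carries only the new ISR.
class IsrChangeEdit {
    String topic;
    int partition;
    List<Integer> newIsr;
}

class InMemoryMetadata {
    // Keyed by "topic-partition".
    final Map<String, PartitionState> partitions;

    InMemoryMetadata(Map<String, PartitionState> partitions) {
        this.partitions = partitions;
    }

    void apply(IsrChangeEdit edit) {
        // Only the ISR changes; the leader and replica assignment are untouched.
        partitions.get(edit.topic + "-" + edit.partition).isr = edit.newIsr;
    }
}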

Snapshots

Clearly, as we keep creating metadata edits, the metadata partition will grow without bound, even if the size of the actual metadata stays the same.  To avoid this, we will periodically write out our metadata state into a snapshot.

The implementation of Raft snapshots is described at the protocol level in KIP-630.  For the purpose of this KIP, the important thing is to understand the format of what the controller writes out, as well as how it is generated.

Snapshots are local to each replica.  For example, replica A may have a snapshot at offset 100, and edits up to offset 150, whereas replica B may have a snapshot at offset 125 and edits up to offset 150.  Any snapshot must be usable as a starting point for loading the entire state of metadata.  In other words, a new controller node must be able to load a snapshot, then apply all the edits which follow it, and come up to date.
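
As a rough sketch of that loading sequence (the Snapshot, Edit, and MetadataLog types below are hypothetical placeholders rather than APIs defined by this KIP), a controller replica restores its state by loading its latest local snapshot and then replaying every edit recorded after it:

Code Block
languagejava
import java.util.List;

// Hypothetical placeholder types, used only to show the control flow.
interface MetadataState { }

interface Snapshot {
    long lastIncludedOffset();                 // e.g. 100
    void restoreInto(MetadataState state);
}

interface Edit {
    long offset();                             // e.g. 101 through 150
    void applyTo(MetadataState state);
}

interface MetadataLog {
    List<Edit> readFrom(long firstOffset);     // edits strictly after the snapshot
}

class ReplicaStartup {
    static long loadState(Snapshot snapshot, MetadataLog log, MetadataState state) {
        snapshot.restoreInto(state);                       // 1. load the snapshot
        long nextOffset = snapshot.lastIncludedOffset() + 1;
        for (Edit edit : log.readFrom(nextOffset)) {       // 2. replay the edits
            edit.applyTo(state);
            nextOffset = edit.offset() + 1;
        }
        return nextOffset;   // the replica is now current through nextOffset - 1
    }
}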

The currently active controller will monitor the offset of the latest snapshot made by all replicas, including itself.  The snapshotting state of each node is considered soft state: it is not persisted anywhere in the log, but purely communicated by heartbeats and stored in memory by the active controller.

When the controller decides that a remote node should start a snapshot, it will communicate that decision in its response to the periodic heartbeat sent by that node.  When the active controller decides that it should create a snapshot itself, it will first try to give up the leadership of the Raft quorum.  The active controller will not tell a node to begin snapshotting if it is aware that another node is already snapshotting.

The controller will also snapshot less frequently when too many members of the quorum have fallen behind.  Specifically, if losing a node would probably impact availability, we will use a separate set of configurations for determining when to snapshot.
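
The sketch below shows roughly what that scheduling decision could look like on the active controller; it is illustrative only.  The snapshotMinEdits and degradedSnapshotMinEdits thresholds are hypothetical stand-ins for whatever configurations are ultimately chosen, and the active controller's own case (where it first tries to resign leadership) is omitted.

Code Block
languagejava
// Illustrative only: soft state kept in memory by the active controller and
// refreshed from heartbeats, never persisted in the metadata log.
class SnapshotScheduler {
    private final long snapshotMinEdits;          // hypothetical threshold
    private final long degradedSnapshotMinEdits;  // hypothetical, larger threshold
    private Integer nodeCurrentlySnapshotting;    // at most one node at a time

    SnapshotScheduler(long snapshotMinEdits, long degradedSnapshotMinEdits) {
        this.snapshotMinEdits = snapshotMinEdits;
        this.degradedSnapshotMinEdits = degradedSnapshotMinEdits;
    }

    // Called while handling a node's heartbeat.  Returns true if the heartbeat
    // response should tell that node to begin a snapshot.
    boolean shouldStartSnapshot(int nodeId,
                                long latestSnapshotOffset,
                                long latestLogOffset,
                                boolean losingANodeWouldImpactAvailability) {
        if (nodeCurrentlySnapshotting != null) {
            return false;   // never ask two nodes to snapshot at the same time
        }
        long editsSinceSnapshot = latestLogOffset - latestSnapshotOffset;
        long threshold = losingANodeWouldImpactAvailability
            ? degradedSnapshotMinEdits : snapshotMinEdits;
        if (editsSinceSnapshot < threshold) {
            return false;
        }
        nodeCurrentlySnapshotting = nodeId;
        return true;
    }
}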

Controller Heartbeats

Every distributed system needs a way of managing cluster membership.  Prior to KIP-500, Kafka brokers created ephemeral znodes in order to register themselves as part of the cluster.  The Kafka controller passively consumed the registration information from ZooKeeper.

In the post-KIP-500 world there is no ZooKeeper and no ephemeral znodes.  Instead, each broker sends a ControllerHeartbeat request to the active controller every few seconds.

This heartbeat acts as a registration.  However, the controller has a choice about whether to accept it.  It will reject brokers whose metadata is too stale, or whose IDs have been claimed by another broker.  It will also reject brokers that do not support the minimum feature level of all KIP-584 features that are enabled.

When the controller accepts the registration, it grants or renews a broker ID lease associating the broker process with its ID.  Leases are time-bounded.  A broker cannot keep using a lease indefinitely on the strength of a single heartbeat; it must keep renewing it.  When brokers are rejected by the controller, or are otherwise unable to renew their lease before it expires, they enter the "fenced" state.
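
A minimal sketch of that lease handling on the controller side is shown below.  The helper methods and the leaseDurationMs value are hypothetical, but the fields mirror the ControllerHeartbeat request and response defined later in this KIP, including the rule that the lease end time is computed from the start time the broker sent rather than from the controller's own clock.

Code Block
languagejava
// Illustrative controller-side handling of a single broker heartbeat.
class HeartbeatResult {
    boolean accepted;
    long brokerEpoch;
    long leaseEndTimeMs;
}

class LeaseManager {
    private final long leaseDurationMs;   // hypothetical configuration

    LeaseManager(long leaseDurationMs) {
        this.leaseDurationMs = leaseDurationMs;
    }

    HeartbeatResult handleHeartbeat(int brokerId, long brokerEpoch,
                                    long leaseStartTimeMs, long curMetadataOffset) {
        HeartbeatResult result = new HeartbeatResult();
        if (metadataTooStale(curMetadataOffset)
                || idClaimedByNewerProcess(brokerId, brokerEpoch)
                || featureLevelTooLow(brokerId)) {
            result.accepted = false;           // the broker must stay fenced
            return result;
        }
        result.accepted = true;
        result.brokerEpoch = currentOrNewEpoch(brokerId, brokerEpoch);
        // The lease end is based on the start time the broker passed in.
        result.leaseEndTimeMs = leaseStartTimeMs + leaseDurationMs;
        return result;
    }

    // Stubs standing in for the real checks described in this section.
    private boolean metadataTooStale(long offset) { return false; }
    private boolean idClaimedByNewerProcess(int id, long epoch) { return false; }
    private boolean featureLevelTooLow(int id) { return false; }
    private long currentOrNewEpoch(int id, long epoch) { return epoch; }
}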

Broker Fencing

Brokers that don't have a broker ID lease are said to be "fenced."  When a broker is fenced, it cannot process any client requests.  This prevents brokers that are not receiving metadata updates, or that are not processing them quickly enough, from causing problems for clients.

Brokers start up in the fenced state, and can leave this state only by sending a heartbeat to the active controller and getting back a response that tells them they can become active.

Controlled Shutdown

In the pre-KIP-500 world, brokers triggered a controlled shutdown by making an RPC to the controller.  When the controller returned a successful result from this RPC, the broker knew that it could shut down.

In the post-KIP-500 world, controlled shutdown is handled by the broker heartbeat system instead.  In its periodic heartbeats, the broker asks the controller if it can transition into the SHUTDOWN state.  This prompts the controller to move all of the leaders off of that broker.  Once they have all been moved, the controller responds to the heartbeat with a NextState of SHUTDOWN.
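
The broker's side of this exchange might look like the sketch below; the HeartbeatClient interface is a hypothetical wrapper around the ControllerHeartbeat RPC, and the SHUTDOWN constant corresponds to the BrokerState value in the schema shown later.

Code Block
languagejava
// Hypothetical wrapper around the ControllerHeartbeat RPC.
interface HeartbeatClient {
    // Sends a heartbeat asking for the given target state and returns NextState.
    byte sendHeartbeat(byte targetState) throws InterruptedException;
}

class ControlledShutdown {
    private static final byte SHUTDOWN = 4;   // BrokerState.SHUTDOWN

    static void awaitControlledShutdown(HeartbeatClient client, long heartbeatIntervalMs)
            throws InterruptedException {
        // Keep asking to shut down; the controller answers with NextState = SHUTDOWN
        // only once all partition leaders have been moved off this broker.
        while (client.sendHeartbeat(SHUTDOWN) != SHUTDOWN) {
            Thread.sleep(heartbeatIntervalMs);
        }
        // Safe to exit now.
    }
}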

Broker ID Conflicts

Clearly, in a correctly managed cluster, there should be no broker ID conflicts.  Each broker should be configured with a unique ID.  However, we want the system to be robust against misconfigurations.  Therefore, if there are two brokers that claim the same ID, the controller will choose only one and tell the other to fence itself.

When a broker first starts up, before it has received any responses from the controller, it will always "win" broker ID conflicts.  However, once it has communicated with the controller, it may lose subsequent conflicts if its broker epoch is stale.  The reason for favoring new processes is to accommodate the common case where a process is killed with kill -9 and then restarted.  We want it to be able to reclaim its old ID quickly in this case.  At the same time, our goal is to avoid a "ping/pong" situation where control of a broker ID passes back and forth between brokers.
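
A minimal sketch of that rule is shown below, assuming the controller remembers the most recent epoch it assigned to each broker ID; the class and method names are hypothetical.

Code Block
languagejava
import java.util.HashMap;
import java.util.Map;

// Illustrative conflict resolution: a brand-new process (epoch -1) wins the ID
// and gets a fresh epoch; a process presenting a stale epoch loses and must fence.
class BrokerIdRegistry {
    private final Map<Integer, Long> latestEpochs = new HashMap<>();
    private long nextEpoch = 0;

    // Returns the broker's (possibly new) epoch, or -1 if it lost the conflict.
    long register(int brokerId, long claimedEpoch) {
        Long current = latestEpochs.get(brokerId);
        if (claimedEpoch == -1) {
            // Initial heartbeat from a new process, e.g. a broker restarted after
            // kill -9 that is reclaiming its old ID.  It always wins.
            long epoch = nextEpoch++;
            latestEpochs.put(brokerId, epoch);
            return epoch;
        }
        if (current != null && claimedEpoch < current) {
            return -1;   // stale epoch: another process now owns this ID
        }
        return claimedEpoch;   // this process still owns the ID
    }
}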

Broker State Machine

This state machine ties together a few of the previous sections, such as broker registration and shutdown.

Code Block
                        INITIAL
sent initial heartbeat,   |
got back an epoch         |
                          V
                        ACTIVE -------------------------> SHUTDOWN
lease expired or revoked  |  ^                          |
(stale metadata,          |  | lease restored           | controller granted
 id conflict,             |  | with new broker epoch    | controlled shutdown
 unable to communicate,   V  |                          |
 etc.)                  FENCED -------------------------+

Public Interfaces

Configurations

process.roles

Possible values: null; broker; controller; broker,controller

If this is null (absent), then we are in legacy mode.  Otherwise, we are in KIP-500 mode and this configuration determines what roles this process should play: broker, controller, or both.

controller.listeners

Possible values: null, or a comma-separated list of listener names

A comma-separated list of the names of the listeners used by the KIP-500 controller.  This is required if this process is a KIP-500 controller.  The legacy controller will not use this configuration.

Despite the similar name, note that this is different from the "control plane listener" introduced by KIP-291.  The "control plane listener" is used on brokers, not on controllers.

controller.connect

Possible values: null, or a comma-separated list of controller URIs

A comma-separated list of controller URIs that KIP-500 brokers should connect to on startup.  Required for brokers running in KIP-500 mode.

controller.id

Possible values: a 32-bit ID

The controller ID for this server.  This must be set to a non-negative number when running as a KIP-500 controller.  Controller IDs should not overlap with broker IDs.
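
For illustration only, the configuration of a dedicated KIP-500 controller node might look something like the following.  The host names, ports, and ID are made up, and the exact URI syntax accepted by controller.connect is defined by the implementation rather than by this example.

Code Block
# Hypothetical example of a dedicated KIP-500 controller node.
process.roles=controller
controller.id=1000
# The CONTROLLER listener name refers to an endpoint defined in the standard listeners config.
controller.listeners=CONTROLLER
listeners=CONTROLLER://controller1.example.com:9093
# The members of the controller quorum (illustrative URIs).
controller.connect=controller1.example.com:9093,controller2.example.com:9093,controller3.example.com:9093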

RPCs

ControllerHeartbeat

Brokers periodically send heartbeats to the active controller.  These heartbeats serve to register the broker with the cluster.  They also allow the broker to publish its listeners.

Code Block
languagejs
{
  "apiKey": 50,
  "type": "request",
  "name": "ControllerHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "TargetState", "type": "int8", "versions": "0+",
      "about": "The state that the broker wants to reach." },
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker ID." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker epoch, or -1 if one has not yet been assigned." },
    { "name": "LeaseStartTimeMs", "type": "int64", "versions": "0+",
      "about": "The time which the broker wants the lease to start at in milliseconds." },
    { "name": "CurMetadataOffset", "type": "int64", "versions": "0+",
      "about": "The highest metadata offset which the broker has reached." },
    { "name": "Listeners", "type": "[]Listener",
      "about": "The listeners of this broker", "versions": "0+", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "int16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
      ]
    }
  ]
}

{
  "apiKey": 50,
  "type": "response",
  "name": "ControllerHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "NextState", "type": "int8", "versions": "0+",
      "about": "The state which the broker should transition to." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The broker's assigned epoch, or -1." },
    { "name": "LeaseEndTimeMs", "type": "int64", "versions": "0+",
      "about": "The time in milliseconds at which the lease should end. This is based on the start time that was passed, not the controller's local clock." }
  ]
}

enum BrokerState {
    UNKNOWN(0),
    INITIAL(1),
    FENCED(2),
    ACTIVE(3),
    SHUTDOWN(4);
}

The controller will return NOT_CONTROLLER if it is not active.  Brokers will always return NOT_CONTROLLER for these RPCs.

Snapshot Format

In the snapshots, the data is stored in a straightforward way as a set of records containing key/value pairs.

Delta Format

Each change that the controller makes will generate a metadata edit log entry, also known as an "edit."  These edits will be persisted to the metadata partition.

The format of each edit will be:

  1. a varint specifying the edit type
  2. a varint specifying the edit version
  3. the payload in Kafka RPC format
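
To illustrate the framing only (this is not a normative serialization; the unsigned varint encoding shown is the style Kafka's protocol generally uses, and the payload bytes are assumed to already be serialized in Kafka RPC format):

Code Block
languagejava
import java.io.ByteArrayOutputStream;

// Illustrative framing of a single edit: varint type, varint version, then payload.
class EditFraming {
    static void writeUnsignedVarint(int value, ByteArrayOutputStream out) {
        // Low 7 bits first; the high bit of each byte means "more bytes follow".
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    static byte[] frameEdit(int editType, int editVersion, byte[] rpcPayload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarint(editType, out);             // 1. the edit type
        writeUnsignedVarint(editVersion, out);          // 2. the edit version
        out.write(rpcPayload, 0, rpcPayload.length);    // 3. the payload
        return out.toByteArray();
    }
}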

If the controller tries to read an entry whose type or version is unknown, it will fail and abort the process.  The types and versions which the controller uses to write will be controlled by the KIP-584 feature flags which we have enabled.

It's important to note that in many cases, it will be better to add a tagged field than to bump the version number of an edit type.  A tagged field is appropriate any time older brokers can simply ignore a field that newer brokers might want to see.

Metrics

Compatibility, Deprecation, and Migration Plan

Test Plan

Rejected Alternatives

Support Automatic Broker ID Assignment

This KIP proposes to drop support for automatic broker ID assignment.  What if we decided to continue to support it?

If we were willing to take a little bit more complexity on board, it would be relatively easy to support automatic broker ID assignment.  Brokers could simply ask the active controller to assign them a new ID when starting up, just as they previously obtained one from ZooKeeper.

However, automatic controller ID assignment is a much more difficult problem.  We never actually supported automatically assigning ZooKeeper IDs, so there is no pattern to follow here.  In general, Raft assumes that nodes know their IDs before the protocol begins.  We cannot rely on random assignment because the 31-bit ID space is not large enough to make collisions unlikely.  We could perhaps create a separate protocol for assigning node IDs, but it would add complexity.

In general it's not clear how useful automatic broker ID assignment really is.  Configuration management software like Puppet, Chef, or Ansible can easily create a new ID for each node's configuration file.  Therefore, it's probably best to use this compatibility break to drop support for automatic broker ID assignment.

Combined Heartbeats and Fetch Requests

The brokers are always fetching new metadata from the controller.  Why not combine these fetch requests with the heartbeat requests, so that the brokers only have to send one request rather than two?

The main reason for keeping them separate is better separation of concerns.  Fetching metadata is logically different from sending a heartbeat, and coupling them could result in a messy design and messy code.  We would have to add significant extra complexity to the FetchRequest schema.  Perhaps even worse, we would need to make the timing of fetch requests line up with the timing needed for controller heartbeats.

...