
Status

Current state:

Discussion thread:

JIRA:

Released: 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Apache Kafka is in the process of moving from storing metadata in Apache ZooKeeper to storing metadata in an internal Raft topic.  KIP-500 described the overall architecture and plan.  The purpose of this KIP is to go into detail about how the Kafka Controller will change during this transition.

Proposed Changes

KIP-500 Mode

Once this KIP is implemented, system administrators will have the option of running in KIP-500 mode.  In this mode, we will not use ZooKeeper.  The alternative mode where KIP-500 support is not enabled will be referred to as legacy mode.

KIP-500 mode must be enabled for the entire cluster, not just for specific nodes.  Initially, this mode will be considered experimental and not ready for production.  As we do more testing and gain more confidence, we will remove the experimental label.  Eventually, in a future release, KIP-500 mode will be the only supported mode.  Since dropping support for legacy mode is an incompatible change, it will need to happen in a major release, of course.

Initially, we will not support upgrading a cluster from legacy mode to KIP-500 mode.  This is in keeping with the experimental nature of KIP-500 mode.  A future KIP will describe and implement an upgrade process from legacy mode to KIP-500 mode.

Deployment

Currently, a ZooKeeper cluster must be deployed when running Kafka.  This KIP will eliminate that requirement, as well as the requirement to configure the addresses of the ZooKeeper nodes on each broker.

Currently, any broker node can be elected as the controller.  As part of this KIP, controller nodes will become special nodes identified by a separate configuration.  Typically three or five nodes in the cluster will be selected to be controllers.  The addresses and ports of the controllers must be configured on each broker, so that the broker can contact the controller quorum when starting up.  This is similar to how we configure the ZooKeeper quorum on each node today.

Note that as long as at least one of the provided controller addresses is valid, the broker will be able to learn about the current metadata quorum and start up.  Once the broker is in contact with the metadata quorum, the quorum bootstrap addresses will not be needed.  This makes it possible to reconfigure the metadata quorum over time.  For example, if we start with a metadata quorum of host1, host2, host3, we could replace host3 with host4 without disrupting any of the brokers.  Then we could roll the brokers to apply the new metadata quorum bootstrap configuration of host1, host2, host4 on each one.
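
To make this concrete, a broker's configuration might look something like the sketch below.  The property names are placeholders used purely for illustration; they are not the proposed configuration keys.  The key point is that each broker lists the controller quorum's bootstrap addresses rather than a ZooKeeper connect string.

  # Illustrative broker configuration (property names are placeholders, not proposed keys)
  # Bootstrap addresses for the metadata quorum, replacing the old zookeeper.connect string.
  # Only one of these addresses needs to be valid for the broker to find the current quorum.
  controller.connect=host1:9093,host2:9093,host3:9093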

System administrators will be able to choose whether to run separate controller nodes, or whether to run controller nodes which are co-located with broker nodes.  Kafka will provide support for running a controller in the same JVM as a broker, in order to save memory and enable single-process test deployments.

Node IDs

Just like brokers, controller nodes will have non-negative integer node IDs.  There will be a single ID space.  In other words, no controller should share the same ID as a broker.  Even when a broker and a controller are co-located in the same JVM, they must have different node IDs.

Automatic node ID assignment via ZooKeeper will no longer be supported in KIP-500 mode.  Node IDs must be set in the configuration file for brokers and controllers.
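
For example, a broker and a co-located controller running in the same JVM might be configured with two distinct IDs drawn from the single ID space, along these lines (again, the property names are illustrative, not proposed configuration keys):

  # Illustrative co-located configuration: one JVM, two distinct node IDs
  broker.id=1
  controller.id=3001   # must not collide with any broker ID in the cluster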

Networking

Controller processes will listen on a separate port from brokers.  This will be true even when the broker and controller are co-located in the same JVM.

In a well-run Kafka deployment, controller ports, like ZooKeeper ports, should be firewalled off from clients.  This will prevent clients from disrupting the cluster by flooding the controller ports with requests.  In the realm of ACLs, this translates to controllers requiring the CLUSTER_ACTION operation on the CLUSTER resource for all operations.
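
As a concrete illustration, the binding below uses Kafka's existing ACL classes to grant ClusterAction on the Cluster resource to a broker principal.  The principal name is an assumption made for the example; the point is only that this is the operation/resource pair that traffic to the controllers will need.

  import org.apache.kafka.common.acl.AccessControlEntry;
  import org.apache.kafka.common.acl.AclBinding;
  import org.apache.kafka.common.acl.AclOperation;
  import org.apache.kafka.common.acl.AclPermissionType;
  import org.apache.kafka.common.resource.PatternType;
  import org.apache.kafka.common.resource.ResourcePattern;
  import org.apache.kafka.common.resource.ResourceType;

  public class ControllerAclExample {
      public static void main(String[] args) {
          // Grant the (assumed) broker principal CLUSTER_ACTION on the CLUSTER resource,
          // which is what all operations against the controllers will require.
          AclBinding binding = new AclBinding(
              new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL),
              new AccessControlEntry("User:broker", "*",
                  AclOperation.CLUSTER_ACTION, AclPermissionType.ALLOW));
          System.out.println(binding);
      }
  }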

The only time when clients should contact a controller node directly is when they are debugging system issues.  This is similar to ZooKeeper, where we have things like zk-shell, but only for debugging.

Controller Heartbeats

Each broker will periodically send heartbeat messages to the controller.  These heartbeat messages will serve to identify the broker as still being part of the cluster.  The controller can also use these heartbeats to communicate information back to each node.

These messages are separate from the fetch requests which the brokers send to retrieve new metadata.  However, controller heartbeats do include the offset of the latest metadata update which the broker has applied.
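
A rough sketch of the broker side of this is shown below.  The class and field names are invented for illustration; the actual heartbeat request and response will be Kafka RPCs whose schemas are not defined by this sketch.

  import java.util.concurrent.Executors;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicLong;

  // Illustrative only: the broker periodically reports the offset of the latest
  // metadata edit it has applied, and the controller's response can carry
  // instructions back to the broker.
  public class HeartbeatLoopSketch {
      private final int brokerId;
      private final AtomicLong lastAppliedMetadataOffset = new AtomicLong(-1L);
      private final ScheduledExecutorService scheduler =
          Executors.newSingleThreadScheduledExecutor();

      public HeartbeatLoopSketch(int brokerId) {
          this.brokerId = brokerId;
      }

      // Called by the broker's metadata fetcher as it applies new edits.
      public void updateLastAppliedOffset(long offset) {
          lastAppliedMetadataOffset.set(offset);
      }

      public void start(long periodMs) {
          scheduler.scheduleAtFixedRate(this::sendHeartbeat, 0, periodMs, TimeUnit.MILLISECONDS);
      }

      private void sendHeartbeat() {
          // In the real broker this would be an RPC to the active controller.
          System.out.printf("heartbeat: brokerId=%d lastAppliedMetadataOffset=%d%n",
              brokerId, lastAppliedMetadataOffset.get());
      }

      public void shutdown() {
          scheduler.shutdownNow();
      }
  }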

Broker Fencing

A broker is considered "fenced" if it will not process any client requests.  There are a few reasons why we might want a broker to be in this state.  The first is that the broker is truly isolated from the controller quorum, and therefore not up to date on its metadata.  The second is that the broker is attempting to use the ID of another broker which has already been registered.

Brokers start up in the fenced state and can leave this state only by sending a heartbeat to the active controller and getting back a response that tells them they can become active.  That response serves as a kind of lease which will expire after a certain amount of time.
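
A minimal sketch of this lease behavior follows, with invented names and no claim about the actual lease duration or RPC fields:

  import java.util.concurrent.TimeUnit;

  // Illustrative only: a broker starts out fenced and is active only while it holds
  // an unexpired lease granted by a heartbeat response from the active controller.
  public class BrokerFencingSketch {
      private volatile Long leaseExpirationNanos = null;   // null means "no lease": the broker is fenced

      // Called when a heartbeat response tells the broker it may become (or remain) active.
      public void onLeaseGranted(long leaseDurationMs) {
          leaseExpirationNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(leaseDurationMs);
      }

      // The broker processes client requests only while this returns false.
      public boolean isFenced() {
          Long expiration = leaseExpirationNanos;
          return expiration == null || System.nanoTime() - expiration >= 0;
      }
  }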

Metadata Edits

Each change that the controller makes will generate a metadata edit log entry, also known as an "edit."  These edits will be persisted to the metadata partition.

The format of each edit will be as follows (see the sketch after this list):

  1. a varint specifying the edit type
  2. a varint specifying the edit version
  3. the payload in Kafka RPC format
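
As a rough sketch, this framing could be written with Kafka's existing varint helpers as shown below.  The class is illustrative; the payload bytes stand in for a message serialized in the Kafka RPC format, and the use of unsigned varints here is an assumption for the example.

  import java.nio.ByteBuffer;
  import org.apache.kafka.common.utils.ByteUtils;

  // Illustrative only: frames one metadata edit as
  //   [varint edit type][varint edit version][payload in Kafka RPC format]
  public class EditFramingSketch {
      public static ByteBuffer frameEdit(int editType, int editVersion, byte[] rpcPayload) {
          ByteBuffer buf = ByteBuffer.allocate(
              ByteUtils.sizeOfUnsignedVarint(editType) +
              ByteUtils.sizeOfUnsignedVarint(editVersion) +
              rpcPayload.length);
          ByteUtils.writeUnsignedVarint(editType, buf);
          ByteUtils.writeUnsignedVarint(editVersion, buf);
          buf.put(rpcPayload);
          buf.flip();
          return buf;
      }
  }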

If the controller tries to read an entry whose type or version is unknown, it will fail and abort the process.  The types and versions which the controller writes will be determined by which KIP-584 feature flags have been enabled.

It's important to note that in many cases, it will be better to add a tagged field than to bump the version number of an edit type.  A tagged field is appropriate any time older brokers can simply ignore a field that newer brokers might want to see.

Snapshots

Clearly, as we keep creating metadata edits, the metadata partition will grow without bound, even if the size of the actual metadata stays the same.  To avoid this, we will periodically write out our metadata state into a snapshot.

The implementation of Raft snapshots is described at the protocol level in KIP-XYZ.  For the purpose of this KIP, the important thing is to understand the format of what the controller writes out, as well as how it is generated.

Snapshots are local to each replica.  For example, replica A may have a snapshot at offset 100, and edits up to offset 150, whereas replica B may have a snapshot at offset 125 and edits up to offset 150.  Any snapshot must be usable as a starting point for loading the entire state of metadata.  In other words, a new controller node must be able to load a snapshot, apply all the edits which follow it, and come up to date.
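
In other words, recovery is always "load a snapshot, then replay the edits after it."  A small sketch of that invariant, using placeholder types rather than real classes, follows:

  import java.util.List;

  // Illustrative only: MetadataImage and Edit are placeholders for the controller's
  // in-memory metadata state and the deserialized log entries.
  public class SnapshotReplaySketch {
      interface Edit {
          void applyTo(MetadataImage image);
      }

      static class MetadataImage {
          long lastAppliedOffset = -1L;
      }

      // Load the snapshot taken at snapshotOffset, then apply every edit after it, in order.
      public static MetadataImage recover(MetadataImage snapshot, long snapshotOffset,
                                          List<Edit> editsAfterSnapshot) {
          MetadataImage image = snapshot;           // the snapshot alone is a complete starting point
          image.lastAppliedOffset = snapshotOffset;
          for (Edit edit : editsAfterSnapshot) {
              edit.applyTo(image);
              image.lastAppliedOffset++;
          }
          return image;
      }
  }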

The currently active controller will monitor the offset of the latest snapshot made by all replicas, including itself.  The snapshotting state of each node is considered soft state: it is not persisted anywhere in the log, but purely communicated by heartbeats and stored in memory by the active controller.

When the controller decides that a remote node should start a snapshot, it will communicate that in its response to the periodic heartbeat sent by that node.  When the active controller decides that it should create a snapshot itself, it will first try to give up leadership of the Raft quorum.  The active controller will not tell a node to begin snapshotting if it is aware that another node is already snapshotting.

The controller will also snapshot less frequently when too many members of the quorum have fallen behind.  Specifically, if losing a node would probably impact availability, we will use a separate set of configurations for determining when to snapshot.
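
Putting the last two paragraphs together, the active controller's decision might look roughly like the following.  The thresholds are placeholders, not proposed configuration names or values.

  // Illustrative only: the kind of check the active controller could run before asking
  // a node (possibly itself) to begin a snapshot.
  public class SnapshotPolicySketch {
      public static boolean shouldRequestSnapshot(long editsSinceLastSnapshot,
                                                  boolean anotherNodeIsSnapshotting,
                                                  boolean quorumIsDegraded,
                                                  long normalThreshold,
                                                  long degradedThreshold) {
          if (anotherNodeIsSnapshotting) {
              return false;   // never ask a node to snapshot while another node is already snapshotting
          }
          // Snapshot less eagerly when losing one more node would probably impact availability.
          long threshold = quorumIsDegraded ? degradedThreshold : normalThreshold;
          return editsSinceLastSnapshot >= threshold;
      }
  }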


Public Interfaces

Compatibility, Deprecation, and Migration Plan

Test Plan

Rejected Alternatives
