
...

Currently, any broker node can be elected as the controller.  As part of this KIP, the active controller will instead be selected from a small pool of nodes specifically configured to act as controllers.  Typically three or five nodes in the cluster will be selected to be controllers.

...

Note that as long as at least one of the provided controller addresses is valid, the broker will be able to learn about the current metadata quorum and start up.  Once the broker is in contact with the metadata quorum, the quorum bootstrap addresses will not be needed.  This makes it possible to reconfigure the metadata quorum over time.  For example, if we start with a metadata quorum of host1, host2, host3, we could replace host3 with host4 without disrupting any of the brokers.  Then we could roll the brokers to apply the new metadata quorum bootstrap configuration of host1, host2, host4 on each one.
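
As a sketch of that rolling change (the host names, controller IDs, and port are illustrative assumptions, not values prescribed by this KIP), the quorum bootstrap configuration on each broker might evolve as follows, using the controller.quorum.voters format described later in this document:

Code Block
# Initial quorum bootstrap configuration on every broker
controller.quorum.voters=1@host1:9093,2@host2:9093,3@host3:9093

# After host3 is replaced in the quorum by a new controller (id 4) on host4,
# roll each broker with the updated bootstrap configuration:
controller.quorum.voters=1@host1:9093,2@host2:9093,4@host4:9093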

Node IDs

Just like brokers, controller nodes will have non-negative integer node IDs.  There will be a single ID space.  In other words, no controller should share the same ID as a broker.  Even when a broker and a controller are co-located in the same JVM, they must have different node IDs.

We define a node here as a tuple consisting of a node ID and a process role. Roles are defined by the `process.roles` configuration. As explained above, in a co-located configuration, a single process may take both the "controller" and "broker" roles. The node ID for both of these roles will be defined by the `node.id` configuration. However, this is mainly for configuration convenience. Semantically, we view the co-located process as representing two distinct nodes. Each node has its own listeners and its own set of APIs which it exposes. The APIs exposed by a controller node will not be the same as those exposed by a broker node.
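
As a minimal sketch of a co-located configuration (the same example values appear in the configuration table below), a single process acting as both roles might be configured as:

Code Block
# One JVM acting as both a broker node and a controller node
process.roles=broker,controller
node.id=0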

Automatic node ID assignment via ZooKeeper will no longer be supported in KIP-500 mode.  Node IDs must be set in the configuration file for both brokers and controllers.

Networking

Controller processes will listen on a separate endpoint from brokers.  This will be true even when the broker and controller are co-located in the same JVM.
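
As an illustration (the listener names, addresses, and ports are modeled on the listeners example in the configuration table below; the pairing shown here is an assumption), a co-located broker and controller might expose separate endpoints like this:

Code Block
# Broker listeners plus a dedicated controller listener on a separate port
listeners=INTERNAL://192.1.1.8:9092,EXTERNAL://10.1.1.5:9093,CONTROLLER://192.1.1.8:9094
controller.listener.names=CONTROLLER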

...

Version 1 of meta.properties replaces the broker.id field with node.id.

For servers running in KIP-500 mode, the `meta.properties` file must be present in every log directory. The process will raise an error during startup if either the meta.properties file does not exist or if the node.id found there does not match the value from the configuration file.

Here is an example of a version 1 meta.properties file:

Code Block
$ cat /tmp/kafka-logs/meta.properties
#
#Tue Dec 01 10:08:08 PST 2020
cluster.id=3Db5QLSqSZieL3rJBUUegA
version=1
node.id=0

kafka-storage.sh

There will be a new command-line tool, kafka-storage.sh.
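
The tool's sub-commands and options are described in the elided text that follows.  As a rough sketch only (the sub-command names, flags, and file paths shown here are assumptions and should not be read as the definitive interface), a typical invocation might look like:

Code Block
# Generate a random cluster ID (prints a UUID string)
$ bin/kafka-storage.sh random-uuid
# Format each log directory listed in the configuration with that cluster ID
$ bin/kafka-storage.sh format -t <cluster-id> -c config/server.properties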

...

The configurations below are listed with their name, possible values, and notes.

process.roles
  Possible values: null, broker, controller, or broker,controller
  Notes: If this is null (absent), then we are in legacy mode.  Otherwise, we are in KIP-500 mode and this configuration determines what roles this process should play: broker, controller, or both.

controller.listener.names
  Possible values: If non-null, this must be a comma-separated list of listener names.  When communicating with the controller quorum, the broker will always use the first listener in this list.
  Notes: A comma-separated list of the names of the listeners used by the KIP-500 controller.  This configuration is required if this process is a KIP-500 controller.  The legacy controller will not use this configuration.  Despite the similar name, note that this is different from the "control plane listener" introduced by KIP-291.

listeners
  Possible values: A comma-separated list of the configured listeners.  For example, INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094
  Notes: This configuration is now required.

sasl.mechanism.controller.protocol
  Possible values: SASL mechanism used for communication with controllers.  Default is GSSAPI.
  Notes: This is analogous to sasl.mechanism.inter.broker.protocol, but for communication with the controllers.
controller.quorum.voters
  Possible values: If non-null, this must be a comma-separated list of all the controller voters, in the format: {controller-id}@{controller-host}:{controller-port}
  Notes: When in KIP-500 mode, each node must have this configuration, in order to find out how to communicate with the controller quorum.  Note that this replaces the "quorum.voters" config described in KIP-595.  This configuration is required for both brokers and controllers.

node.id
  Possible values: a 32-bit ID
  Notes: This configuration replaces `broker.id` for zk-based Kafka processes in order to reflect its more general usage.  It serves as the ID associated with each role that the process is acting as.  For example, a configuration with `node.id=0` and `process.roles=broker,controller` defines two nodes: `broker-0` and `controller-0`.

initial.broker.registration.timeout.ms
  Possible values: 60000
  Notes: When initially registering with the controller quorum, the number of milliseconds to wait before declaring failure and exiting the broker process.

broker.heartbeat.interval.ms
  Possible values: 3000
  Notes: The length of time between broker heartbeats.

broker.session.timeout.ms
  Possible values: 18000
  Notes: The length of time that a broker lease lasts if no heartbeats are made.

metadata.log.dir
  Possible values: If set, this must be a path to a log directory.
  Notes: This configuration determines where we put the metadata log.  If it is not set, the metadata log is placed in the first log directory from log.dirs.

controller.quorum.fetch.timeout.ms
  Possible values: Maximum time without a successful fetch from the current leader before a new election is started.
  Notes: New name for quorum.fetch.timeout.ms.

controller.quorum.election.timeout.ms
  Possible values: Maximum time without collecting a majority of votes during the candidate state before a new election is retried.
  Notes: New name for quorum.election.timeout.ms.

controller.quorum.election.backoff.max.ms
  Possible values: Maximum exponential backoff time (based on the number of retries) after an election timeout, before a new election is triggered.
  Notes: New name for quorum.election.backoff.max.ms.

controller.quorum.request.timeout.ms
  Possible values: Maximum time before a pending request is considered failed and the connection is dropped.
  Notes: New name for quorum.request.timeout.ms.

controller.quorum.retry.backoff.ms
  Possible values: Initial delay between request retries.  This config and the one below are used for retriable request errors or lost connectivity and are different from the election.backoff configs above.
  Notes: New name for quorum.retry.backoff.ms.

controller.quorum.retry.backoff.max.ms
  Possible values: Max delay between requests.  Backoff will increase exponentially beginning from quorum.retry.backoff.ms.
  Notes: New name for quorum.retry.backoff.max.ms.
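
To make the relationships among these configurations concrete, here is a hedged sketch of how a dedicated controller and a broker in the same cluster might be configured.  The IDs, host names, ports, and paths are illustrative assumptions, and other required settings (such as listener security protocol mappings) are omitted:

Code Block
# server.properties for a dedicated KIP-500 controller node (one of three quorum voters)
process.roles=controller
node.id=1
listeners=CONTROLLER://host1:9093
controller.listener.names=CONTROLLER
controller.quorum.voters=1@host1:9093,2@host2:9093,3@host3:9093

# server.properties for a broker node in the same cluster
process.roles=broker
node.id=10
listeners=INTERNAL://192.1.1.8:9092,EXTERNAL://10.1.1.5:9093
controller.quorum.voters=1@host1:9093,2@host2:9093,3@host3:9093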

...

The main reason for making them separate requests is to have better separation of concerns.  Fetching metadata is logically a bit different than sending a heartbeat, and coupling them could result in a messy design and code.  We would have to add significant extra complexity to the FetchRequest schema.  Perhaps even worse, we would need to make the timing of fetch requests line up with the timing needed for broker heartbeats.

Shared IDs between Multiple Nodes

One possibility that we've discussed is whether we could have controller IDs and broker IDs share the same ID space.  For example, could there be both a broker 1 and a controller 1?  This is not a good idea because NetworkClient assumes a single ID space.  So if there is both a controller 1 and a broker 1, we don't have a way of picking the "right" one.  We would have to add a concept of node types.  That is a fair amount of extra work.  It also is a bit conceptually messy for the network layer to understand node types, rather than just treating them all as endpoints.

Another reason not to share node IDs is that it would make migrating from a combined broker+controller node to separate nodes more difficult.  For example, let's say you have three combined nodes but the controller load is getting too high and you now want a separate controller node.  If you also have to change the controller ID, the migration becomes more difficult.  Since controller node IDs make their way into the Raft log itself, changing the ID later on is non-trivial.

External metrics and management systems often make use of the concept of node ID.  For example, someone may want to aggregate all the socket server metrics from node 1.  But if there are actually two socket servers sharing this same node ID (controller and broker), this may create problems and require changes to the external system.

A related question is whether a broker and a controller could share the same port, if they are co-located in the same JVM.  We discussed this as part of KIP-590 and agreed that we would not share ports.  A shared port would make it impossible to have separate RPC handlers, complicating the code.  It would also make it impossible to have separate, stricter authentication for controllers.  Sharing a port with the broker opens us up to denial of service attacks, including unintentional ones.