Author: Gokul Ramanan Subramanian

Status

Current state: Draft

Discussion thread: here

JIRA: KAFKA-9590

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, a Kafka cluster does not limit the number of partitions, even though hosting more than a certain number of partitions on a given broker (even with no messages written or read) renders the broker unhealthy. (TODO: Give some examples of what happens) Therefore, we want to add a configuration in Kafka to limit the number of partitions on any broker. Different brokers in a cluster should be configurable with different values for this configuration, in order to support heterogeneous clusters where some brokers have more resources than others.

While this KIP focuses only on supporting a way to limit the number of partitions per broker, we aim to use a general approach that is suitable for applying broker-level guard rails on a Kafka cluster. These guard rails can complement a good monitoring solution by acting as defensive checks that prevent brokers from getting into an unhealthy state.

Glossary note: In this KIP, we will refer to the partition capacity of a broker as the number of partitions it can host beyond what it already hosts. More specifically,

Capacity definition (pseudocode)
partition capacity of broker = max(0, max.broker.partitions value for broker - current number of partitions hosted by broker)
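As a minimal sketch of this definition in Java (the class and method names are hypothetical and not part of Kafka), the capacity is the configured limit minus the current partition count, clamped at zero so that a broker already above its limit simply has no capacity:

```java
final class PartitionCapacity {
    // Hypothetical helper, not part of Kafka: mirrors the capacity definition above.
    static int capacity(int maxBrokerPartitions, int currentPartitions) {
        // Clamp at zero: a broker at or over its limit cannot host more partitions.
        return Math.max(0, maxBrokerPartitions - currentPartitions);
    }

    public static void main(String[] args) {
        System.out.println(capacity(10, 8));                  // 2
        System.out.println(capacity(10, 12));                 // 0
        System.out.println(capacity(Integer.MAX_VALUE, 500)); // 2147483147
    }
}
```

The last call shows that with the default limit (int32's max value) the capacity is effectively unbounded.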

Public Interfaces

Configs

Config name              Type     Default              Update-mode
max.broker.partitions    int32    int32's max value    read-only

API Exceptions

CreateTopics, CreatePartitions and AlterPartitionReassignments APIs will throw the following exception if it is not possible to satisfy the request while respecting the max.broker.partitions limit.

public class GuardRailsException extends ApiException { ... }

The exception message will indicate that brokers have run out of partition capacity. In order to be actionable, the exception message will contain a map of brokers with their respective partition capacities.

Corresponding to this exception, we will have the following generic API error code.

GUARD_RAILS(88, "Not possible to satisfy request due to guard rails", GuardRailsException::new); 

ZooKeeper

The ZooKeeper znode for each broker will have an additional optional guard_rails.max_partitions field. To support this change in a backwards compatible manner, we will bump up the version of the ZooKeeper broker znode entry from 4 to 5.

$ zookeeper-shell.sh localhost get /brokers/ids/1
{
  "version":5,
  "host":"localhost",
  "port":9092,
  "jmx_port":9999,
  "timestamp":"2233345666",
  "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
  "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"},
  "rack":"dc1",
  "guard_rails": {
    "max_partitions": 200
  }
}

Proposed Changes

We will introduce a new read-only int32 configuration called max.broker.partitions that Kafka administrators can specify in the server.properties file. This will have a default value equal to the maximum int32 value that Java can represent (2^31 - 1), which should be sufficient for the foreseeable future. In other words, we don't need int64. Since the motivation for setting a limit on the number of partitions is to control the usage of system resources, and because these resources do not typically change while the system is running, it is sufficient to have a read-only configuration that requires a broker restart to change.

An illustrative example

With this configuration, a topic creation request (via the CreateTopics API) will fail if it is not possible to choose replicas for all the partitions of the topic without running out of partition capacity on at least one broker. For example, in a 3 broker cluster, say that each broker was configured with max.broker.partitions=10. If the brokers already host 8, 6 and 9 partitions respectively, then a request to create a new topic with 1 partition and a replication factor of 3 can be satisfied, resulting in partition counts of 9, 7 and 10 respectively. However, the next topic creation request for 1 partition with a replication factor of 3 will fail because broker 3 has already reached the limit. A similar request with a replication factor of 2 can, however, succeed, because 2 of the brokers have still not reached the limit. If the original request had instead been for a topic with 2 partitions and a replication factor of 3, it would have failed in its entirety, because broker 3 would have needed to host 11 partitions.

The above behavior will also hold true for adding partitions to an existing topic (via the CreatePartitions API). Further, a partition reassignment request (via the AlterPartitionReassignments API) will fail if the requested reassignment would exceed the max.broker.partitions limit on some broker.
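The arithmetic of the three-broker example can be checked with a small counting test. This is only a sketch under simplifying assumptions: the class and method names are hypothetical, rack-aware placement is ignored, and it is not the controller's actual algorithm. It relies on the fact that a broker can hold at most one replica of each new partition, so a broker contributes at most min(capacity, partitions) usable slots, and the request needs partitions × replication factor slots in total.

```java
final class CapacityCheck {
    // Hypothetical helper. capacities[i] is the partition capacity of broker i.
    static boolean canPlace(int[] capacities, int partitions, int replicationFactor) {
        long usableSlots = 0;
        for (int capacity : capacities) {
            // A broker can hold at most one replica of each new partition.
            usableSlots += Math.min(capacity, partitions);
        }
        return usableSlots >= (long) partitions * replicationFactor;
    }

    public static void main(String[] args) {
        int[] before = {2, 4, 1}; // limit 10, brokers hosting 8, 6 and 9 partitions
        int[] after = {1, 3, 0};  // brokers hosting 9, 7 and 10 partitions
        System.out.println(canPlace(before, 1, 3)); // true
        System.out.println(canPlace(after, 1, 3));  // false
        System.out.println(canPlace(after, 1, 2));  // true
        System.out.println(canPlace(before, 2, 3)); // false
    }
}
```

The four calls correspond, in order, to the four requests discussed in the example.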

Algorithm for assigning partition replicas to brokers

When creating a topic or adding partitions to a given topic, the controller currently assigns partitions to brokers such that a partition's replicas are on as many different racks as possible (see KIP-36). This algorithm is encapsulated in the method `AdminUtils.assignReplicasToBrokers`. We will modify this algorithm to ensure that brokers with no partition capacity do not get assigned any partitions.
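One simple way to picture the modification is as a filtering step applied before assignment: brokers with zero partition capacity are dropped from the candidate list. The sketch below assumes a hypothetical map of broker ID to partition capacity and does not reproduce the real `AdminUtils.assignReplicasToBrokers` signature or its rack-aware logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class EligibleBrokers {
    // Hypothetical helper: returns the IDs of brokers that can still host partitions.
    static List<Integer> withCapacity(Map<Integer, Integer> capacityByBroker) {
        List<Integer> eligible = new ArrayList<>();
        // Copy into a TreeMap so the result is ordered by broker ID.
        for (Map.Entry<Integer, Integer> entry : new TreeMap<>(capacityByBroker).entrySet()) {
            if (entry.getValue() > 0) {
                eligible.add(entry.getKey());
            }
        }
        return eligible;
    }

    public static void main(String[] args) {
        // Brokers 1 and 2 have capacity left; broker 3 is at its limit.
        System.out.println(withCapacity(Map.of(1, 1, 2, 3, 3, 0))); // [1, 2]
    }
}
```

A real implementation would also need to decrement capacities as replicas are assigned within a single request, which this filter alone does not do.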

The following sections describe how the controller gets a handle on the max.broker.partitions value and the current number of partitions for all brokers.

Communicating max.broker.partitions value from brokers to controller

Since the max.broker.partitions configuration is applied at each broker, we need a way to pass this information to the controller broker which is responsible for assigning partitions to brokers. We propose using ZooKeeper for this. Each broker will store its value of max.broker.partitions (if present) into the corresponding /brokers/ids/{brokerId} ZooKeeper znode as broker metadata.

With this change, an example broker znode entry will look as follows. Instead of adding the max_partitions key at the top-level in the JSON object, we add it nested within a guard_rails key in order to support other guard rails in the future.

{
  "version":5,
  "host":"localhost",
  "port":9092,
  "jmx_port":9999,
  "timestamp":"2233345666",
  "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
  "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"},
  "rack":"dc1",
  "guard_rails": {
    "max_partitions": 200
  }
}

To support this change in a backwards compatible manner, we will bump up the version of the ZooKeeper broker znode entry from 4 to 5.

Communicating current number of partitions in broker to controller

Each broker (including the controller broker) keeps a cache of partition state (in `MetadataCache.UpdateMetadataPartitionState`). The `AdminManager` class has access to the `MetadataCache` and will use this to construct a reverse map of `active broker ID → number of partitions`.

Similarly, the `KafkaController` class (which is relevant only on the controller broker) keeps a reference to `ControllerContext`, whose `replicasOnBrokers` method we will use to obtain the number of partitions in each broker.
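The reverse map described above amounts to counting, for each broker, the partitions that list it as a replica. The following sketch illustrates that counting with plain Java collections; the input shape (partition name to replica broker IDs) and all names are assumptions for illustration and do not match the actual `MetadataCache` or `ControllerContext` types.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PartitionCounts {
    // Hypothetical input: topic-partition name -> IDs of brokers hosting a replica.
    static Map<Integer, Integer> countByBroker(Map<String, List<Integer>> replicasByPartition) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (List<Integer> replicas : replicasByPartition.values()) {
            for (int brokerId : replicas) {
                // Add one to this broker's count, starting from 1 if absent.
                counts.merge(brokerId, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> replicas = Map.of(
            "orders-0", List.of(1, 2),
            "orders-1", List.of(2, 3));
        System.out.println(countByBroker(replicas)); // {1=1, 2=2, 3=1}
    }
}
```

Note that brokers hosting no partitions do not appear in the result; a caller would need to treat a missing broker ID as a count of zero.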

Compatibility, Deprecation, and Migration Plan

This change is backwards-compatible in practice because we will set the default value for max.broker.partitions equal to the maximum int32 value that Java can represent, which is quite large (2^31 - 1). Users will run into system issues long before the number of partitions on a broker reaches this value.

In order to ease migration, a broker that already hosts more than max.broker.partitions partitions when the configuration is first set for that broker will continue to serve its existing partitions normally, although it will be unable to host any further partitions. The Kafka administrator can later reassign partitions from this broker to another in order to bring the broker within the max.broker.partitions limit.

Rejected Alternatives

  • Making max.broker.partitions a dynamic configuration as opposed to a read-only configuration. This approach does offer extra flexibility, but adds complexity without adding much value. Dynamic reconfigurability of max.broker.partitions would be useful if there is a good use case for changing it while the broker is running. However, the only real use case for setting this configuration is to limit usage of system resources, which typically don't change when the broker is running.
  • Creating a configuration to limit the global number of partitions across the cluster. This is not strictly a rejected alternative, but an orthogonal idea that we do not pursue in this KIP.
  • Throwing a much more specific exception such as NoTopicPartitionCapacityException. This is possible, but we believe that having a general class of exceptions for guard rails better accommodates other types of guard rails in the future.