You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

Author: Gokul Ramanan Subramanian

Status

Current stateUnder Discussion

Discussion thread: here

JIRA: KAFKA-9590

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, a Kafka cluster does not limit the number of topic partitions despite the fact that on a given broker, having more than a certain number of topic partitions (without even any messages written / read) renders the broker unhealthy. (TODO: Give some examples of what happens) Therefore, we want to add a configuration in Kafka to limit the number of topic partitions on any broker. Different brokers in a cluster should be configurable with different values for this configuration, in order to support heterogenous clusters where some brokers are more resourceful than others.

While this KIP focuses only on supporting a way to limit the number of topic partitions per broker, we aim to use a general approach that is suitable for applying broker-level guard rails on a Kafka cluster. These guard rails can complement a good monitoring solution by acting as defensive checks that prevent brokers from getting into an unhealthy state.

Public Interfaces

Configs

Config nameTypeDefaultUpdate-mode
max.broker.partitionsint32int32's max valueread-only

API Exceptions

(TODO: List exceptions that APIs will throw.)

ZooKeeper

The ZooKeeper znode for each broker will have an additional optional guard_rails.max_partitions field.

$ zookeeper-shell.sh localhost get /brokers/ids/1
{
  "version":4,
  "host":"localhost",
  "port":9092,
  "jmx_port":9999,
  "timestamp":"2233345666",
  "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
  "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"},
  "rack":"dc1",
  "guard_rails": {
    "max_partitions": 200
  }
}

Proposed Changes

We will introduce a new read-only int32 configuration called max.broker.partitions that Kafka administrators can specify in the server.properties file. This will have a default value equal to the maximum int32 value that Java can represent (231 - 1), which should be sufficient for the foreseeable future. In other words, we don't need int64. Since the motivation for setting a limit on the number of topic partitions is to control the usage of system resources, and because these resources do not typically change while the system is running, it is sufficient to have a read-only configuration, that requires a broker restart in case of change.

Semantics of max.broker.partitions

With this configuration, a topic creation request will fail if it is not possible to choose replicas for all the partitions of the topic such that the resulting number of topic partitions in each broker does not exceed the limit. For example, in a 3 broker cluster, say that each broker was configured with max.broker.partitions=10. If the brokers already host 8, 6 and 9 partitions respectively, then a request to create a new topic with 1 partition and a replication factor 3 can be satisfied, resulting in partition counts of 9, 7 and 10 respectively. However, the next topic creation request for 1 partition with a replication factor of 3 will fail because broker 3 has already reached the limit. A similar request with a replication factor of 2 can however, succeed, because 2 of the brokers have still not reached the limit. If the original request for a topic with 1 partition was actually a request for a topic with 2 partitions, with a replication factor of 3, then the request would have failed in entirety.

The above behavior will also hold true for adding partitions to an existing topic. Further, a partition reassignment request will fail if the requested reassignment would exceed the max.broker.partitions limit on some broker.

Communicating max.broker.partitions value from brokers to controller

Since the max.broker.partitions configuration is applied at each broker, we need a way to pass this information to the controller broker which is responsible for assigning partitions to brokers. We propose using ZooKeeper for this. Each broker will store its value of max.broker.partitions (if present) into the corresponding /brokers/ids/{brokerId} ZooKeeper znode as broker metadata.

With this change, an example broker znode entry will look as follows. Instead of adding the max_partitions key at the top-level in the JSON object, we add it nested within a guard_rails key in order to support other guard rails in the future.

{
  "version":4,
  "host":"localhost",
  "port":9092,
  "jmx_port":9999,
  "timestamp":"2233345666",
  "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
  "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"},
  "rack":"dc1",
  "guard_rails": {
    "max_partitions": 200
  }
}

Communicating current number of partitions in broker to controller

(TODO: Write this up)

Algorithm for assigning partition replicas to brokers

When creating a topic or adding partitions to a given topic, the controller currently assigns partitions replicas to brokers such that the replicas are on as many different racks as possible (see KIP-36). We will modify this algorithm to ensure that brokers with no capacity do not get assigned partition replicas. Here, we define the capacity of a broker as

Capacity definition (pseudocode)
capacity = max(0, max number of partitions allowed - current number of partitions)

Compared to the current algorithm, all we need to ensure is that if a broker has run out of capacity, then it is not a candidate for being assigned partitions. (TODO: Explain in more detail.)

Compatibility, Deprecation, and Migration Plan

This change is backwards-compatible in practice because we will set the default value for max.broker.partitions equal to the maximum int32 value that Java can represent, which is quite large (231 - 1). Users will anyway run into system issues far before the number of topic partitions on a broker reaches this value.

In order to ease migration, a broker that already has more than max.broker.partitions number of partitions at the time at which max.broker.partitions configuration is set for that broker, will continue to function just fine for the existing partitions although it will be unable to host any further partitions. The Kafka administrator can later reassign partitions from this broker to another in order to get the broker to respect the max.broker.partitions limit, although this is not necessary.

Rejected Alternatives

  • Making max.broker.partitions a dynamic configuration as opposed to a read-only configuration. This approach does offer extra flexibility, but adds complexity without adding much value. Dynamic reconfigurability of max.broker.partitions would be useful if there is a good use case for changing it while the broker is running. However, the only real use case for setting this configuration is to limit usage of system resources, which typically don't change when the broker is running.
  • Creating a configuration to limit the global number of topic partitions across the cluster. This is not strictly a rejected alternative, but an orthogonal idea that we do not pursue in this KIP.
  • No labels