...

In order to mitigate these issues, we propose having a configuration named max.broker.partitions to limit the number of partitions per broker.

Given that the recommendation is to run homogeneous Kafka clusters, where each broker in the cluster has the same specifications (CPU, RAM, disk etc.), we will not attempt to support max.broker.partitions as a broker-specific configuration, but rather only as a cluster-wide configuration.

In order to support dynamic environments (such as Docker) where the resources allocated to the brokers may change while the brokers are running, it should be possible to modify this configuration dynamically i.e. without restarting the brokers.

...

Code Block
languagejava
titleDefinition: Partition capacity
partition capacity of broker = max(0, max.broker.partitions value for broker - current number of partitions hosted by broker)
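
For concreteness, the definition above maps directly onto a one-line helper. The class and method names below are made up for illustration and are not part of any existing Kafka code.

Code Block
languagejava
// Illustration of the partition capacity definition above; not actual Kafka code.
public class PartitionCapacitySketch {
    static int partitionCapacity(int maxBrokerPartitions, int currentPartitionCount) {
        return Math.max(0, maxBrokerPartitions - currentPartitionCount);
    }
}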

...

Config name           | Type  | Default           | Update-mode
max.broker.partitions | int32 | int32's max value | cluster-wide

Kafka administrators can specify the max.broker.partitions configuration in the server.properties file. 

They can also set/modify the max.broker.partitions configuration via ZooKeeper. For example, the following command applies a limit of 1000 to each of the brokers:

Code Block
languagebash
./kafka-config.sh --zookeeper $ZOOKEEPER --alter --add-config max.broker.partitions=1000 --entity-type brokers --entity-default

It is also possible to set this value per broker via the following command for testing purposes.

Code Block
languagebash
./kafka-config.sh --zookeeper $ZOOKEEPER --alter --add-config max.broker.partitions=1000 --entity-type brokers --entity-name 1


However, if different values are specified for different brokers, then only the value that applies to the controller will matter.

API Exceptions

CreateTopics, CreatePartitions and AlterPartitionReassignments APIs will throw the following exception if it is not possible to satisfy the request while respecting the max.broker.partitions limit.

...

Code Block
languagejava
WILL_EXCEED_BROKER_PARTITION_CAPACITY(88, "Cannot satisfy request without exceeding the max.broker.partitions limit on brokers", WillExceedBrokerPartitionCapacityException::new); 
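
For reference, the corresponding exception class could look roughly as follows. This is only a sketch that assumes it mirrors the existing ApiException subclasses in org.apache.kafka.common.errors; the exact class body is not spelled out in this section.

Code Block
languagejava
// Sketch only: assumes the new exception follows the usual ApiException pattern.
package org.apache.kafka.common.errors;

public class WillExceedBrokerPartitionCapacityException extends ApiException {

    public WillExceedBrokerPartitionCapacityException(String message) {
        super(message);
    }

    public WillExceedBrokerPartitionCapacityException(String message, Throwable cause) {
        super(message, cause);
    }
}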

Proposed Changes

We will introduce a new cluster-wide int32 configuration called max.broker.partitions that Kafka administrators can specify in the server.properties file or modify via ZooKeeper. This will have a default value equal to the maximum int32 value that Java can represent (2^31 - 1), which should be sufficient for the foreseeable future. In other words, we don't need int64.
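
As an illustration, the configuration could be declared with the public ConfigDef API along the following lines. This is only a sketch; the class name, importance level, validator and documentation string are assumptions made for the example, and the actual placement inside KafkaConfig is not prescribed here.

Code Block
languagejava
import org.apache.kafka.common.config.ConfigDef;

// Sketch only: how max.broker.partitions might be registered with the public
// ConfigDef API. Importance, validator and doc string below are illustrative.
public class MaxBrokerPartitionsConfigSketch {
    public static final String MAX_BROKER_PARTITIONS_CONFIG = "max.broker.partitions";

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(MAX_BROKER_PARTITIONS_CONFIG,
                ConfigDef.Type.INT,
                Integer.MAX_VALUE,              // default: effectively no limit
                ConfigDef.Range.atLeast(1),     // a limit below 1 would make the broker unusable
                ConfigDef.Importance.MEDIUM,
                "The maximum number of partitions that the broker can host.");
}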

...

For example, in a 3 broker cluster, say that max.broker.partitions is configured equal to 10. If the brokers already host 8, 6 and 9 partitions respectively, then a request to create a new topic with 1 partition and a replication factor of 3 can be satisfied, resulting in partition counts of 9, 7 and 10 respectively. However, the next topic creation request for 1 partition with a replication factor of 3 will fail because broker 3 has already reached the limit. A similar request with a replication factor of 2 can, however, succeed, because 2 of the brokers have still not reached the limit. If the original request for a topic with 1 partition was actually a request for a topic with 2 partitions, with a replication factor of 3, then the request would have failed in its entirety.
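
The arithmetic in this example can be captured by a small feasibility check: a broker can hold at most one replica of any single partition, so it can absorb at most min(remaining capacity, number of new partitions) of the requested replicas. The sketch below is illustrative only; the class name, method name and inputs are made up for this example and it is not the actual controller code.

Code Block
languagejava
import java.util.List;

// Illustrative only: can `newPartitions` partitions, each with `replicationFactor`
// replicas on distinct brokers, be placed without exceeding the per-broker limit?
public class CapacityCheckSketch {
    static boolean canSatisfyRequest(List<Integer> currentPartitionCounts,
                                     int maxBrokerPartitions,
                                     int newPartitions,
                                     int replicationFactor) {
        // Each broker can absorb at most min(remaining capacity, newPartitions) replicas.
        long placeableReplicas = currentPartitionCounts.stream()
            .mapToLong(count -> Math.min(Math.max(0, maxBrokerPartitions - count), newPartitions))
            .sum();
        return placeableReplicas >= (long) newPartitions * replicationFactor;
    }
}

With partition counts (8, 6, 9) and a limit of 10, a request for 2 partitions with a replication factor of 3 needs 6 replicas, but only min(2, 2) + min(4, 2) + min(1, 2) = 5 can be placed, so the request fails in its entirety as described above.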

...

When creating a topic or adding partitions to a given topic, the controller currently assigns partitions to brokers such that a partition's replicas are on as many different racks as possible (see KIP-36). This algorithm is encapsulated in the method `AdminUtils.assignReplicasToBrokers`. We will modify this algorithm to ensure that brokers with no partition capacity do not get assigned any partitions.
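
In spirit, the change amounts to restricting the candidate broker list before the existing rack-aware assignment logic runs. The sketch below is illustrative Java (the real method is Scala code in AdminUtils), and the class name, helper name and inputs are assumptions for this example.

Code Block
languagejava
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative only: brokers that have already reached max.broker.partitions are
// removed from consideration before replicas are assigned to brokers.
public class BrokerFilterSketch {
    static List<Integer> brokersWithCapacity(List<Integer> allBrokerIds,
                                             Map<Integer, Integer> partitionCounts,
                                             int maxBrokerPartitions) {
        return allBrokerIds.stream()
            .filter(id -> partitionCounts.getOrDefault(id, 0) < maxBrokerPartitions)
            .collect(Collectors.toList());
    }
}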

The `AdminManager` class has access to the `KafkaConfig` object from where the controller can extract the max.broker.partitions value, if present.

Each broker (including the controller broker) keeps a cache of partition state (in `MetadataCache.UpdateMetadataPartitionState`). The `AdminManager` class has access to the `MetadataCache` and will use this to construct a reverse map of `active broker ID → number of partitions`.
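
As an illustration, building that reverse map could look roughly like the sketch below. The inputs (a replica list per cached partition and the set of active broker IDs) and the class name are assumptions made for this example, not the actual MetadataCache API.

Code Block
languagejava
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative only: count partitions per active broker from cached partition state.
public class PartitionCountSketch {
    static Map<Integer, Integer> partitionsPerBroker(List<List<Integer>> replicaAssignments,
                                                     Set<Integer> activeBrokerIds) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (Integer brokerId : activeBrokerIds) {
            counts.put(brokerId, 0);                 // active brokers with no partitions still show up
        }
        for (List<Integer> replicas : replicaAssignments) {
            for (Integer brokerId : replicas) {
                if (counts.containsKey(brokerId)) {  // skip replicas on brokers that are not active
                    counts.merge(brokerId, 1, Integer::sum);
                }
            }
        }
        return counts;
    }
}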

...

Add configuration to limit total number of partitions in the cluster

Having a limit on each broker effectively imposes a cluster-wide limit as well, thereby making a cluster-level limit not strictly necessary. A cluster-level configuration would also make the interface more complex, so we will avoid it in this KIP. However, we can revisit this in the future if there is a good use case for it.

Add configuration to limit number of partitions per topic

Having a limit at the topic level does not help address the problem discussed in this KIP if there are lots of topics. While each topic may only have a limited number of partitions, it is possible that a broker will end up hosting many more partitions than it can handle efficiently.

...

Further, in environments such as Docker where it is possible to allocate more resources to the broker on the fly, it would be restrictive to not be able to modify the max.broker.partitions configuration on the fly as well.

Make the max.broker.partitions configuration per-broker

This is in general a more flexible approach than the one described in this KIP and would allow brokers with different resources to each have their own max.broker.partitions configuration. However, this would require sending the broker-specific configuration to the controller, which would use it while creating topics and partitions or reassigning partitions. One approach would be to put this information into the broker's ZooKeeper znode and have the controller rely on that. The other would be to create a new API request-response that brokers can use to share this information with the controller. Both of these approaches introduce complexity for little gain. We are not aware of any clusters running with heterogeneous configurations where having a different max.broker.partitions configuration for each broker would help. Therefore, in this KIP, we do not take this approach.

Appendix

A1: Producer performance with lots of partitions

...