Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Author: Gokul Ramanan Subramanian

Status

Current stateDraft

Discussion thread: here

JIRA:

Jira
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-9590

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The number of partitions is a lever in controlling the performance of a Kafka cluster. Increasing the number of partitions can lead to higher availability and performance. However, increasing the number beyond a point can lead to degraded performance on various fronts. See Appendix A1 that describes the impact on producer performance. See Appendix A2 that describes impact on recovery time under broker failure. We have seen multiple issues in production clusters where having a large number of partitions leads to live-locked clusters that are so busy that topic deletion requests intended to alleviate the problem themselves do not complete.

The current recommendation to have no more than 4000 partitions per broker is not enforced by Kafka. Therefore, it is possible that Kafka users accidentally overload their cluster to the point of no return.

In order to mitigate these issues, we propose having a configuration named max.broker.partitions to limit the number of partitions per broker.

Given that the recommendation is to run homogenous Kafka clusters, where each broker in the cluster has the same specifications (CPU, RAM, disk etc.), we will not attempt to support max.broker.partitions as a broker-specific configuration, but rather only as a cluster-wide configuration.

In order to support dynamic environments (such as Docker) where the resources allocated to the brokers may change while the brokers are running, it should be possible to modify this configuration dynamically i.e. without restarting the brokers.

Glossary

In this KIP, we will refer to the partition capacity of a broker as the number of partitions it can host beyond what it already hosts. More specifically,

Code Block
languagejava
titleDefinition: Partition capacity
partition capacity of broker = max(0, max.broker.partitions value - current number of partitions hosted by broker)

Public Interfaces

Configs

Config nameTypeDefaultUpdate-mode
max.broker.partitionsint32int32's max valuecluster-wide

Kafka administrators can specify the max.broker.partitions configuration in the server.properties file. 

They can also use the following to set/modify the max.broker.partitions configuration via ZooKeeper.

Code Block
languagebash
./kafka-config.sh --zookeeper $ZOOKEEPER --alter --add-config max.broker.partitions=1000 --entity-type brokers --entity-default

It is also possible to set this value per broker via the following command for testing purposes.

Code Block
languagebash
./kafka-config.sh --zookeeper $ZOOKEEPER --alter --add-config max.broker.partitions=1000 --entity-type brokers --entity-name 1

However, if different values are specified for different brokers, then only the value that applies to the controller will matter.

API Exceptions

CreateTopics, CreatePartitions and AlterPartitionReassignments APIs will throw the following exception if it is not possible to satisfy the request while respecting the max.broker.partitions limit.

Code Block
languagejava
public class WillExceedBrokerPartitionCapacityException extends ApiException { ... }

In order to be actionable, the exception message will contain a map of brokers with their respective partition capacities.

Corresponding to this exception, we will have the following API error code.

Code Block
languagejava
WILL_EXCEED_BROKER_PARTITION_CAPACITY(88, "Cannot satisfy request without exceeding the max.broker.partitions limit", WillExceedBrokerPartitionCapacityException::new); 

Proposed Changes

We will introduce a new cluster-wide int32 configuration called max.broker.partitions that Kafka administrators can specify in the server.properties file or modify via ZooKeeper. This will have a default value equal to the maximum int32 value that Java can represent (231 - 1), which should be sufficient for the foreseeable future. In other words, we don't need int64.

With the max.broker.partitions configuration, a topic creation request (via the CreateTopics API) will fail if it is not possible to choose replicas for all the partitions of the topic without running out of partition capacity on at least one broker. This will also hold true for adding partitions to an existing topic (via the CreatePartitions API). Further, a partition reassignment request (via the AlterPartitionReassignments) will fail if the requested reassignment would exceed the max.broker.partitions limit on some broker.

An illustrative example - CreateTopic

For example, in a 3 broker cluster, say that max.broker.partitions is configured equal to 10. If the brokers already host 8, 6 and 9 partitions respectively, then a request to create a new topic with 1 partition and a replication factor 3 can be satisfied, resulting in partition counts of 9, 7 and 10 respectively. However, the next topic creation request for 1 partition with a replication factor of 3 will fail because broker 3 has already reached the limit. A similar request with a replication factor of 2 can however, succeed, because 2 of the brokers have still not reached the limit. If the original request for a topic with 1 partition was actually a request for a topic with 2 partitions, with a replication factor of 3, then the request would have failed in entirety.

Algorithm for assigning partition replicas to brokers

When creating a topic or adding partitions to a given topic, the controller currently assigns partitions to brokers such that a partition's replicas are on as many different racks as possible (see KIP-36). This algorithm is encapsulated in the method `AdminUtils.assignReplicasToBrokers`. We will modify this algorithm to ensure that brokers with no partition capacity do not get assigned any partitions.

The `AdminManager` class has access to the `KafkaConfig` object from where the controller can extract the max.broker.partitions value if present.

Each broker (including the controller broker) keeps a cache of partition state (in `MetadataCache.UpdateMetadataPartitionState`). The `AdminManager` class has access to the `MetadataCache` and will use this to construct a reverse map of `active broker ID → number of partitions`.

Similarly, the `KafkaController` class (which is relevant only on the controller broker) keeps a reference to `ControllerContext`, whose `replicasOnBrokers` method we will use to obtain the number of partitions in each broker.

Compatibility, Deprecation, and Migration Plan

This change is backwards-compatible in practice because we will set the default value for max.broker.partitions equal to the maximum int32 value that Java can represent, which is quite large (231 - 1). Users will anyway run into system issues far before the number of partitions on a broker reaches this value.

In order to ease migration, a broker that already has more than max.broker.partitions number of partitions at the time at which max.broker.partitions configuration is set for that broker, will continue to function just fine for the existing partitions although it will be unable to host any further partitions. The Kafka administrator can later reassign partitions from this broker to another in order to get the broker to respect the max.broker.partitions limit.

Rejected Alternatives

Add configuration to limit total number of partitions in the cluster

Having a limit on each broker effectively imposes a cluster-wide limit as well, thereby making a cluster-level limit not strictly necessary. The cluster-level configuration can make the interface complex, therefore we will avoid it in this KIP. However, we can revisit this in the future if there is a good use case for this.

Add configuration to limit number of partitions per topic

Having a limit at the topic level does not help address the problem discussed in this KIP if there are lots of topics. While each topic may only have a limited number of partitions, it is possible that there will be many more partitions than on a broker than it can handle efficiently.

Add configuration to limit the number of topics

Having a limit on the number of topics does not help address the problem discussed in this KIP if there are lots of partitions on the topic.

Make the max.broker.partitions configuration read-only

This approach makes administration a bit easier because once the limit is set on the broker, the users of the cluster cannot accidentally change the value without administrator privileges and without restarting the broker. This can provide an additional safety net.

However, this approach is not flexible. As mentioned in the Motivation section, the number of partitions that a broker can handle depends on the replication factor of these partitions. Smaller the replication factor, lower is the incoming traffic due to replication Fetch requests that the broker has to handle. This allows the broker to use its resources more efficiently to handle the client requests such as produce / consume. Therefore, a Kafka administrator may want to set different values on a broker as the workload changes without disrupting operations by restarting the brokers.

Further, in environments such as Docker where it is possible to allocate more resources to the broker on the fly, it would be restrictive to not be able to modify the max.broker.partitions configuration on the fly as well.

Make the max.broker.partitions per-broker

This is in general a more flexible approach than the one described in this KIP and allows having different brokers with different resources, each have its own max.broker.partitions configuration. However, this would require sending the broker-specific configuration to the controller, which can use this while creating topics and partitions or reassigning partitions. One approach would be to put this information into the broker's ZooKeeper znode and have the controller rely on that. And the other would be to create a new API request-response that brokers can use to share this information with the controller. Both of these approaches introduce complexity for little gain. We are not aware of any clusters that are running with heterogenous configurations where having different max.broker.partitions configuration for each broker would help. Therefore, in this KIP, we do not take this approach.

Appendix

A1: Producer performance with lots of partitions

The following table shows the impact on producer performance in the context of 3 m5.large EC2 broker instances, with 1 KB-sized records and requested throughput of 240Mbps on a single topic. The table indicates that throughput with 3-way replication improves from 52Mbps to 101Mbps in going from 10 to 100 partitions, but then degrades beyond that. Also, the throughput with 1-way replication is better compared to that with 3-way replication. This is because of the number of replication Fetch requests that a broker receives increases with the number of partitions on the broker, and having 1-way replication means that the broker does not have to deal with Fetch requests. But even so, the performance is much worse with 10000 partitions than with 10 partitions.

TODO: Redo these experiments without specifying a requested throughput.

Produce throughput
ReplicationNumber of partitions on a broker

101005001000500010000
3-way5210186122.50.9
1-way142113TODOTODO2416

A2: Recovery time with lots of partitions

Having lots of partitions on a broker can also increase the recovery time in case of broker failure. TODO