...

The number of partitions is a lever for controlling the performance of a Kafka cluster. Increasing the number of partitions can lead to higher availability and performance. However, increasing the number beyond a point can lead to degraded performance on various fronts.

The current generic recommendation to have no more than 4000 partitions per broker and no more than 200000 partitions per cluster is not enforced by Kafka. Therefore, it is possible that Kafka users accidentally overload their cluster to the point of no return. We have seen multiple issues in production clusters where having a large number of partitions leads to live-locked clusters that are so busy that even topic deletion requests intended to alleviate the problem do not complete.

We did some performance experiments to understand the effect of increasing the number of partitions. See Appendix A1 for producer performance, A2 for topic creation and deletion times, and A3 for recovery time under broker failure. These consistently indicate that a large number of partitions can lead to a malfunctioning cluster.

In order to prevent a cluster from entering a bad state due to a large number of topic partitions, we propose having two configurations: (a) max.broker.partitions to limit the number of partitions per broker, and (b) max.partitions to limit the number of partitions in the cluster overall. These can act as guard rails ensuring that the cluster is never operating with a higher number of partitions than it can handle.

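To make the proposal concrete, here is a sketch of how these limits might be set at runtime, assuming they are exposed through Kafka's existing dynamic broker configuration mechanism (bin/kafka-configs.sh). The commands and the example values (taken from the generic recommendation above) are illustrative assumptions, not the final interface defined by this KIP.

    # Hypothetical sketch: apply the proposed limits as cluster-wide dynamic defaults.
    # The config names come from this KIP; the delivery mechanism is assumed.
    bin/kafka-configs.sh --bootstrap-server localhost:9092 \
      --entity-type brokers --entity-default \
      --alter --add-config max.broker.partitions=4000

    bin/kafka-configs.sh --bootstrap-server localhost:9092 \
      --entity-type brokers --entity-default \
      --alter --add-config max.partitions=200000
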
...

  • These limits are cluster-wide. This is obviously true for max.partitions, which is meant to apply at the cluster level. However, we choose this for max.broker.partitions too, instead of supporting different values for each broker. This is in alignment with the current recommendation to run homogeneous Kafka clusters where all brokers have the same specifications (CPU, RAM, disk etc.).
  • These limits can be changed at runtime, without restarting brokers. This provides greater flexibility. See the "Rejected alternatives" section for why we did not go with read-only configurations.
  • These limits apply to all topics, including internal topics (i.e. __consumer_offsets and __transaction_state, which usually are not configured with too many partitions). This provides a consistent experience across all topics and partitions.
  • These limits also apply to topics created via auto topic creation (currently possible via the Metadata and FindCoordinator API requests). By enforcing this, we disallow having a backdoor to bypass these limits.
  • These limits do not apply when creating topics or partitions, or reassigning partitions, via the ZooKeeper-based admin tools. This is unfortunate, because it does create a backdoor to bypass these limits. However, we leave this out of scope here given that ZooKeeper will eventually be deprecated from Kafka.

Public Interfaces

Configs

...

Having a limit at the topic level does not help address the problem discussed in this KIP if there is a large number of topics. While each topic may only have a limited number of partitions, it is possible that a broker ends up with many more partitions than it can handle efficiently.

...

Having a limit on the number of topics does not help address the problem discussed in this KIP if there is a large number of partitions per topic.

Make these configurations read-only

...

This is in general a more flexible approach than the one described in this KIP, and allows brokers with different resources to each have their own max.broker.partitions configuration. However, this would require sending the broker-specific configuration to the controller, which needs it when creating topics and partitions or reassigning partitions. One approach would be to put this information into the broker's ZooKeeper znode and have the controller rely on that. The other would be to create a new API request-response that brokers can use to share this information with the controller. Both of these approaches introduce complexity for little gain. We are not aware of any clusters running with heterogeneous broker specifications where having a different max.broker.partitions configuration for each broker would help. Therefore, in this KIP, we do not take this approach.

...

Appendix

...

...

Performance with a large number of partitions

Our setup had 3 m5.large EC2 broker instances on 3 different AZs within the same AWS region us-east-1, running Kafka version 2.3.1. Each broker had an EBS GP2 volume attached to it for data storage. All communication was plaintext and records were not compressed. The brokers each had 8 IO threads (num.io.threads), 2 replica fetcher threads (num.replica.fetchers) and 5 network threads (num.network.threads).

A1: Producer performance

We did a producer performance test (using kafka-producer-perf-test.sh) from a single m5.4xlarge EC2 instance. On the producer side, each record was 1 KB in size. The batch size (batch.size) and artificial delay (linger.ms) were left at their default values.

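For reference, a run of this test looks roughly like the following; the topic name, record count, throughput cap and broker addresses are placeholders, and only the 1 KB record size reflects the setup described above.

    # Illustrative invocation of the producer performance test.
    # Topic, record count, throughput cap and bootstrap servers are placeholders;
    # batch.size and linger.ms are left at their defaults by not overriding them.
    bin/kafka-producer-perf-test.sh \
      --topic perf-test-topic \
      --num-records 1000000 \
      --record-size 1024 \
      --throughput -1 \
      --producer-props bootstrap.servers=broker1:9092,broker2:9092,broker3:9092
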
The following table shows the results. It indicates that throughput with 3-way replication improves from 52 Mbps to 101 Mbps when going from 10 to 100 partitions, but then degrades beyond that. Also, throughput with 1-way replication is better than with 3-way replication. This is because the number of replication Fetch requests a broker receives increases with the number of partitions on the broker, and with 1-way replication the broker does not have to handle those Fetch requests at all. But even so, performance is much worse with 10000 partitions than with 10 partitions.

...

We also stress tested the system with a single topic consisting of 30000 partitions, each with 3-way replication. The producer performance test provided essentially 0 throughput. Most requests returned the NOT_ENOUGH_REPLICAS error, indicating that replication was backlogged and that the brokers were also unable to handle the Produce requests. The UnderMinIsrPartitionCount metric was equal to the LeaderCount on all brokers, confirming this hypothesis.

A2: Topic creation and deletion times

...

For stress testing the system with a large number of partitions, we created a single topic with 30000 partitions and 3-way replication. It took 30 minutes for the necessary (30000) log directories to get created on the brokers after the point in time when the PartitionCount metric indicated that the brokers had 30000 partitions each.

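For reference, creating such a topic amounts to something like the following (topic name and broker address are placeholders):

    # Illustrative creation of the stress-test topic: 30000 partitions, 3-way replication.
    bin/kafka-topics.sh --bootstrap-server broker1:9092 \
      --create --topic stress-test-topic \
      --partitions 30000 --replication-factor 3
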
We then tried to delete the topic, with no Produce / Fetch traffic ongoing. However, topic deletion did not complete. From the GC logs, we noticed that all brokers had almost maxed out their allocated heap (Xmx) of 2GB, and the processes were busy in G1 garbage collection, causing the ZooKeeper sessions to repeatedly time out (with a session timeout of 6 seconds). Restarting the brokers reduced heap usage to 800MB, but did not cause the partitions to get deleted.

Although deletion did not complete, the LeaderCount did drop to 0 in 1 hour 40 minutes. We noticed that the leadership resignation process happened more rapidly as the number of partitions with leadership decreased. The following table shows how long it took for the last N partitions (out of the total 30000 partitions) to no longer have any leader.

Number of partitions left               30000   20000   10000   5000   1000   100   10
Leadership resignation time (minutes)     100      43       9      3    < 1   < 1   < 1

We can see that leadership resignation times are exponential in the number of partitions. Note that the times above were measured from the decrease in the LeaderCount metric; the PartitionCount metric did not come down from 30000 at all.

A3: Recovery time under failure

...