Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, The number of partitions is a lever in controlling the performance of a Kafka cluster does not limit . Increasing the number of partitions can lead to higher availability and performance. However, increasing the number beyond a point can lead to degraded performance on various fronts.

Table 1 shows the impact on producer performance in the context of 3 m5.marge EC2 broker instances, 1 KB-sized records and requested throughput of 240Mbps on a single topic. The table indicates that throughput with 3-way replication improves from 52Mbps to 101Mbps in going from 10 to 100 partitions, but then degrades beyond that. Also, the throughput with 1-way replication is better compared to that with 3-way replication. This is because of the number of replication Fetch requests that a broker receives increases with the number of partitions on the broker, and having 1-way replication means that the broker does not have to deal with Fetch requests. But even so, the performance is much worse with 10000 partitions than with 10 partitions.

Produce throughput
ReplicationNumber of partitions on a broker

101005001000500010000
3-way5210186122.50.9
1-way142113

2416

Table 1: Producer performance test throughput (Mbps) on 3 m5.large brokers, with 1KB-sized records and requested throughput 240 Mbps on a single topic.

Having lots of partitions on a broker can also increase the recovery time in case of broker failure. TODO

We have seen multiple issues in production clusters where having a large number of partitions leads to lower availability, yields live-locked clusters requiring hours of operational effort spent in recovery, and in some cases requires just entirely deleting the clusters and starting all over again implying 100% data loss.

In order to mitigate these issues, we propose having a configuration named max.broker.partitions despite the fact that on a given broker, having more than a certain number of partitions (without even any messages written / read) renders the broker unhealthy. (TODO: Give some examples of what happens) Therefore, we want to add a configuration in Kafka to limit the number of partitions on any per broker. Different brokers in a cluster should be configurable with different values for this configuration, in order to support heterogenous clusters where some brokers are more resourceful than others.

While this KIP focuses only on supporting a way to limit the number of partitions per broker, we aim to use a general approach that is suitable for applying broker-level guard rails on a Kafka cluster. These guard rails can complement a good monitoring solution by acting as defensive checks that prevent brokers from getting into an unhealthy state.

To support dynamic environments where the resources allocated to a broker may change while the broker is running, it should be possible to modify this configuration dynamically i.e. without restarting the brokers.

Glossary

Glossary note: In this KIP, we will refer to the partition capacity of a broker as the number of partitions it can host beyond what it already hosts. More specifically,

...

Config nameTypeDefaultUpdate-mode
max.broker.partitionsint32int32's max valuereadper-onlybroker

API Exceptions

CreateTopics, CreatePartitions and AlterPartitionReassignments APIs will throw the following exception if it is not possible to satisfy the request while respecting the max.broker.partitions limit.

Code Block
languagejava
public class GuardRailsExceptionWillExceedBrokerPartitionCapacityException extends ApiException { ... }

This exception message will indicate that brokers have run out of their partition capacity. In order to be actionable, the exception message will contain a map of brokers with their respective partition capacities.

Corresponding to this exception, we will have the following generic API error code.

Code Block
languagejava
GUARD_RAILSWILL_EXCEED_BROKER_PARTITION_CAPACITY(88, "NotCannot satisfy possiblerequest towithout satisfyexceeding request due to guard railsthe max.broker.partitions limit on brokers", GuardRailsExceptionWillExceedBrokerPartitionCapacityException::new); 

ZooKeeper

Kafka users will be able to use the following to set/modify the max.broker.partitions configuration.

For example,The ZooKeeper znode for each broker will have an additional optional guard_rails.max_partitions field. To support this change in a backwards compatible manner, we will bump up the version of the ZooKeeper broker znode entry from 4 to 5.

Code Block
languagebash
$ zookeeper-shell./kafka-config.sh localhost--zookeeper get /brokers/ids/1
{
  "version":5,
  "host":"localhost",
  "port":9092,
  "jmx_port":9999,
  "timestamp":"2233345666",
  "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
  "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"},
  "rack":"dc1",
  "guard_rails": {
    "max_partitions": 200
  }
}$ZOOKEEPER --alter --add-config max.broker.partitions=1000 --entity-type brokers --entity-name 1

applies the configuration to broker 1 only, and

Code Block
languagebash
./kafka-config.sh --zookeeper $ZOOKEEPER --alter --add-config max.broker.partitions=1000 --entity-type brokers --entity-default

applies the configuration to all brokers.

Proposed Changes

We will introduce a new read-only int32 configuration called max.broker.partitions that Kafka administrators can specify in the server.properties file or modify via ZooKeeper. This will have a default value equal to the maximum int32 value that Java can represent (231 - 1), which should be sufficient for the foreseeable future. In other words, we don't need int64. Since the motivation for setting a limit on the number of partitions is to control the usage of system resources, and because these resources do not typically change while the system is running, it is sufficient to have a read-only configuration, that requires a broker restart in case of change.

An illustrative example

With the max.broker.partitions With this configuration, a topic creation request (via the CreateTopics API) will fail if it is not possible to choose replicas for all the partitions of the topic without running out of partition capacity on at least one broker. This will also hold true for adding partitions to an existing topic (via the CreatePartitions API). Further, a partition reassignment request (via the AlterPartitionReassignments) will fail if the requested reassignment would exceed the max.broker.partitions limit on some broker.

An illustrative example - CreateTopic

For example, in a 3 broker cluster, say that each broker was configured with max.broker.partitions=10. If the brokers already host 8, 6 and 9 partitions respectively, then a request to create a new topic with 1 partition and a replication factor 3 can be satisfied, resulting in partition counts of 9, 7 and 10 respectively. However, the next topic creation request for 1 partition with a replication factor of 3 will fail because broker 3 has already reached the limit. A similar request with a replication factor of 2 can however, succeed, because 2 of the brokers have still not reached the limit. If the original request for a topic with 1 partition was actually a request for a topic with 2 partitions, with a replication factor of 3, then the request would have failed in entirety.The above behavior will also hold true for adding partitions to an existing topic (via the CreatePartitions API). Further, a partition reassignment request (via the AlterPartitionReassignments) will fail if the requested reassignment would exceed the max.broker.partitions limit on some broker.

Algorithm for assigning partition replicas to brokers

...

Communicating max.broker.partitions value from brokers to controller

Since the max.broker.partitions configuration is applied at each broker, we need a way to pass this information to the controller broker which is responsible for assigning partitions to brokers. We propose using ZooKeeper for this. Each broker will store its value of max.broker.partitions (if present) into the corresponding /brokers/ids/{brokerId} ZooKeeper znode as broker metadata.

Instead of adding the max_partitions key at the top-level in the broker znode JSON object, we add it nested within a guard_rails key in order to support other guard rails in the future. Refer to the "Public interfaces > ZooKeeper" section for the new znode format.TODO

Communicating current number of partitions in broker to controller

...

In order to ease migration, a broker that already has more than max.broker.partitions number of partitions at the time at which max.broker.partitions configuration is set for that broker, will continue to function just fine for the existing partitions although it will be unable to host any further partitions. The Kafka administrator can later reassign partitions from this broker to another in order to get the broker to respect the max.broker.partitions limit.

Rejected Alternatives

...

Add configuration to limit number of partitions at the cluster level

Having a limit on each broker effectively imposes a cluster-wide limit as well, thereby making a cluster-level limit not strictly necessary. The cluster-level configuration can make the interface complex, therefore we will avoid it in this KIP. However, we can revisit this in the future if there is a good use case for

...

this.

Add configuration to limit number of partitions at the topic level

Having a limit at the topic level does not help address the problem discussed in this KIP if there are lots of topics. While each topic may only have a limited number of partitions, it is possible that there will be many more partitions than on a broker than it can handle efficiently.

Add configuration to limit the number of topics

Having a limit on the number of topics does not help address the problem discussed in this KIP if there are lots of partitions on the topic.

Make the max.broker.partitions configuration read-only

This approach makes administration a bit easier because once the limit is set on the broker, the users of the cluster cannot accidentally change the value without administrator privileges and without restarting the broker. This can provide an additional safety net.

However, this approach is not flexible. As mentioned in the Motivation section, the number of partitions that a broker can handle depends on the replication factor of these partitions. Smaller the replication factor, lower is the incoming traffic due to replication Fetch requests that the broker has to handle. This allows the broker to use its resources more efficiently to handle the client requests such as produce / consume. Therefore, a Kafka administrator may want to set different values on a broker as the workload changes without disrupting operations by restarting the brokers.

Further, in environments such as Docker where it is possible to allocate more resources to the broker on the fly, it would be restrictive to not be able to modify the max.broker.partitions configuration on the fly as well

...

.