Author: Gokul Ramanan Subramanian

Status

Current state: Draft

Discussion thread: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The number of partitions is a lever in controlling the performance of a Kafka cluster. Increasing the number of partitions can lead to higher availability and performance. However, increasing the number beyond a point can lead to degraded performance on various fronts. 

The current generic recommendation to have no more than 4000 partitions per broker and no more than 200000 partitions per cluster is not enforced by Kafka. Therefore, it is possible that Kafka users accidentally overload their cluster to the point of no return.

We have seen multiple issues in production clusters where having a large number of partitions leads to live-locked clusters that are so busy that even topic deletion requests intended to alleviate the problem do not complete.

We ran performance experiments to understand the effect of increasing the number of partitions. Appendix A1 describes the impact on producer performance, and Appendix A2 describes the impact on recovery time under controller failure.

In order to mitigate these issues, we propose having two configurations (a) max.broker.partitions to limit the number of partitions per broker, and (b) max.partitions to limit the number of partitions in the cluster overall.

Given that the recommendation is to run homogeneous Kafka clusters, where each broker in the cluster has the same specifications (CPU, RAM, disk etc.), we will not attempt to support max.broker.partitions as a broker-specific configuration, but rather only as a cluster-wide configuration.

Public Interfaces

Configs

| Config name | Type | Default | Update-mode |
| --- | --- | --- | --- |
| max.broker.partitions | int32 | int32's max value | cluster-wide |
| max.partitions | int32 | int32's max value | cluster-wide |

Kafka administrators can specify these in the server.properties file. 

They can also use the following commands to set or modify these configurations via ZooKeeper.

./kafka-configs.sh --zookeeper $ZOOKEEPER --alter --add-config max.broker.partitions=4000 --entity-type brokers --entity-default
./kafka-configs.sh --zookeeper $ZOOKEEPER --alter --add-config max.partitions=200000 --entity-type brokers --entity-default

It is also possible to set these values per broker via the following commands, which apply the change to only a specific broker, for testing purposes.

./kafka-configs.sh --zookeeper $ZOOKEEPER --alter --add-config max.broker.partitions=4000 --entity-type brokers --entity-name 1
./kafka-configs.sh --zookeeper $ZOOKEEPER --alter --add-config max.partitions=200000 --entity-type brokers --entity-name 1

However, if different values are specified for different brokers, then only the value that applies to the controller will matter.

Further, the stricter of the two configurations max.broker.partitions and max.partitions will apply.

An illustrative (toy) example - CreateTopic

For example, in a 3-broker cluster, say that max.broker.partitions is set to 10. If the brokers already host 8, 6 and 9 partitions respectively, then a request to create a new topic with 1 partition and a replication factor of 3 can be satisfied, resulting in partition counts of 9, 7 and 10 respectively. However, the next topic creation request for 1 partition with a replication factor of 3 will fail because broker 3 has already reached the limit. A similar request with a replication factor of 2 can, however, succeed, because 2 of the brokers have still not reached the limit. Had the original request been for a topic with 2 partitions and a replication factor of 3, it would have failed in its entirety, because placing both partitions would push broker 3 to 11 partitions.
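The toy example above can be reproduced with a small feasibility check. This is only a sketch under stated assumptions: the class `BrokerLimitSketch` and the method `canPlace` are hypothetical names, the least-loaded-first placement is a simplification and not the KIP-36 assignment algorithm, and rack awareness and the cluster-wide max.partitions limit are ignored.

```java
import java.util.Arrays;
import java.util.Comparator;

// Hypothetical sketch, not Kafka code: checks whether a request can be placed
// without any broker exceeding max.broker.partitions.
final class BrokerLimitSketch {

    static boolean canPlace(int[] hosted, int newPartitions, int replicationFactor,
                            int maxBrokerPartitions) {
        // Each replica of a partition must live on a distinct broker.
        if (replicationFactor > hosted.length) {
            return false;
        }
        int[] counts = hosted.clone(); // work on a copy; the request is all-or-nothing
        Integer[] order = new Integer[counts.length];
        for (int i = 0; i < order.length; i++) {
            order[i] = i;
        }
        for (int p = 0; p < newPartitions; p++) {
            // Assumption: place replicas on the least-loaded brokers first.
            Arrays.sort(order, Comparator.comparingInt(i -> counts[i]));
            for (int r = 0; r < replicationFactor; r++) {
                int broker = order[r];
                if (counts[broker] >= maxBrokerPartitions) {
                    return false; // this broker has already reached the limit
                }
                counts[broker]++;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(canPlace(new int[]{8, 6, 9}, 1, 3, 10));  // true
        System.out.println(canPlace(new int[]{9, 7, 10}, 1, 3, 10)); // false
        System.out.println(canPlace(new int[]{9, 7, 10}, 1, 2, 10)); // true
        System.out.println(canPlace(new int[]{8, 6, 9}, 2, 3, 10));  // false
    }
}
```

The check works on a copy of the per-broker counts, which mirrors the all-or-nothing behaviour of the last case in the example: a request that cannot be placed in full changes nothing.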

API Exceptions

CreateTopics, CreatePartitions and AlterPartitionReassignments APIs, and admin tools kafka-topics.sh and kafka-reassign-partitions.sh will throw the following exception if it is not possible to satisfy the request while respecting the max.broker.partitions or max.partitions limits.

public class WillExceedPartitionLimitsException extends ApiException { ... }

Corresponding to this exception, we will have the following API error code. The actual exception will contain the values of max.broker.partitions and max.partitions in order to make it easy for users to understand why their request got rejected.

WILL_EXCEED_PARTITION_LIMITS(88, "Cannot satisfy request without exceeding the partition limits", WillExceedPartitionLimitsException::new); 
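As an illustration of how the limit values could be surfaced to users, the sketch below builds an error message that embeds both limits. It is an assumption-laden stand-in: the class name `WillExceedPartitionLimitsSketch`, its fields and its message format are hypothetical, and it extends `RuntimeException` instead of Kafka's `ApiException` so that it compiles without the Kafka client library.

```java
// Hypothetical stand-in for the proposed exception; not the actual Kafka class.
final class WillExceedPartitionLimitsSketch extends RuntimeException {
    final int maxBrokerPartitions;
    final int maxPartitions;

    WillExceedPartitionLimitsSketch(int maxBrokerPartitions, int maxPartitions) {
        // Assumed message format: the error text from the KIP followed by both limits.
        super(String.format(
            "Cannot satisfy request without exceeding the partition limits "
                + "(max.broker.partitions=%d, max.partitions=%d)",
            maxBrokerPartitions, maxPartitions));
        this.maxBrokerPartitions = maxBrokerPartitions;
        this.maxPartitions = maxPartitions;
    }

    public static void main(String[] args) {
        System.out.println(new WillExceedPartitionLimitsSketch(4000, 200000).getMessage());
    }
}
```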

Proposed Changes

TODO: How to pass max.broker.partitions and max.partitions values to all the relevant methods documented below.

TODO: Figure out if we want to apply these limits on internal topics.

TODO: Figure out how to impose these limits when running commands directly against ZooKeeper without going through the Kafka API.

The following table shows the list of methods that will need to change in order to support the max.broker.partitions and max.partitions configurations.

| Method name | Relevant methods which directly depend on this one | Relevant methods on which this one is directly dependent | Description of what the method does currently | Change required |
| --- | --- | --- | --- | --- |
| `AdminUtils.assignReplicasToBrokers` | `AdminZkClient.createTopic`, `AdminZkClient.addPartitions`, `AdminManager.createTopics`, `ReassignPartitionsCommand.generateAssignment` | | Encapsulates the algorithm specified in KIP-36 to assign partitions to brokers on as many racks as possible. This also handles the case when rack-awareness is disabled. This is a pure function without any state or side effects. | |
| `AdminZkClient.createTopicWithAssignment` | `AdminZkClient.createTopic`, `AdminManager.createTopics`, `ZookeeperTopicService.createTopic` | | Creates the ZooKeeper znodes required for topic-specific configuration and replica assignments for the partitions of the topic. | |
| `AdminZkClient.createTopic` | `KafkaApis.createTopic`, `ZookeeperTopicService.createTopic` | `AdminUtils.assignReplicasToBrokers`, `AdminZkClient.createTopicWithAssignment` | Computes replica assignment using `AdminUtils.assignReplicasToBrokers` and then reuses `AdminZkClient.createTopicWithAssignment`. | |
| `AdminZkClient.addPartitions` | `AdminManager.createPartitions`, `ZookeeperTopicService.alterTopic` | `AdminUtils.assignReplicasToBrokers` | Computes replica assignment using `AdminUtils.assignReplicasToBrokers` when replica assignments are not specified. When replica assignments are specified, uses them as is. Creates the ZooKeeper znodes required for the new partitions with the corresponding replica assignments. | |
| `AdminManager.createTopics` | `KafkaApis.handleCreateTopicsRequest` | `AdminUtils.assignReplicasToBrokers`, `AdminZkClient.createTopicWithAssignment` | Used exclusively by `KafkaApis.handleCreateTopicsRequest` to create topics. Reuses `AdminUtils.assignReplicasToBrokers` when replica assignments are not specified. When replica assignments are specified, uses them as is. | |
| `AdminManager.createPartitions` | `KafkaApis.handleCreatePartitionsRequest` | `AdminZkClient.addPartitions` | Used exclusively by `KafkaApis.handleCreatePartitionsRequest` to create partitions on an existing topic. | |
| `KafkaController.onPartitionReassignment` | `KafkaApis.handleAlterPartitionReassignmentsRequest` (not quite directly, but the stack trace in the middle is not relevant) | | Handles all the modifications required on ZooKeeper znodes and sending API requests required for moving partitions from some brokers to others. | |
| `KafkaApis.handleCreateTopicsRequest` | | `AdminManager.createTopics` | Handles the CreateTopics API request sent to a broker, if that broker is the controller. | |
| `KafkaApis.handleCreatePartitionsRequest` | | `AdminManager.createPartitions` | Handles the CreatePartitions API request sent to a broker, if that broker is the controller. | |
| `KafkaApis.handleAlterPartitionReassignmentsRequest` | | `KafkaController.onPartitionReassignment` (not quite directly, but the stack trace in the middle is not relevant) | Handles the AlterPartitionReassignments API request sent to a broker, if that broker is the controller. | |
| `KafkaApis.createTopic` | | `AdminZkClient.createTopic` | Creates internal topics for storing consumer offsets (__consumer_offsets), and transaction state (__transaction_state). Also used to auto-create topics when topic auto-creation is enabled. | |
| `ZookeeperTopicService.createTopic` | | `AdminZkClient.createTopic`, `AdminZkClient.createTopicWithAssignment` | Used by the ./kafka-topics.sh admin tool to create topics when --zookeeper is specified. Reuses `AdminZkClient.createTopic` when no replica assignments are specified. Reuses `AdminZkClient.createTopicWithAssignment` when replica assignments are specified. | |
| `ZookeeperTopicService.alterTopic` | | `AdminZkClient.addPartitions` | Used by the ./kafka-topics.sh admin tool to alter topics when --zookeeper is specified. Calls `AdminZkClient.addPartitions` if topic alteration involves a different number of partitions than what the topic currently has. | |
| `ReassignPartitionsCommand.generateAssignment` | | `AdminUtils.assignReplicasToBrokers` | Used by the ./kafka-reassign-partitions.sh admin tool to generate a replica assignment of partitions for the specified topics onto the set of specified brokers. | |

Compatibility, Deprecation, and Migration Plan

This change is backwards-compatible in practice because we will set the default values for max.broker.partitions and max.partitions equal to the maximum int32 value that Java can represent (2^31 - 1), which is quite large. Users will run into system issues long before hitting these limits.

To ease migration, a broker that already hosts more than max.broker.partitions partitions when the configuration is set will continue to function normally for its existing partitions, although it will be unable to host any further partitions. The Kafka administrator can later reassign partitions from this broker to another so that the broker respects the max.broker.partitions limit.

Similarly, a cluster that already has more than max.partitions partitions when the configuration is set will continue to function normally. It will, however, fail any further requests to create topics or partitions. Any reassignment of partitions should work fine.
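The cluster-wide admission rule described above can be sketched as a single comparison. This is an assumption-based illustration: `ClusterLimitSketch` and `canAddPartitions` are hypothetical names, and the sketch assumes the limit counts partitions rather than replicas. Because the default limit is the int32 maximum, the sum is computed as a long to avoid overflow.

```java
// Hypothetical sketch, not Kafka code: admission check against max.partitions.
final class ClusterLimitSketch {

    static boolean canAddPartitions(int existingPartitions, int requestedPartitions,
                                    int maxPartitions) {
        // Widen to long so that existing + requested cannot wrap around when the
        // limit is left at its default of Integer.MAX_VALUE.
        return (long) existingPartitions + requestedPartitions <= maxPartitions;
    }

    public static void main(String[] args) {
        System.out.println(canAddPartitions(199_990, 10, 200_000)); // true
        // A cluster already over the limit keeps running but rejects new partitions.
        System.out.println(canAddPartitions(250_000, 1, 200_000));  // false
        System.out.println(canAddPartitions(Integer.MAX_VALUE, 1, Integer.MAX_VALUE)); // false
    }
}
```

Reassignment does not add partitions to the cluster, so under this sketch it would never consult the check, which matches the statement that reassignment keeps working in an over-limit cluster.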

Rejected Alternatives

Add configuration to limit number of partitions per topic

Having a limit at the topic level does not help address the problem discussed in this KIP if there are lots of topics. While each topic may only have a limited number of partitions, it is possible that there will be many more partitions on a broker than it can handle efficiently.

Add configuration to limit the number of topics

Having a limit on the number of topics does not help address the problem discussed in this KIP if individual topics have lots of partitions.

Make these configurations read-only

This approach makes administration a bit easier because once the limits are set, the users of the cluster cannot accidentally change the value without administrator privileges and without restarting the brokers. This can provide an additional safety net.

However, in environments such as Docker where it is possible to allocate more resources to the broker on the fly, it would be restrictive to not be able to modify the max.broker.partitions and max.partitions configurations on the fly as well.

Further, having these configurations be read-only is not flexible. The number of partitions that a broker can handle depends on the replication factor of these partitions. The smaller the replication factor, the lower the incoming traffic from replication Fetch requests that the broker has to handle. This allows the broker to use its resources more efficiently to handle client requests such as produce and consume. Therefore, a Kafka administrator may want to set different values on a broker as the workload changes, without disrupting operations by restarting the brokers.

Support max.broker.partitions as a per-broker configuration

This is in general a more flexible approach than the one described in this KIP, and allows brokers with different resources to each have their own max.broker.partitions configuration. However, this would require sending the broker-specific configuration to the controller, which needs it while creating topics and partitions or reassigning partitions. One approach would be to put this information into the broker's ZooKeeper znode and have the controller rely on that. The other would be to create a new API request-response that brokers can use to share this information with the controller. Both of these approaches introduce complexity for little gain. We are not aware of any clusters that are running with heterogeneous configurations where having a different max.broker.partitions configuration for each broker would help. Therefore, in this KIP, we do not take this approach.

Appendix

Appendix A: Performance with lots of partitions

We did a performance test (using kafka-producer-perf-test.sh from a single m5.4xlarge EC2 instance). We had 3 m5.large EC2 broker instances in 3 different AZs within the same AWS region us-east-1, running Kafka version 2.3.1. Each broker had an EBS GP2 volume attached to it for data storage. All communication was plaintext and records were not compressed. Each broker had 8 IO threads (num.io.threads), 2 replica fetcher threads (num.replica.fetchers) and 5 network threads (num.network.threads).

A1: Producer performance

The following table shows results from a producer performance test.  On the producer side, each record was 1 KB in size. The batch size (batch.size) and artificial delay (linger.ms) were left at their default values.

The table indicates that throughput with 3-way replication improves from 52 Mbps to 101 Mbps in going from 10 to 100 partitions, but then degrades beyond that. Also, the throughput with 1-way replication is better than that with 3-way replication. This is because the number of replication Fetch requests that a broker receives increases with the number of partitions on the broker, and having 1-way replication means that the broker does not have to deal with Fetch requests. But even so, the performance is much worse with 10000 partitions than with 10 partitions.

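Produce throughput (Mbps) by number of partitions on a broker

| Replication | 10 | 100 | 500 | 1000 | 5000 | 10000 |
| --- | --- | --- | --- | --- | --- | --- |
| 3-way | 52 | 101 | 86 | 12 | 2.5 | 0.9 |
| 1-way | 142 | 113 | 104 | 99 | 24 | 16 |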

A2: Recovery time under failure

Having lots of partitions on a broker can also increase the recovery time in case of controller failure. TODO
