...

In order to mitigate these issues, we propose having two configurations: (a) max.broker.partitions, to limit the number of partitions per broker, and (b) max.partitions, to limit the number of partitions in the cluster overall.

Goals

  • These limits are cluster-wide, not per-broker. This is in alignment with the current recommendation to run homogeneous Kafka clusters, where all brokers have the same specifications (CPU, RAM, disk, etc.).
  • These limits can be changed at runtime, without restarting brokers. This provides greater flexibility as explained in the "Rejected alternatives" section.
  • These limits apply to all topics, including internal topics (which are usually not configured with many partitions), for the sake of consistency.
  • These limits also apply to topics created via auto topic creation (currently possible via the Metadata and FindCoordinator API requests). If we don't do this, then we have a simple back-door to bypass this check, which we'd like to avoid.
  • These limits do not apply when creating topics or partitions, or reassigning partitions via the ZooKeeper-based admin tools. This is unfortunate, but it simplifies the design. Today, ZooKeeper-based tools don't have a clear way to know about these limits. We will not try to make ZooKeeper-based tools enforce these limits, in an attempt to stay in accordance with the general direction of removing ZooKeeper from Kafka.

Public Interfaces

Configs

Config name | Type | Default | Update-mode
max.broker.partitions | int32 | int32's max value | cluster-wide
max.partitions | int32 | int32's max value | cluster-wide

Kafka administrators can specify these in the server.properties file. 
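For example, the limits could be set at broker startup via server.properties (the values below are illustrative, matching the examples used elsewhere in this document; both configurations default to int32's max value, i.e. effectively unlimited):

```properties
# Illustrative values; both default to int32's max value (no limit)
max.broker.partitions=4000
max.partitions=200000
```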

They can also use the following to set/modify these configurations via ZooKeeper.

Code Block
languagebash
./kafka-configs.sh --zookeeper $ZOOKEEPER --alter --add-config max.broker.partitions=4000 --entity-type brokers --entity-default
./kafka-configs.sh --zookeeper $ZOOKEEPER --alter --add-config max.partitions=200000 --entity-type brokers --entity-default

It is also possible to set this value per broker via the following command, which applies the change to only a specific broker, for testing purposes.

Code Block
languagebash
./kafka-configs.sh --zookeeper $ZOOKEEPER --alter --add-config max.broker.partitions=4000 --entity-type brokers --entity-name 1
./kafka-configs.sh --zookeeper $ZOOKEEPER --alter --add-config max.partitions=200000 --entity-type brokers --entity-name 1

However, if different values are specified for different brokers, then only the value that applies to the broker handling the request will matter. This is the controller in most cases, but can be any broker in case of auto topic creation.

Further, the stricter of the two configurations max.broker.partitions and max.partitions will apply.

An illustrative (toy) example - CreateTopic

For example, consider a 3-broker cluster in which max.broker.partitions is set to 10. If the brokers already host 8, 6 and 9 partitions respectively, then a request to create a new topic with 1 partition and a replication factor of 3 can be satisfied, resulting in partition counts of 9, 7 and 10. The next request to create a topic with 1 partition and a replication factor of 3 will fail, because broker 3 has already reached the limit. A similar request with a replication factor of 2 can, however, succeed, because two of the brokers have not yet reached the limit. If the original request had instead been for a topic with 2 partitions and a replication factor of 3, it would have failed in its entirety.
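The semantics of this example can be sketched in code. The following is a purely illustrative simulation — the class and method names are hypothetical, and the real broker uses the KIP-36 rack-aware assignment rather than a simple least-loaded sort. It places each new partition's replicas on the least-loaded brokers, and rejects the request outright if any replica would push a broker past max.broker.partitions or the cluster total past max.partitions.

```java
import java.util.Arrays;
import java.util.Comparator;

// Purely illustrative simulation of the limit checks described above.
// Names are hypothetical; the real broker uses KIP-36 rack-aware assignment.
public class PartitionLimitCheck {

    // Attempts to place numPartitions new partitions, each with
    // replicationFactor replicas, onto the brokers. Mutates
    // brokerPartitions only if the whole request can be satisfied.
    public static boolean tryCreate(int[] brokerPartitions, int numPartitions,
                                    int replicationFactor,
                                    int maxBrokerPartitions, int maxPartitions) {
        int[] counts = brokerPartitions.clone();
        // Assumption: the cluster-wide total is the sum of per-broker replica counts.
        int total = Arrays.stream(counts).sum();
        for (int p = 0; p < numPartitions; p++) {
            // Choose the replicationFactor least-loaded brokers for this partition.
            Integer[] order = new Integer[counts.length];
            for (int i = 0; i < counts.length; i++) order[i] = i;
            Arrays.sort(order, Comparator.comparingInt(i -> counts[i]));
            for (int r = 0; r < replicationFactor; r++) {
                int b = order[r];
                if (counts[b] + 1 > maxBrokerPartitions || total + 1 > maxPartitions) {
                    return false; // the request fails in its entirety
                }
                counts[b]++;
                total++;
            }
        }
        System.arraycopy(counts, 0, brokerPartitions, 0, counts.length);
        return true;
    }

    public static void main(String[] args) {
        int[] brokers = {8, 6, 9};
        System.out.println(tryCreate(brokers, 1, 3, 10, Integer.MAX_VALUE)); // true
        System.out.println(Arrays.toString(brokers));                        // [9, 7, 10]
        System.out.println(tryCreate(brokers, 1, 3, 10, Integer.MAX_VALUE)); // false: one broker is at the limit
        System.out.println(tryCreate(brokers, 1, 2, 10, Integer.MAX_VALUE)); // true: two brokers still have room
    }
}
```

Because the broker counts are only written back on success, a request for 2 partitions with a replication factor of 3 from the initial counts fails without changing anything, matching the "fails in its entirety" behavior described above.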

API Exceptions

The CreateTopics, CreatePartitions, Metadata, FindCoordinator and AlterPartitionReassignments APIs will throw the following exception if it is not possible to satisfy the request while respecting the max.broker.partitions or max.partitions limits. This applies to Metadata requests only when topic auto-creation is enabled, and to FindCoordinator requests only when the request triggers creation of the internal topics (__consumer_offsets and __transaction_state).

Code Block
languagejava
public class WillExceedPartitionLimitsException extends ApiException { ... }

Corresponding to this exception, we will have the following API error code. The actual exception will contain the values of max.broker.partitions and max.partitions in order to make it easy for users to understand why their request got rejected.

Code Block
languagejava
WILL_EXCEED_PARTITION_LIMITS(88, "Cannot satisfy request without exceeding the partition limits", WillExceedPartitionLimitsException::new); 
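One way the exception could carry the limit values is as structured fields rather than only in the message string. The sketch below is illustrative only: the `ApiException` stand-in, the field names, the accessors and the message format are hypothetical and not specified by this KIP.

```java
// Stand-in for org.apache.kafka.common.errors.ApiException so this sketch compiles standalone.
class ApiException extends RuntimeException {
    public ApiException(String message) { super(message); }
}

// Hypothetical sketch: field names, accessors and message format are
// illustrative only and are not specified by the KIP.
public class WillExceedPartitionLimitsException extends ApiException {
    private final int maxBrokerPartitions;
    private final int maxPartitions;

    public WillExceedPartitionLimitsException(int maxBrokerPartitions, int maxPartitions) {
        super("Cannot satisfy request without exceeding the partition limits"
                + " (max.broker.partitions=" + maxBrokerPartitions
                + ", max.partitions=" + maxPartitions + ")");
        this.maxBrokerPartitions = maxBrokerPartitions;
        this.maxPartitions = maxPartitions;
    }

    public int maxBrokerPartitions() { return maxBrokerPartitions; }
    public int maxPartitions() { return maxPartitions; }
}
```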

Proposed Changes

The following methods will need to change in order to support the max.broker.partitions and max.partitions configurations. For each method, we list the relevant methods that directly depend on it, the relevant methods it directly depends on, what the method does currently, and the context in which it is used.

`AdminUtils.assignReplicasToBrokers`
  • Depended on by: `AdminZkClient.createTopic`, `AdminZkClient.addPartitions`, `AdminManager.createTopics`, `ReassignPartitionsCommand.generateAssignment`
  • Currently: Encapsulates the algorithm specified in KIP-36 to assign partitions to brokers on as many racks as possible, and also handles the case when rack-awareness is disabled. It is a pure function without any state or side effects.
  • Context: API, ZooKeeper-based admin tools

`AdminZkClient.createTopicWithAssignment`
  • Depended on by: `AdminZkClient.createTopic`, `AdminManager.createTopics`, `ZookeeperTopicService.createTopic`
  • Currently: Creates the ZooKeeper znodes required for topic-specific configuration and replica assignments for the partitions of the topic.
  • Context: API, ZooKeeper-based admin tools

`AdminZkClient.createTopic`
  • Depended on by: `KafkaApis.createTopic`, `ZookeeperTopicService.createTopic`
  • Depends on: `AdminUtils.assignReplicasToBrokers`, `AdminZkClient.createTopicWithAssignment`
  • Currently: Computes replica assignment using `AdminUtils.assignReplicasToBrokers` and then reuses `AdminZkClient.createTopicWithAssignment`.
  • Context: API, ZooKeeper-based admin tools

`AdminZkClient.addPartitions`
  • Depended on by: `AdminManager.createPartitions`, `ZookeeperTopicService.alterTopic`
  • Depends on: `AdminUtils.assignReplicasToBrokers`
  • Currently: Computes replica assignment using `AdminUtils.assignReplicasToBrokers` when replica assignments are not specified; uses them as is when they are specified; creates the ZooKeeper znodes required for the new partitions with the corresponding replica assignments.
  • Context: API, ZooKeeper-based admin tools

`AdminManager.createTopics`
  • Depended on by: `KafkaApis.handleCreateTopicsRequest`
  • Depends on: `AdminUtils.assignReplicasToBrokers`, `AdminZkClient.createTopicWithAssignment`
  • Currently: Used exclusively by `KafkaApis.handleCreateTopicsRequest` to create topics; reuses `AdminUtils.assignReplicasToBrokers` when replica assignments are not specified, and uses them as is when they are.
  • Context: API

`AdminManager.createPartitions`
  • Depended on by: `KafkaApis.handleCreatePartitionsRequest`
  • Depends on: `AdminZkClient.addPartitions`
  • Currently: Used exclusively by `KafkaApis.handleCreatePartitionsRequest` to create partitions on an existing topic.
  • Context: API

`KafkaController.onPartitionReassignment`
  • Depended on by: `KafkaApis.handleAlterPartitionReassignmentsRequest` (not quite directly, but the stack trace in the middle is not relevant)
  • Currently: Handles all the modifications required on ZooKeeper znodes and the API requests required for moving partitions from some brokers to others.
  • Context: API

`KafkaApis.handleCreateTopicsRequest`
  • Depends on: `AdminManager.createTopics`
  • Currently: Handles the CreateTopics API request sent to a broker, if that broker is the controller.
  • Context: API

`KafkaApis.handleCreatePartitionsRequest`
  • Depends on: `AdminManager.createPartitions`
  • Currently: Handles the CreatePartitions API request sent to a broker, if that broker is the controller.
  • Context: API

`KafkaApis.handleAlterPartitionReassignmentsRequest`
  • Depends on: `KafkaController.onPartitionReassignment` (not quite directly, but the stack trace in the middle is not relevant)
  • Currently: Handles the AlterPartitionReassignments API request sent to a broker, if that broker is the controller.
  • Context: API

`KafkaApis.createTopic`
  • Depended on by: `KafkaApis.handleTopicMetadataRequest`, `KafkaApis.handleFindCoordinatorRequest` (not quite directly, but the stack trace in the middle is not relevant)
  • Depends on: `AdminZkClient.createTopic`
  • Currently: Creates internal topics for storing consumer offsets (__consumer_offsets) and transaction state (__transaction_state); also used to auto-create topics when topic auto-creation is enabled.
  • Context: API

`KafkaApis.handleTopicMetadataRequest`
  • Depends on: `KafkaApis.createTopic` (not quite directly, but the stack trace in the middle is not relevant)
  • Currently: Handles the Metadata API request sent to a broker.
  • Context: API

`KafkaApis.handleFindCoordinatorRequest`
  • Depends on: `KafkaApis.createTopic` (not quite directly, but the stack trace in the middle is not relevant)
  • Currently: Handles the FindCoordinator API request sent to a broker.
  • Context: API

`ZookeeperTopicService.createTopic`
  • Depends on: `AdminZkClient.createTopic`, `AdminZkClient.createTopicWithAssignment`
  • Currently: Used by the ./kafka-topics.sh admin tool to create topics when --zookeeper is specified; reuses `AdminZkClient.createTopic` when no replica assignments are specified, and `AdminZkClient.createTopicWithAssignment` when they are.
  • Context: ZooKeeper-based admin tools

`ZookeeperTopicService.alterTopic`
  • Depends on: `AdminZkClient.addPartitions`
  • Currently: Used by the ./kafka-topics.sh admin tool to alter topics when --zookeeper is specified; calls `AdminZkClient.addPartitions` if the topic alteration involves a different number of partitions than what the topic currently has.
  • Context: ZooKeeper-based admin tools

`ReassignPartitionsCommand.generateAssignment`
  • Depends on: `AdminUtils.assignReplicasToBrokers`
  • Currently: Used by the ./kafka-reassign-partitions.sh admin tool to generate a replica assignment of partitions for the specified topics onto the set of specified brokers.
  • Context: ZooKeeper-based admin tools

For all the methods in the above table that are used in the context of both Kafka API request handling paths and ZooKeeper-based admin tools, we will pass the values for max.broker.partitions and max.partitions as arguments. We will also update all upstream methods to pass these values.

For those methods that are used in the context of the Kafka API, but are not used in the context of ZooKeeper-based admin tools, we will get the values for these limits from the `KafkaConfig` object (which presents the current value based on a precedence rule applied to configuration supplied via server.properties and configuration set via ZooKeeper).

For those methods that are used only in the context of ZooKeeper-based admin tools, we will set these limits equal to the maximum int32 value that Java can represent. This is basically because it is not easy (and we don't want to make it easy) to get a reference to the broker-specific `KafkaConfig` object in this context. This design choice is what makes it possible to bypass the max.broker.partitions and max.partitions limits via ZooKeeper-based admin tools.
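The way the two call paths resolve the limits can be sketched as follows. Every name in this snippet is hypothetical; the real implementation lives in the Scala broker code, and this only illustrates which value each path would end up using.

```java
// Hypothetical sketch of how the two call paths could resolve the limits.
// All names are illustrative; the real code paths are the Scala methods
// listed in the table above.
public class PartitionLimits {
    public final int maxBrokerPartitions;
    public final int maxPartitions;

    public PartitionLimits(int maxBrokerPartitions, int maxPartitions) {
        this.maxBrokerPartitions = maxBrokerPartitions;
        this.maxPartitions = maxPartitions;
    }

    // Kafka API request path: the limits come from the broker's KafkaConfig,
    // which already applies the precedence between server.properties and
    // the values set via ZooKeeper.
    public static PartitionLimits fromConfig(int maxBrokerPartitions, int maxPartitions) {
        return new PartitionLimits(maxBrokerPartitions, maxPartitions);
    }

    // ZooKeeper-based admin tool path: no KafkaConfig is available, so both
    // limits are set to int32's max value, i.e. effectively disabled.
    public static PartitionLimits unlimited() {
        return new PartitionLimits(Integer.MAX_VALUE, Integer.MAX_VALUE);
    }
}
```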


Compatibility, Deprecation, and Migration Plan

...

Similarly, a cluster that already has more than max.partitions partitions at the time the max.partitions configuration is set will continue to function just fine. It will, however, fail any further requests to create topics or partitions. Any reassignment of partitions should work fine.

These behaviors are also necessary because (even with this KIP), users can bypass the limit checks by using ZooKeeper-based admin tools.

Rejected Alternatives

Add configuration to limit number of partitions per topic

...