...

PR: https://github.com/apache/kafka/pull/8499 (currently only a prototype, slightly out of date with respect to the KIP, but it gets the idea across)

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Config name           | Type  | Default                      | Update-mode
max.broker.partitions | int64 | int64's max value (2^63 - 1) | cluster-wide
max.partitions        | int64 | int64's max value (2^63 - 1) | cluster-wide

Kafka administrators can specify these in the server.properties file. 
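For example, a broker's server.properties might set these limits as follows (the values 4000 and 200000 are illustrative, not recommendations; by default both limits are int64's max value, i.e. effectively unlimited):

```properties
# Illustrative static configuration of the proposed limits
max.broker.partitions=4000
max.partitions=200000
```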

They can also set or modify these configurations via the kafka-config.sh admin tool.

```bash
./kafka-config.sh --bootstrap-server $SERVERS --alter --add-config max.broker.partitions=4000 --entity-type brokers --entity-default
./kafka-config.sh --bootstrap-server $SERVERS --alter --add-config max.partitions=200000 --entity-type brokers --entity-default
```

...

```bash
./kafka-config.sh --bootstrap-server $SERVERS --alter --add-config max.broker.partitions=4000 --entity-type brokers --entity-name 1
./kafka-config.sh --bootstrap-server $SERVERS --alter --add-config max.partitions=200000 --entity-type brokers --entity-name 1
```

...

The CreateTopics, CreatePartitions, AlterPartitionReassignments and Metadata APIs will throw a new exception of type ResourceLimitReachedException if it is not possible to satisfy the request while respecting the max.broker.partitions or max.partitions limits. This applies to Metadata requests only when auto-topic creation is enabled.

```java
public class ResourceLimitReachedException extends ApiException { ... }
```

Corresponding to this exception, we will have the following API error code. The actual exception will contain the values of max.broker.partitions and max.partitions in order to make it easy for users to understand why their request got rejected.

```java
RESOURCE_LIMIT_REACHED(89, "Cannot satisfy request without exceeding the partition limits", ResourceLimitReachedException::new);
```

These exception and error types will be reusable for other types of resource limits as well. The exception message should clearly indicate the type of resource limit that was hit, and why.

Create-topic policy plugins can also (but are not required to) throw these exceptions if they want to enforce limits of their own.
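As a sketch of what such a plugin could look like: the real interface is `org.apache.kafka.server.policy.CreateTopicPolicy`, whose `validate(RequestMetadata)` method rejects requests by throwing. The sketch below declares simplified local stand-ins for that interface, its `RequestMetadata`, and the proposed ResourceLimitReachedException so that it compiles standalone; the `MaxPartitionsPerTopicPolicy` class and its per-topic limit are hypothetical.

```java
public class PartitionLimitPolicy {

    // Simplified local stand-ins for org.apache.kafka.server.policy.CreateTopicPolicy
    // and its RequestMetadata, declared here only so the sketch is self-contained.
    interface CreateTopicPolicy {
        void validate(RequestMetadata requestMetadata);
    }

    static class RequestMetadata {
        private final String topic;
        private final Integer numPartitions;
        RequestMetadata(String topic, Integer numPartitions) {
            this.topic = topic;
            this.numPartitions = numPartitions;
        }
        String topic() { return topic; }
        Integer numPartitions() { return numPartitions; }
    }

    // Stand-in for the ResourceLimitReachedException proposed by this KIP.
    static class ResourceLimitReachedException extends RuntimeException {
        ResourceLimitReachedException(String message) { super(message); }
    }

    // A hypothetical policy that enforces its own per-topic partition limit
    // by throwing the new exception type.
    static class MaxPartitionsPerTopicPolicy implements CreateTopicPolicy {
        private final int maxPartitionsPerTopic;
        MaxPartitionsPerTopicPolicy(int maxPartitionsPerTopic) {
            this.maxPartitionsPerTopic = maxPartitionsPerTopic;
        }
        @Override
        public void validate(RequestMetadata requestMetadata) {
            Integer requested = requestMetadata.numPartitions();
            if (requested != null && requested > maxPartitionsPerTopic) {
                throw new ResourceLimitReachedException(
                    "Topic " + requestMetadata.topic() + " requests " + requested
                        + " partitions, but the policy allows at most " + maxPartitionsPerTopic);
            }
        }
    }
}
```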

Proposed Changes

The following table shows the list of methods that will need to change in order to support the max.broker.partitions and max.partitions configurations. (We skip a few internal methods for the sake of simplicity.)

...

For all the methods in the above table that are used in the context of both Kafka API request handling paths and ZooKeeper-based admin tools (`AdminUtils.assignReplicasToBrokers`, `AdminZkClient.createTopicWithAssignment`,  `AdminZkClient.createTopic` and `AdminZkClient.addPartitions`), we will pass the values for maximum number of partitions per broker, maximum number of partitions overall, and the current number of partitions for each broker as arguments.

We will modify the core algorithm for replica assignment in the `AdminUtils.assignReplicasToBrokers` method. The modified algorithm will ensure that as replicas are assigned to brokers iteratively, one partition at a time, any broker that would exceed the max.broker.partitions limit by accepting the next partition is skipped. If every broker is skipped in a row, the algorithm will terminate and throw ResourceLimitReachedException. The check for max.partitions is much simpler, based purely on the total number of partitions that exist across all brokers.
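The skipping behaviour described above can be sketched as follows. This is a minimal, self-contained round-robin assignment; the method name mirrors `AdminUtils.assignReplicasToBrokers` but the signature is illustrative, the real algorithm also handles rack awareness and start-index randomization, and the exception class is a local stand-in for the proposed ResourceLimitReachedException.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionLimitAssignment {

    // Stand-in for the ResourceLimitReachedException proposed by this KIP.
    public static class ResourceLimitReachedException extends RuntimeException {
        public ResourceLimitReachedException(String message) { super(message); }
    }

    /**
     * Assigns numPartitions new partitions, each with replicationFactor replicas,
     * across the given brokers. A broker already at maxBrokerPartitions is skipped;
     * if every broker is skipped in a row, the limit cannot be satisfied.
     */
    public static List<List<Integer>> assignReplicasToBrokers(
            List<Integer> brokerIds,
            int numPartitions,
            int replicationFactor,
            Map<Integer, Long> partitionCounts,   // current partition count per broker
            long maxBrokerPartitions,
            long maxPartitions) {
        // max.partitions check: purely based on the total across all brokers.
        long totalPartitions = 0;
        for (long c : partitionCounts.values()) totalPartitions += c;
        if (totalPartitions + numPartitions > maxPartitions) {
            throw new ResourceLimitReachedException(
                "Creating " + numPartitions + " partitions would exceed max.partitions=" + maxPartitions);
        }
        Map<Integer, Long> counts = new HashMap<>(partitionCounts);
        List<List<Integer>> assignment = new ArrayList<>();
        int start = 0;
        for (int p = 0; p < numPartitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            int skippedInARow = 0;
            int i = start;
            while (replicas.size() < replicationFactor) {
                if (skippedInARow == brokerIds.size()) {
                    // Every broker was skipped in a row: terminate with the new exception.
                    throw new ResourceLimitReachedException(
                        "No broker can host another replica without exceeding max.broker.partitions="
                            + maxBrokerPartitions);
                }
                int broker = brokerIds.get(i % brokerIds.size());
                i++;
                if (counts.getOrDefault(broker, 0L) >= maxBrokerPartitions || replicas.contains(broker)) {
                    skippedInARow++;
                    continue;
                }
                skippedInARow = 0;
                replicas.add(broker);
                counts.merge(broker, 1L, Long::sum);
            }
            start++;  // rotate the starting broker across partitions
            assignment.add(replicas);
        }
        return assignment;
    }
}
```

With three empty brokers and max.broker.partitions=2, three partitions at replication factor 2 fit exactly; a fourth partition causes every broker to be skipped and the exception to be thrown.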

When the methods are invoked in the context of a Kafka API call, we will get the values for the maximum number of partitions per broker by reading the max.broker.partitions configuration from the `KafkaConfig` object (which holds the current value after applying precedence rules on configuration supplied via server.properties and those set via ZooKeeper). Similarly, we will get the maximum number of partitions overall by reading the max.partitions configuration from the `KafkaConfig` object. We will fetch the current number of partitions for each broker from either the `AdminManager` or `KafkaControllerContext` depending on the method.

When the methods are invoked in the context of ZooKeeper-based admin tools, we will set these limits equal to the maximum int64 value that Java can represent. This is basically because it is not easy (and we don't want to make it easy) to get a reference to the broker-specific `KafkaConfig` object in this context. We will also set the object representing the current number of partitions for each broker to None, since it is not relevant when the limits are not specified.

...

This change is backwards-compatible in practice because we will set the default values for max.broker.partitions and max.partitions equal to the maximum int64 value that Java can represent, which is quite large (2^63 - 1). Users will run into system issues far before hitting these limits.

...