Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

For all the methods in the above table that are used in the context of both Kafka API request handling paths and ZooKeeper-based admin tools (`AdminUtils.assignReplicasToBrokers`, `AdminZkClient.createTopicWithAssignment`,  `AdminZkClient.createTopic` and `AdminZkClient.addPartitions`), we will pass the values for max.broker.partitions and max.partitions as arguments. We will also update all upstream methods to pass these values.maximum number of partitions per broker, maximum number of partitions overall, and the current number of partitions for each broker as arguments.

We will modify the core algorithm for replica assignment in the `AdminUtils.assignReplicasToBrokers` method. The modified algorithm will ensure that as replicas are being assigned to brokers iteratively one partition at a time, if assigning the next partition to a broker causes the broker to exceed the max.broker.partitions limit, then the broker is skipped. If all brokers are skipped successively in a row, then the algorithm will terminate and throw WillExceedPartitionLimitsException. The check for max.partitions is much simpler and based purely on the total number of partitions that exist across all brokers.

When the methods are invoked in the context of a Kafka API call, we will get the values for the maximum number of partitions per broker by reading the max.broker.partitions configuration from the `KafkaConfig` object (which holds the current value after applying precedence rules For those methods that are used in the context of the Kafka API, but are not used in the context of ZooKeeper-based admin tools, we will get the values for these limits from the `KafkaConfig` object (which presents the current value based a precedence rule applied on configuration supplied via server.properties and those set via ZooKeeper). Similarly, we will get the maximum number of partitions overall by reading the max.partitions configuration from the `KafkaConfig` object. We will fetch the current number of partitions for each broker from either the `AdminManager` or `KafkaControllerContext` depending on the method.

When the methods are invoked For those methods are used only in the context of ZooKeeper-based admin tools, we will set these limits equal to the maximum int32 value that Java can represent. This is basically because it is not easy (and we don't want to make it easy) to get a reference to the broker-specific `KafkaConfig` object in this context. This design choice is what makes it possible to bypass the max.broker.partitions and max.partitions limits via ZooKeeper-based admin toolsWe will also set the object representing the current number of partitions for each broker to None, since it is not relevant when the limits are not specified.

Compatibility, Deprecation, and Migration Plan

...