...
The CreateTopics, CreatePartitions and AlterPartitionReassignments APIs, and the admin tools kafka-topics.sh and kafka-reassign-partitions.sh, will throw the following exception if the request cannot be satisfied while respecting both the max.broker.partitions and max.partitions limits.
...
`WILL_EXCEED_PARTITION_LIMITS(88, "Cannot satisfy request without exceeding the partition limits", WillExceedPartitionLimitsException::new);`
Proposed Changes
TODO: Add some more detail.
When creating a topic or adding partitions to a given topic, the controller currently assigns partitions to brokers such that a partition's replicas are on as many different racks as possible (see KIP-36). This algorithm is encapsulated in the method `AdminUtils.assignReplicasToBrokers`. We will modify this algorithm to ensure that the request fails if the number of resulting partitions in the cluster would exceed the max.partitions limit.
Further, we will modify this algorithm to ensure that no broker is assigned more than max.broker.partitions partitions.
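The two checks described above can be sketched as follows. This is only an illustration under stated assumptions, not the proposed change to `AdminUtils.assignReplicasToBrokers`: the class `PartitionLimitSketch`, its `assign` method and the nested exception are hypothetical stand-ins, rack awareness (KIP-36) is ignored, each replica hosted on a broker is assumed to count as one partition against max.broker.partitions, and max.partitions is assumed to count topic partitions rather than replicas.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Kafka code: a rack-unaware assignment that enforces both limits.
final class PartitionLimitSketch {
    static final String MESSAGE = "Cannot satisfy request without exceeding the partition limits";

    /** Stand-in for the proposed WillExceedPartitionLimitsException. */
    static final class WillExceedPartitionLimitsException extends RuntimeException {
        WillExceedPartitionLimitsException(String message) { super(message); }
    }

    /**
     * Assigns replicas for newPartitions partitions, or throws if either limit would be exceeded.
     * Keys of the result are indexes of the new partitions (0-based), values are broker IDs.
     * The input map (broker ID to number of partitions currently hosted) is not modified.
     */
    static Map<Integer, List<Integer>> assign(Map<Integer, Integer> partitionsPerBroker,
                                              int existingPartitions,
                                              int newPartitions,
                                              int replicationFactor,
                                              int maxBrokerPartitions,
                                              int maxPartitions) {
        // Cluster-wide check (assumption: max.partitions counts partitions, not replicas).
        if ((long) existingPartitions + newPartitions > maxPartitions) {
            throw new WillExceedPartitionLimitsException(MESSAGE);
        }
        Map<Integer, Integer> load = new HashMap<>(partitionsPerBroker);
        Map<Integer, List<Integer>> assignment = new HashMap<>();
        for (int p = 0; p < newPartitions; p++) {
            // Only brokers that still have headroom under max.broker.partitions are candidates.
            List<Integer> candidates = new ArrayList<>();
            for (Map.Entry<Integer, Integer> e : load.entrySet()) {
                if (e.getValue() < maxBrokerPartitions) {
                    candidates.add(e.getKey());
                }
            }
            if (candidates.size() < replicationFactor) {
                throw new WillExceedPartitionLimitsException(MESSAGE);
            }
            // Prefer the least-loaded brokers; break ties by broker ID for determinism.
            candidates.sort(Comparator.comparingInt((Integer b) -> load.get(b))
                                      .thenComparingInt(b -> b));
            List<Integer> replicas = new ArrayList<>(candidates.subList(0, replicationFactor));
            for (int broker : replicas) {
                load.merge(broker, 1, Integer::sum);
            }
            assignment.put(p, replicas);
        }
        return assignment;
    }
}
```

Because the checks run against a private copy of the per-broker counts, a request that fails partway through leaves no partial state behind, which matches the intent that the request as a whole fails.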
The `AdminManager` class has access to the `KafkaConfig` object, from which the controller can read the max.broker.partitions and max.partitions configurations.
Each broker (including the controller broker) keeps a cache of partition state (in `MetadataCache.UpdateMetadataPartitionState`). The `AdminManager` class has access to the `MetadataCache` and will use this to construct a reverse map of `active broker ID → number of partitions`.
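A minimal sketch of how such a reverse map could be built is shown below. The `PartitionState` class is a hypothetical stand-in for the cached partition state and carries only the fields the sketch needs. The class `BrokerPartitionCounts` and its `count` method are also assumptions made for illustration, as is the choice to count every replica hosted on an active broker as one partition.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch, not Kafka code: derives "active broker ID -> number of partitions".
final class BrokerPartitionCounts {

    /** Stand-in for one cached partition state entry. */
    static final class PartitionState {
        final String topic;
        final int partition;
        final List<Integer> replicas; // broker IDs hosting a replica of this partition

        PartitionState(String topic, int partition, List<Integer> replicas) {
            this.topic = topic;
            this.partition = partition;
            this.replicas = replicas;
        }
    }

    static Map<Integer, Integer> count(Collection<PartitionState> states, Set<Integer> activeBrokers) {
        Map<Integer, Integer> counts = new TreeMap<>();
        // Start every active broker at zero so that empty brokers still appear in the map.
        for (int broker : activeBrokers) {
            counts.put(broker, 0);
        }
        for (PartitionState state : states) {
            for (int replica : state.replicas) {
                // Replicas on brokers that are not currently active are skipped.
                if (activeBrokers.contains(replica)) {
                    counts.merge(replica, 1, Integer::sum);
                }
            }
        }
        return counts;
    }
}
```

Seeding the map with zero for every active broker matters for the limit check, since a newly added broker with no partitions is exactly the broker with the most headroom.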
...
TODO: How to pass max.broker.partitions and max.partitions values to all the relevant methods documented below.
TODO: Figure out if we want to apply these limits on internal topics.
TODO: Figure out how to impose these limits when running commands directly against ZooKeeper without going through the Kafka API.
The following table shows the list of methods that will need to change in order to support the max.broker.partitions and max.partitions configurations.
Method name | Relevant methods which directly depend on this one | Relevant methods on which this one is directly dependent | Description of what the method does currently | Change required |
---|---|---|---|---|
`AdminUtils.assignReplicasToBrokers` | `AdminZkClient.createTopic`, `AdminZkClient.addPartitions`, `AdminManager.createTopics`, `ReassignPartitionsCommand.generateAssignment` | | | |
`AdminZkClient.createTopicWithAssignment` | `AdminZkClient.createTopic`, `AdminManager.createTopics`, `ZookeeperTopicService.createTopic` | | Creates the ZooKeeper znodes required for topic-specific configuration and replica assignments for the partitions of the topic. | |
`AdminZkClient.createTopic` | `KafkaApis.createTopic`, `ZookeeperTopicService.createTopic` | `AdminUtils.assignReplicasToBrokers`, `AdminZkClient.createTopicWithAssignment` | Computes replica assignment using `AdminUtils.assignReplicasToBrokers` and then reuses `AdminZkClient.createTopicWithAssignment`. | |
`AdminZkClient.addPartitions` | `AdminManager.createPartitions`, `ZookeeperTopicService.alterTopic` | `AdminUtils.assignReplicasToBrokers` | | |
`AdminManager.createTopics` | `KafkaApis.handleCreateTopicsRequest` | `AdminUtils.assignReplicasToBrokers`, `AdminZkClient.createTopicWithAssignment` | | |
`AdminManager.createPartitions` | `KafkaApis.handleCreatePartitionsRequest` | `AdminZkClient.addPartitions` | Used exclusively by `KafkaApis.handleCreatePartitionsRequest` to create partitions on an existing topic. | |
`KafkaController.onPartitionReassignment` | `KafkaApis.handleAlterPartitionReassignmentsRequest` (not quite directly, but the stack trace in the middle is not relevant) | | Handles all the modifications required on ZooKeeper znodes and sending API requests required for moving partitions from some brokers to others. | |
`KafkaApis.handleCreateTopicsRequest` | | `AdminManager.createTopics` | Handles the CreateTopics API request sent to a broker, if that broker is the controller. | |
`KafkaApis.handleCreatePartitionsRequest` | | `AdminManager.createPartitions` | Handles the CreatePartitions API request sent to a broker, if that broker is the controller. | |
`KafkaApis.handleAlterPartitionReassignmentsRequest` | | `KafkaController.onPartitionReassignment` (not quite directly, but the stack trace in the middle is not relevant) | Handles the AlterPartitionReassignments API request sent to a broker, if that broker is the controller. | |
`KafkaApis.createTopic` | | `AdminZkClient.createTopic` | | |
`ZookeeperTopicService.createTopic` | | `AdminZkClient.createTopic`, `AdminZkClient.createTopicWithAssignment` | | |
`ZookeeperTopicService.alterTopic` | | `AdminZkClient.addPartitions` | | |
`ReassignPartitionsCommand.generateAssignment` | | `AdminUtils.assignReplicasToBrokers` | Used by the ./kafka-reassign-partitions.sh admin tool to generate a replica assignment of partitions for the specified topics onto the set of specified brokers. | |
Compatibility, Deprecation, and Migration Plan
...