...

The CreateTopics, CreatePartitions and AlterPartitionReassignments APIs, and the admin tools kafka-topics.sh and kafka-reassign-partitions.sh, will throw the following exception if the request cannot be satisfied while respecting the max.broker.partitions or max.partitions limits.

...

```java
WILL_EXCEED_PARTITION_LIMITS(88, "Cannot satisfy request without exceeding the partition limits", WillExceedPartitionLimitsException::new);
```

Proposed Changes

TODO: Add some more detail.

When creating a topic or adding partitions to a given topic, the controller currently assigns partitions to brokers such that a partition's replicas are on as many different racks as possible (see KIP-36). This algorithm is encapsulated in the method `AdminUtils.assignReplicasToBrokers`. We will modify this algorithm to ensure that the request fails if the number of resulting partitions in the cluster would exceed the max.partitions limit.
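As a rough illustration of the cluster-wide check described above, the following sketch rejects a request when the resulting partition count would exceed max.partitions. The class and method names are hypothetical and not part of the Kafka codebase, and `IllegalStateException` is only a stand-in for `WillExceedPartitionLimitsException`. It also assumes the limit counts partitions rather than replicas.

```java
// Hypothetical helper for illustration only; not part of the Kafka codebase.
final class ClusterPartitionLimit {
    private ClusterPartitionLimit() {}

    /**
     * Returns true if adding requestedPartitions to a cluster that already
     * holds existingPartitions would push the total above maxPartitions.
     */
    static boolean wouldExceed(long existingPartitions, long requestedPartitions, long maxPartitions) {
        return existingPartitions + requestedPartitions > maxPartitions;
    }

    /**
     * Fails the request before any assignment is computed.
     * IllegalStateException stands in for WillExceedPartitionLimitsException.
     */
    static void ensureWithinLimit(long existingPartitions, long requestedPartitions, long maxPartitions) {
        if (wouldExceed(existingPartitions, requestedPartitions, maxPartitions)) {
            throw new IllegalStateException(
                "Cannot satisfy request without exceeding the partition limits");
        }
    }
}
```

Checking before computing any assignment keeps the failure cheap, since no replica placement work is done for a request that cannot succeed.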

Further, we will modify this algorithm to ensure that no broker is assigned more than max.broker.partitions partitions.
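One way to picture the per-broker limit is as a validation step over a proposed assignment. The sketch below is illustrative only: `BrokerPartitionLimit` and `brokersOverLimit` are hypothetical names, and it assumes that every replica hosted on a broker counts as one partition against that broker's limit.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration only; not part of the Kafka codebase.
final class BrokerPartitionLimit {
    private BrokerPartitionLimit() {}

    /**
     * @param currentCounts       broker ID -> number of partitions it hosts today
     * @param proposedAssignment  one replica list (broker IDs) per new partition
     * @param maxBrokerPartitions value of max.broker.partitions
     * @return sorted IDs of brokers that would end up above the limit
     */
    static Set<Integer> brokersOverLimit(Map<Integer, Integer> currentCounts,
                                         List<List<Integer>> proposedAssignment,
                                         int maxBrokerPartitions) {
        // Start from the current load and add one per proposed replica.
        Map<Integer, Integer> projected = new HashMap<>(currentCounts);
        for (List<Integer> replicas : proposedAssignment) {
            for (int brokerId : replicas) {
                projected.merge(brokerId, 1, Integer::sum);
            }
        }
        Set<Integer> overLimit = new TreeSet<>();
        for (Map.Entry<Integer, Integer> entry : projected.entrySet()) {
            if (entry.getValue() > maxBrokerPartitions) {
                overLimit.add(entry.getKey());
            }
        }
        return overLimit;
    }
}
```

An empty result means the proposed assignment respects the limit. A modified assignment algorithm could instead skip brokers with no remaining headroom while choosing replicas, which this sketch does not attempt.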

The `AdminManager` class has access to the `KafkaConfig` object, from which the controller can read the max.broker.partitions and max.partitions configurations.

Each broker (including the controller broker) keeps a cache of partition state (in `MetadataCache.UpdateMetadataPartitionState`). The `AdminManager` class has access to the `MetadataCache` and will use this to construct a reverse map of `active broker ID → number of partitions`.
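The reverse map could be built along the following lines. This is a sketch under stated assumptions: the input is simplified to a map from a partition name to its replica broker IDs, which is not the actual shape of the `MetadataCache` data, and the `BrokerPartitionCounts` class is hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of the Kafka codebase.
final class BrokerPartitionCounts {
    private BrokerPartitionCounts() {}

    /**
     * @param replicasByPartition simplified partition state: partition name -> replica broker IDs
     * @param activeBrokers       IDs of brokers that are currently alive
     * @return active broker ID -> number of partitions hosted on that broker
     */
    static Map<Integer, Integer> countByBroker(Map<String, List<Integer>> replicasByPartition,
                                               Set<Integer> activeBrokers) {
        Map<Integer, Integer> counts = new TreeMap<>();
        // Seed every active broker with zero so that empty brokers still appear.
        for (int brokerId : activeBrokers) {
            counts.put(brokerId, 0);
        }
        for (List<Integer> replicas : replicasByPartition.values()) {
            for (int brokerId : replicas) {
                // Replicas on brokers that are not active are ignored.
                if (activeBrokers.contains(brokerId)) {
                    counts.merge(brokerId, 1, Integer::sum);
                }
            }
        }
        return counts;
    }
}
```

Seeding active brokers with zero matters for the limit check, because a broker with no partitions has the most headroom and should not be missing from the map.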

...

TODO: Decide how to pass the max.broker.partitions and max.partitions values to all the relevant methods documented below.

TODO: Figure out if we want to apply these limits on internal topics.

TODO: Figure out how to impose these limits when running commands directly against ZooKeeper without going through the Kafka API.

The following table lists the methods that will need to change in order to support the max.broker.partitions and max.partitions configurations.

| Method name | Relevant methods which directly depend on this one | Relevant methods on which this one is directly dependent | Description of what the method does currently | Change required |
| --- | --- | --- | --- | --- |
| `AdminUtils.assignReplicasToBrokers` | `AdminZkClient.createTopic`<br>`AdminZkClient.addPartitions`<br>`AdminManager.createTopics`<br>`ReassignPartitionsCommand.generateAssignment` | | Encapsulates the algorithm specified in KIP-36 to assign partitions to brokers on as many racks as possible. This also handles the case when rack-awareness is disabled.<br>This is a pure function without any state or side effects. | |
| `AdminZkClient.createTopicWithAssignment` | `AdminZkClient.createTopic`<br>`AdminManager.createTopics`<br>`ZookeeperTopicService.createTopic` | | Creates the ZooKeeper znodes required for topic-specific configuration and replica assignments for the partitions of the topic. | |
| `AdminZkClient.createTopic` | `KafkaApis.createTopic`<br>`ZookeeperTopicService.createTopic` | `AdminUtils.assignReplicasToBrokers`<br>`AdminZkClient.createTopicWithAssignment` | Computes replica assignment using `AdminUtils.assignReplicasToBrokers` and then reuses `AdminZkClient.createTopicWithAssignment`. | |
| `AdminZkClient.addPartitions` | `AdminManager.createPartitions`<br>`ZookeeperTopicService.alterTopic` | `AdminUtils.assignReplicasToBrokers` | Computes replica assignment using `AdminUtils.assignReplicasToBrokers` when replica assignments are not specified.<br>When replica assignments are specified, uses them as is.<br>Creates the ZooKeeper znodes required for the new partitions with the corresponding replica assignments. | |
| `AdminManager.createTopics` | `KafkaApis.handleCreateTopicsRequest` | `AdminUtils.assignReplicasToBrokers`<br>`AdminZkClient.createTopicWithAssignment` | Used exclusively by `KafkaApis.handleCreateTopicsRequest` to create topics.<br>Reuses `AdminUtils.assignReplicasToBrokers` when replica assignments are not specified.<br>When replica assignments are specified, uses them as is. | |
| `AdminManager.createPartitions` | `KafkaApis.handleCreatePartitionsRequest` | `AdminZkClient.addPartitions` | Used exclusively by `KafkaApis.handleCreatePartitionsRequest` to create partitions on an existing topic. | |
| `KafkaController.onPartitionReassignment` | `KafkaApis.handleAlterPartitionReassignmentsRequest` (not quite directly, but the stack trace in the middle is not relevant) | | Handles all the modifications required on ZooKeeper znodes and sending API requests required for moving partitions from some brokers to others. | |
| `KafkaApis.handleCreateTopicsRequest` | | `AdminManager.createTopics` | Handles the CreateTopics API request sent to a broker, if that broker is the controller. | |
| `KafkaApis.handleCreatePartitionsRequest` | | `AdminManager.createPartitions` | Handles the CreatePartitions API request sent to a broker, if that broker is the controller. | |
| `KafkaApis.handleAlterPartitionReassignmentsRequest` | | `KafkaController.onPartitionReassignment` (not quite directly, but the stack trace in the middle is not relevant) | Handles the AlterPartitionReassignments API request sent to a broker, if that broker is the controller. | |
| `KafkaApis.createTopic` | | `AdminZkClient.createTopic` | Creates internal topics for storing consumer offsets (__consumer_offsets), and transaction state (__transaction_state).<br>Also used to auto-create topics when topic auto-creation is enabled. | |
| `ZookeeperTopicService.createTopic` | | `AdminZkClient.createTopic`<br>`AdminZkClient.createTopicWithAssignment` | Used by the ./kafka-topics.sh admin tool to create topics when --zookeeper is specified.<br>Reuses `AdminZkClient.createTopic` when no replica assignments are specified.<br>Reuses `AdminZkClient.createTopicWithAssignment` when replica assignments are specified. | |
| `ZookeeperTopicService.alterTopic` | | `AdminZkClient.addPartitions` | Used by the ./kafka-topics.sh admin tool to alter topics when --zookeeper is specified.<br>Calls `AdminZkClient.addPartitions` if topic alteration involves a different number of partitions than what the topic currently has. | |
| `ReassignPartitionsCommand.generateAssignment` | | `AdminUtils.assignReplicasToBrokers` | Used by the ./kafka-reassign-partitions.sh admin tool to generate a replica assignment of partitions for the specified topics onto the set of specified brokers. | |
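To see which entry points a change to one method can reach, the direct dependencies in the table can be treated as a graph and walked backwards. The sketch below transcribes only the "directly dependent" relationships listed above; it is not a complete call graph of the Kafka codebase, and the `MethodDependencyGraph` class is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration only; edges are transcribed from the table.
final class MethodDependencyGraph {
    private MethodDependencyGraph() {}

    /** method -> methods on which it is directly dependent. */
    static final Map<String, List<String>> DEPENDS_ON = new LinkedHashMap<>();
    static {
        DEPENDS_ON.put("AdminZkClient.createTopic", Arrays.asList(
            "AdminUtils.assignReplicasToBrokers", "AdminZkClient.createTopicWithAssignment"));
        DEPENDS_ON.put("AdminZkClient.addPartitions", Arrays.asList(
            "AdminUtils.assignReplicasToBrokers"));
        DEPENDS_ON.put("AdminManager.createTopics", Arrays.asList(
            "AdminUtils.assignReplicasToBrokers", "AdminZkClient.createTopicWithAssignment"));
        DEPENDS_ON.put("AdminManager.createPartitions", Arrays.asList(
            "AdminZkClient.addPartitions"));
        DEPENDS_ON.put("KafkaApis.handleCreateTopicsRequest", Arrays.asList(
            "AdminManager.createTopics"));
        DEPENDS_ON.put("KafkaApis.handleCreatePartitionsRequest", Arrays.asList(
            "AdminManager.createPartitions"));
        DEPENDS_ON.put("KafkaApis.handleAlterPartitionReassignmentsRequest", Arrays.asList(
            "KafkaController.onPartitionReassignment"));
        DEPENDS_ON.put("KafkaApis.createTopic", Arrays.asList(
            "AdminZkClient.createTopic"));
        DEPENDS_ON.put("ZookeeperTopicService.createTopic", Arrays.asList(
            "AdminZkClient.createTopic", "AdminZkClient.createTopicWithAssignment"));
        DEPENDS_ON.put("ZookeeperTopicService.alterTopic", Arrays.asList(
            "AdminZkClient.addPartitions"));
        DEPENDS_ON.put("ReassignPartitionsCommand.generateAssignment", Arrays.asList(
            "AdminUtils.assignReplicasToBrokers"));
    }

    /** Returns every method that depends on target, directly or through other methods. */
    static Set<String> transitiveDependents(String target) {
        // Invert the edges: callee -> direct callers.
        Map<String, List<String>> callers = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : DEPENDS_ON.entrySet()) {
            for (String callee : entry.getValue()) {
                callers.computeIfAbsent(callee, k -> new ArrayList<>()).add(entry.getKey());
            }
        }
        // Breadth-first walk from the target up through its callers.
        Set<String> seen = new TreeSet<>();
        Deque<String> queue = new ArrayDeque<>();
        queue.add(target);
        while (!queue.isEmpty()) {
            String current = queue.remove();
            for (String caller : callers.getOrDefault(current, Collections.<String>emptyList())) {
                if (seen.add(caller)) {
                    queue.add(caller);
                }
            }
        }
        return seen;
    }
}
```

Calling `MethodDependencyGraph.transitiveDependents("AdminUtils.assignReplicasToBrokers")` returns the methods in the table whose behavior can be affected by a change to the assignment algorithm. Based on the table, `KafkaController.onPartitionReassignment` and its caller are not among them, so the reassignment path would need its own handling of the limits.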

Compatibility, Deprecation, and Migration Plan

...