Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

PRhttps://github.com/apache/kafka/pull/8499 (currently only prototype)

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  • These limits are cluster-wide. This is obviously true for max.partitions which is meant to apply at the cluster level. However, we choose this for max.broker.partitions too, instead of supporting different values for each broker. This is in alignment with the current recommendation to run homogenous Kafka clusters where all brokers have the same specifications (CPU, RAM, disk etc.).
  • If both limits max.partitions and max.broker.partitions are specified, then the more restrictive of the two apply. It is possible that a request is rejected because it causes the max.partitions limit to be hit without causing any broker to hit the max.broker.partitions limit. The vice versa is true as well.
  • These limits can be changed at runtime, without restarting brokers. This provides greater flexibility. See the "Rejected alternatives" section for why we did not go with read-only configuration.
  • These limits also apply to topics created via auto topic creation (currently possible via Metadata API requests). By enforcing this, we disallow having a backdoor to bypass these limits.
  • These limits do not apply to all topics, even internal topics (i.e. __consumer_offsets and __transaction_state), which usually are not configured with too many partitions). This provides the same consistent experience across all topics and partitions.These limits also apply to topics created via auto topic creation (currently possible via the Metadata and FindCoordinator API requests). By enforcing this, we disallow having a backdoor to bypass these configured with too many partitions. This ensures that any internal Kafka behaviors do not break because of partition limits.
  • These limits do not apply when creating topics or partitions, or reassigning partitions via the ZooKeeper-based admin tools. This is unfortunate, because it does create a backdoor to bypass these limits. However, we leave this out of scope here given that ZooKeeper will eventually be deprecated from Kafka.

...

CreateTopics, CreatePartitions, AlterPartitionReassignments , Metadata and  FindCoordinator APIs Metadata APIs will throw the following exception if it is not possible to satisfy the request while respecting the max.broker.partitions or max.partitions limits. This applies to Metadata requests only in case auto-topic creation is enabled. This applies to FindCoordinator requests only in case of creating internal topics (__consumer_offsets and __transaction_state) Metadata requests only in case auto-topic creation is enabled.

Code Block
languagejava
public class WillExceedPartitionLimitsException extends ApiException { ... }

...

Method nameDescription of what the method does currentlyContext in which usedRelevant methods which directly depend on this oneRelevant methods on which this one is directly dependent
`AdminUtils.assignReplicasToBrokers`
  • Encapsulates the algorithm specified in KIP-36 to assign partitions to brokers on as many racks as possible. This also handles the case when rack-awareness is disabled.
  • This is a pure function without any state or side effects.
  • API
  • ZooKeeper-based admin tools

`AdminZkClient.createTopic`

`AdminZkClient.addPartitions`

`AdminManager.createTopics`

`ReassignPartitionsCommand.generateAssignment`


`AdminZkClient.createTopicWithAssignment`Creates the ZooKeeper znodes required for topic-specific configuration and replica assignments for the partitions of the topic.
  • API
  • ZooKeeper-based admin tools

`AdminZkClient.createTopic`

`AdminManager.createTopics`

`ZookeeperTopicService.createTopic`


`AdminZkClient.createTopic`Computes replica assignment using `AdminUtils.assignReplicasToBrokers` and then reuses `AdminZkClient.createTopicWithAssignment`.
  • API
  • ZooKeeper-based admin tools

`KafkaApis.createTopic`

`ZookeeperTopicService.createTopic`

`AdminUtils.assignReplicasToBrokers`

`AdminZkClient.createTopicWithAssignment`

`AdminZkClient.addPartitions`
  • Computes replica assignment using `AdminUtils.assignReplicasToBrokers` when replica assignments are not specified.
  • When replica assignments are specified, uses them as is.
  • Creates the ZooKeeper znodes required for the new partitions with the corresponding replica assignments.
  • API
  • ZooKeeper-based admin tools

`AdminManager.createPartitions`

`ZookeeperTopicService.alterTopic`

`AdminUtils.assignReplicasToBrokers`
`AdminManager.createTopics`
  • Used exclusively by `KafkaApis.handleCreateTopicsRequest` to create topics.
  • Reuses `AdminUtils.assignReplicasToBrokers` when replica assignments are not specified.
  • When replica assignments are specified, uses them as is.
  • API
`KafkaApis.handleCreateTopicsRequest`

`AdminUtils.assignReplicasToBrokers`

`AdminZkClient.createTopicWithAssignment`

`AdminManager.createPartitions`Used exclusively by `KafkaApis.handleCreatePartitionsRequest` to create partitions on an existing topic.
  • API
`KafkaApis.handleCreatePartitionsRequest``AdminZkClient.addPartitions`
`KafkaController.onPartitionReassignment`Handles all the modifications required on ZooKeeper znodes and sending API requests required for moving partitions from some brokers to others.
  • API

`KafkaApis.handleAlterPartitionReassignmentsRequest`

(not quite directly, but the stack trace in the middle is not relevant)


`KafkaApis.handleCreateTopicsRequest`Handles the CreateTopics API request sent to a broker, if that broker is the controller.
  • API

`AdminManager.createTopics`
`KafkaApis.handleCreatePartitionsRequest`Handles the CreatePartitions API request sent to a broker, if that broker is the controller.
  • API

`AdminManager.createPartitions`
`KafkaApis.handleAlterPartitionReassignmentsRequest`Handles the AlterPartitionReassignments API request sent to a broker, if that broker is the controller.
  • API

`KafkaController.onPartitionReassignment`

(not quite directly, but the stack trace in the middle is not relevant)

`KafkaApis.createTopic`
  • Creates internal topics for storing consumer offsets (__consumer_offsets), and transaction state (__transaction_state).
  • Also used to auto-create topics when topic auto-creation is enabled.
  • API

`KafkaApis.handleTopicMetadataRequest``KafkaApis.handleFindCoordinatorRequest`

(not quite directly, but the stack trace in the middle is not relevant)

`AdminZkClient.createTopic`
`KafkaApis.handleTopicMetadataRequest`Handles the Metadata API request sent to a broker.
  • API

`KafkaApis.createTopic`

(not quite directly, but the stack trace in the middle is not relevant)

`KafkaApis.handleFindCoordinatorRequest`Handles the FindCoordinator API request sent to a broker.
  • API

`KafkaApis.createTopic`

(not quite directly, but the stack trace in the middle is not relevant)

`ZookeeperTopicService.createTopic`
  • Used by the ./kafka-topics.sh admin tool to create topics when --zookeeper is specified.
  • Reuses `AdminZkClient.createTopic` when no replica assignments are specified.
  • Reuses `AdminZkClient.createTopicWithAssignment` when replica assignments are specified.
  • ZooKeeper-based admin tools

`AdminZkClient.createTopic`

`AdminZkClient.createTopicWithAssignment`

`ZookeeperTopicService.alterTopic`
  • Used by the ./kafka-topics.sh admin tool to alter topics when --zookeeper is specified.
  • Calls `AdminZkClient.addPartitions` if topic alteration involves a different number of partitions than what the topic currently has.
  • ZooKeeper-based admin tools

`AdminZkClient.addPartitions`
`ReassignPartitionsCommand.generateAssignment`Used by the ./kafka-reassign-partitions.sh admin tool to generate a replica assignment of partitions for the specified topics onto the set of specified brokers.
  • ZooKeeper-based admin tools

`AdminUtils.assignReplicasToBrokers`

...