...

We ran some performance experiments to understand the effect of increasing the number of partitions. See Appendix A1 for producer performance and Appendix A2 for topic creation and deletion times. These consistently indicate that a large number of partitions can lead to a malfunctioning cluster.

To prevent a cluster from entering a bad state because of a large number of topic partitions, we propose two configurations: (a) max.broker.partitions, to limit the number of partitions per broker, and (b) max.partitions, to limit the number of partitions in the cluster overall. These act as guard rails that ensure the cluster never operates with more partitions than it can handle.

...

The following table lists the methods that need to change in order to support the max.broker.partitions and max.partitions configurations. (We skip a few internal methods for simplicity.)

| Method name | Relevant methods which directly depend on this one | Relevant methods on which this one is directly dependent | Description of what the method does currently | Context in which used |
|---|---|---|---|---|
| `AdminUtils.assignReplicasToBrokers` | `AdminZkClient.createTopic`, `AdminZkClient.addPartitions`, `AdminManager.createTopics`, `ReassignPartitionsCommand.generateAssignment` | | Encapsulates the algorithm specified in KIP-36 to assign partitions to brokers on as many racks as possible. This also handles the case when rack-awareness is disabled. This is a pure function without any state or side effects. | API, ZooKeeper-based admin tools |
| `AdminZkClient.createTopicWithAssignment` | `AdminZkClient.createTopic`, `AdminManager.createTopics`, `ZookeeperTopicService.createTopic` | | Creates the ZooKeeper znodes required for topic-specific configuration and replica assignments for the partitions of the topic. | API, ZooKeeper-based admin tools |
| `AdminZkClient.createTopic` | `KafkaApis.createTopic`, `ZookeeperTopicService.createTopic` | `AdminUtils.assignReplicasToBrokers`, `AdminZkClient.createTopicWithAssignment` | Computes replica assignment using `AdminUtils.assignReplicasToBrokers` and then reuses `AdminZkClient.createTopicWithAssignment`. | API, ZooKeeper-based admin tools |
| `AdminZkClient.addPartitions` | `AdminManager.createPartitions`, `ZookeeperTopicService.alterTopic` | `AdminUtils.assignReplicasToBrokers` | Computes replica assignment using `AdminUtils.assignReplicasToBrokers` when replica assignments are not specified. When replica assignments are specified, uses them as is. Creates the ZooKeeper znodes required for the new partitions with the corresponding replica assignments. | API, ZooKeeper-based admin tools |
| `AdminManager.createTopics` | `KafkaApis.handleCreateTopicsRequest` | `AdminUtils.assignReplicasToBrokers`, `AdminZkClient.createTopicWithAssignment` | Used exclusively by `KafkaApis.handleCreateTopicsRequest` to create topics. Reuses `AdminUtils.assignReplicasToBrokers` when replica assignments are not specified. When replica assignments are specified, uses them as is. | API |
| `AdminManager.createPartitions` | `KafkaApis.handleCreatePartitionsRequest` | `AdminZkClient.addPartitions` | Used exclusively by `KafkaApis.handleCreatePartitionsRequest` to create partitions on an existing topic. | API |
| `KafkaController.onPartitionReassignment` | `KafkaApis.handleAlterPartitionReassignmentsRequest` (not quite directly, but the stack trace in the middle is not relevant) | | Handles all the modifications required on ZooKeeper znodes and sending API requests required for moving partitions from some brokers to others. | API |
| `KafkaApis.handleCreateTopicsRequest` | | `AdminManager.createTopics` | Handles the CreateTopics API request sent to a broker, if that broker is the controller. | API |
| `KafkaApis.handleCreatePartitionsRequest` | | `AdminManager.createPartitions` | Handles the CreatePartitions API request sent to a broker, if that broker is the controller. | API |
| `KafkaApis.handleAlterPartitionReassignmentsRequest` | | `KafkaController.onPartitionReassignment` (not quite directly, but the stack trace in the middle is not relevant) | Handles the AlterPartitionReassignments API request sent to a broker, if that broker is the controller. | API |
| `KafkaApis.createTopic` | `KafkaApis.handleTopicMetadataRequest`, `KafkaApis.handleFindCoordinatorRequest` (not quite directly, but the stack trace in the middle is not relevant) | `AdminZkClient.createTopic` | Creates internal topics for storing consumer offsets (`__consumer_offsets`) and transaction state (`__transaction_state`). Also used to auto-create topics when topic auto-creation is enabled. | API |
| `KafkaApis.handleTopicMetadataRequest` | | `KafkaApis.createTopic` (not quite directly, but the stack trace in the middle is not relevant) | Handles the Metadata API request sent to a broker. | API |
| `KafkaApis.handleFindCoordinatorRequest` | | `KafkaApis.createTopic` (not quite directly, but the stack trace in the middle is not relevant) | Handles the FindCoordinator API request sent to a broker. | API |
| `ZookeeperTopicService.createTopic` | | `AdminZkClient.createTopic`, `AdminZkClient.createTopicWithAssignment` | Used by the ./kafka-topics.sh admin tool to create topics when --zookeeper is specified. Reuses `AdminZkClient.createTopic` when no replica assignments are specified. Reuses `AdminZkClient.createTopicWithAssignment` when replica assignments are specified. | ZooKeeper-based admin tools |
| `ZookeeperTopicService.alterTopic` | | `AdminZkClient.addPartitions` | Used by the ./kafka-topics.sh admin tool to alter topics when --zookeeper is specified. Calls `AdminZkClient.addPartitions` if topic alteration involves a different number of partitions than what the topic currently has. | ZooKeeper-based admin tools |
| `ReassignPartitionsCommand.generateAssignment` | | `AdminUtils.assignReplicasToBrokers` | Used by the ./kafka-reassign-partitions.sh admin tool to generate a replica assignment of partitions for the specified topics onto the set of specified brokers. | ZooKeeper-based admin tools |
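The "directly depends on" relationships in the table above form a call graph. As an illustrative sketch, the Python below transcribes those edges (treating the three "not quite directly" edges as direct) and walks them in reverse to find every method that transitively reaches a given one, for example every code path that would have to carry the new limits down to `AdminUtils.assignReplicasToBrokers`. The dictionary and the `transitive_callers` helper are hypothetical and exist only for this example.

```python
from collections import deque
from typing import Dict, List, Set

# method -> methods it directly depends on, transcribed from the table above.
# The "not quite directly" edges are treated as direct edges here.
DEPENDS_ON: Dict[str, List[str]] = {
    "AdminZkClient.createTopic": ["AdminUtils.assignReplicasToBrokers",
                                  "AdminZkClient.createTopicWithAssignment"],
    "AdminZkClient.addPartitions": ["AdminUtils.assignReplicasToBrokers"],
    "AdminManager.createTopics": ["AdminUtils.assignReplicasToBrokers",
                                  "AdminZkClient.createTopicWithAssignment"],
    "AdminManager.createPartitions": ["AdminZkClient.addPartitions"],
    "KafkaApis.handleCreateTopicsRequest": ["AdminManager.createTopics"],
    "KafkaApis.handleCreatePartitionsRequest": ["AdminManager.createPartitions"],
    "KafkaApis.handleAlterPartitionReassignmentsRequest": [
        "KafkaController.onPartitionReassignment"],
    "KafkaApis.createTopic": ["AdminZkClient.createTopic"],
    "KafkaApis.handleTopicMetadataRequest": ["KafkaApis.createTopic"],
    "KafkaApis.handleFindCoordinatorRequest": ["KafkaApis.createTopic"],
    "ZookeeperTopicService.createTopic": ["AdminZkClient.createTopic",
                                          "AdminZkClient.createTopicWithAssignment"],
    "ZookeeperTopicService.alterTopic": ["AdminZkClient.addPartitions"],
    "ReassignPartitionsCommand.generateAssignment": [
        "AdminUtils.assignReplicasToBrokers"],
}


def transitive_callers(target: str) -> Set[str]:
    """Breadth-first search over reversed edges: every method that reaches target."""
    # Invert the graph: dependency -> methods that call it directly.
    callers: Dict[str, List[str]] = {}
    for method, deps in DEPENDS_ON.items():
        for dep in deps:
            callers.setdefault(dep, []).append(method)
    seen: Set[str] = set()
    queue = deque([target])
    while queue:
        for caller in callers.get(queue.popleft(), []):
            if caller not in seen:
                seen.add(caller)
                queue.append(caller)
    return seen
```

Called with `"AdminUtils.assignReplicasToBrokers"`, this returns 12 methods: every other row of the table except `AdminZkClient.createTopicWithAssignment`, `KafkaController.onPartitionReassignment` and `KafkaApis.handleAlterPartitionReassignmentsRequest`.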

For all the methods in the above table that are used both in Kafka API request handling paths and in ZooKeeper-based admin tools (`AdminUtils.assignReplicasToBrokers`, `AdminZkClient.createTopicWithAssignment`, `AdminZkClient.createTopic` and `AdminZkClient.addPartitions`), we will pass three additional arguments: the maximum number of partitions per broker, the maximum number of partitions overall, and the current number of partitions on each broker.
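A minimal Python sketch of the kind of check these methods could perform once they receive those three values, assuming it runs against the proposed replica assignment for the new partitions before any znodes are written. Because only per-broker counts are passed, the sketch also assumes that both limits count partition replicas, so the cluster-wide total is the sum of the per-broker counts. The function name, argument names and the `PartitionLimitExceeded` exception are hypothetical.

```python
from collections import Counter
from typing import List, Mapping


class PartitionLimitExceeded(Exception):
    """Hypothetical error for a request that would breach a partition limit."""


def check_partition_limits(new_assignment: Mapping[int, List[int]],
                           max_broker_partitions: int,
                           max_partitions: int,
                           current_partitions_per_broker: Mapping[int, int]) -> None:
    """Raise PartitionLimitExceeded if adding new_assignment would exceed a limit.

    new_assignment maps each new partition index to its replica broker IDs.
    Assumption: both limits count partition replicas.
    """
    # Number of new replicas each broker would receive.
    added = Counter(b for replicas in new_assignment.values() for b in replicas)
    # Per-broker limit (sorted for a deterministic error message).
    for broker, extra in sorted(added.items()):
        after = current_partitions_per_broker.get(broker, 0) + extra
        if after > max_broker_partitions:
            raise PartitionLimitExceeded(
                f"broker {broker} would host {after} partitions "
                f"(max.broker.partitions={max_broker_partitions})")
    # Cluster-wide limit, derived from the per-broker counts.
    total_after = sum(current_partitions_per_broker.values()) + sum(added.values())
    if total_after > max_partitions:
        raise PartitionLimitExceeded(
            f"cluster would have {total_after} partitions "
            f"(max.partitions={max_partitions})")
```

For example, with current counts `{1: 3, 2: 2, 3: 2}`, adding two partitions on brokers 2 and 3 passes with limits of 4 per broker and 12 overall, while adding one partition with a replica on broker 1 fails if the per-broker limit is 3. Validating before writing anything keeps a rejected request free of side effects.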

...

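We can see that leadership resignation time grows much faster than linearly with the number of partitions: tripling the partitions from 10000 to 30000 increases it from 9 to 100 minutes, roughly elevenfold.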

| Number of partitions left | Leadership resignation time (minutes) |
|---|---|
| 30000 | 100 |
| 20000 | 43 |
| 10000 | 9 |
| 5000 | 3 |
| 1000 | < 1 |
| 100 | < 1 |
| 10 | < 1 |
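To quantify the growth, the Python sketch below uses the four rows of the table that have exact values and computes the resignation time per 1000 partitions and the slope on a log-log scale (a slope near 1 means linear growth, near 2 roughly quadratic). This is plain arithmetic on the table above, not an additional measurement, and four points give only a rough indication; the helper names are illustrative.

```python
import math
from typing import List, Tuple

# (number of partitions, leadership resignation time in minutes) from the table.
# Rows reported as "< 1" minute are omitted because they have no exact value.
MEASUREMENTS: List[Tuple[int, int]] = [(5000, 3), (10000, 9), (20000, 43), (30000, 100)]


def log_log_slope(p1: Tuple[int, int], p2: Tuple[int, int]) -> float:
    """Slope of log(time) against log(partitions) between two measurements."""
    (n1, t1), (n2, t2) = p1, p2
    return math.log(t2 / t1) / math.log(n2 / n1)


def minutes_per_1000_partitions() -> List[Tuple[int, float]]:
    """Resignation time normalized by partition count; rising values mean superlinear growth."""
    return [(n, 1000 * t / n) for n, t in MEASUREMENTS]
```

Between 5000 and 30000 partitions the log-log slope is about 1.96, and the time per 1000 partitions rises from 0.6 to about 3.3 minutes.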