...

Author: Gokul Ramanan Subramanian

Contributors: Stanislav Kozlovski

Status

Current state: Draft

...

We did some performance experiments to understand the effect of increasing the number of partitions. See Appendix A1 for the impact on producer performance, A2 for topic creation and deletion times, and A3 for recovery time under broker failure. These experiments consistently indicate that a large number of partitions degrades performance while providing no additional benefit.

In order to mitigate these issues, we propose having two configurations (a) max.broker.partitions to limit the number of partitions per broker, and (b) max.partitions to limit the number of partitions in the cluster overall.

These can act as guard rails ensuring that the cluster is never operating with a higher number of partitions than it can handle.
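
For illustration only, the sketch below shows how such limits could be declared broker-side using Kafka's ConfigDef. The default values (effectively "unlimited") and the exact place where these definitions would be wired in are assumptions made for the sake of the example; the actual definitions are specified in the Configs section below.

```java
import org.apache.kafka.common.config.ConfigDef;

// Hypothetical declaration of the proposed limits as broker configurations.
// The placeholder defaults below are NOT the values proposed by this KIP.
public class PartitionLimitConfigs {
    public static final String MAX_BROKER_PARTITIONS = "max.broker.partitions";
    public static final String MAX_PARTITIONS = "max.partitions";

    public static ConfigDef defineLimits(ConfigDef configDef) {
        return configDef
            .define(MAX_BROKER_PARTITIONS,
                    ConfigDef.Type.INT,
                    Integer.MAX_VALUE,                // placeholder default: effectively no limit
                    ConfigDef.Range.atLeast(1),
                    ConfigDef.Importance.MEDIUM,
                    "Maximum number of partitions that a single broker may host.")
            .define(MAX_PARTITIONS,
                    ConfigDef.Type.INT,
                    Integer.MAX_VALUE,                // placeholder default: effectively no limit
                    ConfigDef.Range.atLeast(1),
                    ConfigDef.Importance.MEDIUM,
                    "Maximum number of partitions in the cluster overall.");
    }
}
```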

Goals

  • These limits are cluster-wide. This is obviously true for max.partitions. However, we choose this for max.broker.partitions too, instead of supporting different values for each broker, in keeping with the current recommendation to run homogeneous Kafka clusters where all brokers have the same specifications (CPU, RAM, disk etc.).
  • These limits can be changed at runtime, without restarting brokers. This provides greater flexibility, as explained in the "Rejected alternatives" section. (A sketch of updating them dynamically with the Admin client follows this list.)
  • These limits apply to all topics, including internal topics (which are usually not configured with many partitions), for the sake of consistency.
  • These limits also apply to topics created via auto topic creation (currently possible via the Metadata and FindCoordinator API requests). By enforcing the limits here as well, we avoid a simple backdoor that would bypass them.
  • These limits do not apply when creating topics or partitions, or reassigning partitions, via the ZooKeeper-based admin tools. This is unfortunate, because it does create a backdoor to bypass these limits. We choose this approach because it simplifies the design, and we believe this is acceptable given that we will eventually deprecate ZooKeeper in Kafka.
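
As a concrete illustration of changing the limits at runtime, the sketch below uses the Java Admin client to set both values as cluster-wide dynamic broker configs (an empty ConfigResource name addresses the cluster-wide default). Whether the limits are ultimately exposed exactly this way is an assumption here; the bootstrap address and the limit values are placeholders.

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class UpdatePartitionLimits {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address

        try (Admin admin = Admin.create(props)) {
            // Empty resource name = cluster-wide default broker config.
            ConfigResource cluster = new ConfigResource(ConfigResource.Type.BROKER, "");

            // The limit values below are arbitrary examples, not recommendations.
            List<AlterConfigOp> ops = List.of(
                new AlterConfigOp(new ConfigEntry("max.broker.partitions", "4000"),
                                  AlterConfigOp.OpType.SET),
                new AlterConfigOp(new ConfigEntry("max.partitions", "20000"),
                                  AlterConfigOp.OpType.SET));

            admin.incrementalAlterConfigs(Map.of(cluster, ops)).all().get();
        }
    }
}
```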

Public Interfaces

Configs

...

The CreateTopics, CreatePartitions, AlterPartitionReassignments, Metadata and FindCoordinator APIs will throw the following exception if it is not possible to satisfy the request while respecting the max.broker.partitions or max.partitions limits. This applies to Metadata requests only when auto topic creation is enabled, and to FindCoordinator requests only when the internal topics (__consumer_offsets and __transaction_state) need to be created.

...
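
On the client side, a request rejected because of these limits surfaces through the usual error path of the Admin client. The sketch below is a minimal example of handling such a rejection; it deliberately does not assume the specific exception class defined above and only inspects the generic ApiException cause. The topic name, partition count and bootstrap address are placeholders.

```java
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.ApiException;

public class CreateTopicWithLimitHandling {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address

        try (Admin admin = Admin.create(props)) {
            // Hypothetical topic: 100 partitions with replication factor 3.
            NewTopic topic = new NewTopic("example-topic", 100, (short) 3);
            admin.createTopics(List.of(topic)).all().get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof ApiException) {
                // With this KIP, the cause may indicate that the request would exceed
                // max.broker.partitions or max.partitions.
                System.err.println("Topic creation rejected: " + e.getCause().getMessage());
            }
        }
    }
}
```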

The following is the list of methods that will need to change in order to support the max.broker.partitions and max.partitions configurations. (We skip a few internal methods for the sake of simplicity.) For each method, we list the relevant methods that directly depend on it, the relevant methods on which it is directly dependent, what it currently does, and the context in which it is used. A hedged sketch of the kind of validation these methods would need to perform appears after the list.

`AdminUtils.assignReplicasToBrokers`
  • Directly depended on by: `AdminZkClient.createTopic`, `AdminZkClient.addPartitions`, `AdminManager.createTopics`, `ReassignPartitionsCommand.generateAssignment`
  • Current behavior: Encapsulates the algorithm specified in KIP-36 to assign partitions to brokers on as many racks as possible; also handles the case when rack-awareness is disabled. This is a pure function without any state or side effects.
  • Used in: API, ZooKeeper-based admin tools

`AdminZkClient.createTopicWithAssignment`
  • Directly depended on by: `AdminZkClient.createTopic`, `AdminManager.createTopics`, `ZookeeperTopicService.createTopic`
  • Current behavior: Creates the ZooKeeper znodes required for topic-specific configuration and replica assignments for the partitions of the topic.
  • Used in: API, ZooKeeper-based admin tools

`AdminZkClient.createTopic`
  • Directly depended on by: `KafkaApis.createTopic`, `ZookeeperTopicService.createTopic`
  • Directly depends on: `AdminUtils.assignReplicasToBrokers`, `AdminZkClient.createTopicWithAssignment`
  • Current behavior: Computes replica assignment using `AdminUtils.assignReplicasToBrokers` and then reuses `AdminZkClient.createTopicWithAssignment`.
  • Used in: API, ZooKeeper-based admin tools

`AdminZkClient.addPartitions`
  • Directly depended on by: `AdminManager.createPartitions`, `ZookeeperTopicService.alterTopic`
  • Directly depends on: `AdminUtils.assignReplicasToBrokers`
  • Current behavior: Computes replica assignment using `AdminUtils.assignReplicasToBrokers` when replica assignments are not specified, and uses the specified replica assignments as is otherwise. Creates the ZooKeeper znodes required for the new partitions with the corresponding replica assignments.
  • Used in: API, ZooKeeper-based admin tools

`AdminManager.createTopics`
  • Directly depended on by: `KafkaApis.handleCreateTopicsRequest`
  • Directly depends on: `AdminUtils.assignReplicasToBrokers`, `AdminZkClient.createTopicWithAssignment`
  • Current behavior: Used exclusively by `KafkaApis.handleCreateTopicsRequest` to create topics. Reuses `AdminUtils.assignReplicasToBrokers` when replica assignments are not specified, and uses the specified replica assignments as is otherwise.
  • Used in: API

`AdminManager.createPartitions`
  • Directly depended on by: `KafkaApis.handleCreatePartitionsRequest`
  • Directly depends on: `AdminZkClient.addPartitions`
  • Current behavior: Used exclusively by `KafkaApis.handleCreatePartitionsRequest` to create partitions on an existing topic.
  • Used in: API

`KafkaController.onPartitionReassignment`
  • Directly depended on by: `KafkaApis.handleAlterPartitionReassignmentsRequest` (not quite directly, but the stack trace in the middle is not relevant)
  • Current behavior: Handles all the modifications required on ZooKeeper znodes and the sending of API requests required for moving partitions from some brokers to others.
  • Used in: API

`KafkaApis.handleCreateTopicsRequest`
  • Directly depends on: `AdminManager.createTopics`
  • Current behavior: Handles the CreateTopics API request sent to a broker, if that broker is the controller.
  • Used in: API

`KafkaApis.handleCreatePartitionsRequest`
  • Directly depends on: `AdminManager.createPartitions`
  • Current behavior: Handles the CreatePartitions API request sent to a broker, if that broker is the controller.
  • Used in: API

`KafkaApis.handleAlterPartitionReassignmentsRequest`
  • Directly depends on: `KafkaController.onPartitionReassignment` (not quite directly, but the stack trace in the middle is not relevant)
  • Current behavior: Handles the AlterPartitionReassignments API request sent to a broker, if that broker is the controller.
  • Used in: API

`KafkaApis.createTopic`
  • Directly depended on by: `KafkaApis.handleTopicMetadataRequest`, `KafkaApis.handleFindCoordinatorRequest` (not quite directly, but the stack trace in the middle is not relevant)
  • Directly depends on: `AdminZkClient.createTopic`
  • Current behavior: Creates the internal topics for storing consumer offsets (__consumer_offsets) and transaction state (__transaction_state). Also used to auto-create topics when topic auto-creation is enabled.
  • Used in: API

`KafkaApis.handleTopicMetadataRequest`
  • Directly depends on: `KafkaApis.createTopic` (not quite directly, but the stack trace in the middle is not relevant)
  • Current behavior: Handles the Metadata API request sent to a broker.
  • Used in: API

`KafkaApis.handleFindCoordinatorRequest`
  • Directly depends on: `KafkaApis.createTopic` (not quite directly, but the stack trace in the middle is not relevant)
  • Current behavior: Handles the FindCoordinator API request sent to a broker.
  • Used in: API

`ZookeeperTopicService.createTopic`
  • Directly depends on: `AdminZkClient.createTopic`, `AdminZkClient.createTopicWithAssignment`
  • Current behavior: Used by the ./kafka-topics.sh admin tool to create topics when --zookeeper is specified. Reuses `AdminZkClient.createTopic` when no replica assignments are specified, and `AdminZkClient.createTopicWithAssignment` when replica assignments are specified.
  • Used in: ZooKeeper-based admin tools

`ZookeeperTopicService.alterTopic`
  • Directly depends on: `AdminZkClient.addPartitions`
  • Current behavior: Used by the ./kafka-topics.sh admin tool to alter topics when --zookeeper is specified. Calls `AdminZkClient.addPartitions` if topic alteration involves a different number of partitions than what the topic currently has.
  • Used in: ZooKeeper-based admin tools

`ReassignPartitionsCommand.generateAssignment`
  • Directly depends on: `AdminUtils.assignReplicasToBrokers`
  • Current behavior: Used by the ./kafka-reassign-partitions.sh admin tool to generate a replica assignment of partitions for the specified topics onto the set of specified brokers.
  • Used in: ZooKeeper-based admin tools

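To make the nature of the check more concrete, here is a hedged sketch of the kind of validation the methods above would need to perform before persisting a new assignment. The class and method names are hypothetical, the placement of this logic is not prescribed here, and whether the cluster-wide limit counts unique partitions or partition replicas is an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only; not the actual implementation of this KIP.
public final class PartitionLimitChecker {

    /**
     * Returns true if applying the proposed assignment keeps every broker at or below
     * maxBrokerPartitions and the cluster at or below maxPartitions.
     *
     * @param currentReplicasPerBroker broker id -> number of partition replicas it currently hosts
     * @param proposedAssignment       new partition -> broker ids that would host a replica of it
     */
    public static boolean satisfiesLimits(Map<Integer, Integer> currentReplicasPerBroker,
                                          Map<Integer, List<Integer>> proposedAssignment,
                                          int maxBrokerPartitions,
                                          int maxPartitions) {
        Map<Integer, Integer> projected = new HashMap<>(currentReplicasPerBroker);
        for (List<Integer> replicas : proposedAssignment.values()) {
            for (int brokerId : replicas) {
                projected.merge(brokerId, 1, Integer::sum);
            }
        }

        // Per-broker limit: no broker may end up hosting more partition replicas than allowed.
        for (int count : projected.values()) {
            if (count > maxBrokerPartitions) {
                return false;
            }
        }

        // Cluster-wide limit: here we count partition replicas across all brokers (an assumption).
        int total = projected.values().stream().mapToInt(Integer::intValue).sum();
        return total <= maxPartitions;
    }
}
```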
...

Produce throughput (Mbps)

Replication | Number of partitions on a broker
            | 10  | 100 | 500 | 1000 | 5000 | 10000
3-way       | 52  | 101 | 86  | 12   | 2.5  | 0.9
1-way       | 142 | 113 | 104 | 99   | 24   | 16

We also stress tested the system with a single topic consisting of 30000 partitions, each with 3-way replication. The producer performance test yielded essentially zero throughput. Most requests returned the NOT_ENOUGH_REPLICAS error, indicating that replication was backlogged and that the brokers were also unable to handle the Produce requests. The UnderMinIsrPartitionCount metric was equal to the LeaderCount metric on all brokers, confirming this hypothesis.

A2: Topic creation and deletion time

For stress testing the system with a large number of partitions, we created a single topic with 30000 partitions and 3-way replication. It took 30 minutes for the necessary (30000) log directories to get created on the brokers after the PartitionCount metric indicated that the brokers had 30000 partitions each.

In this case, deleting the topic with no Produce / Fetch traffic took 1 hour 40 minutes. As the deletion progressed, we noticed that it proceeded more rapidly once fewer partitions were left to delete. The following table shows how long the last N partitions (out of the total 30000) took to delete.

We can see that deletion times grow super-linearly with the number of partitions.

Number of partitions to be deleted | 30000 | 20000 | 10000 | 5000 | 1000 | 100 | 10
Deletion time (minutes)            | 100   | 43    | 9     | 3    | < 1  | < 1 | < 1

The above data corresponds to the approximate deletion times as measured by the decrease in the LeaderCount metric. The PartitionCount metric did not come down from 30000 at all. From the GC logs, we noticed that this was because all brokers had maxed out their allocated heap of 2 GB and were spending a lot of time in garbage collection, which caused their ZooKeeper sessions to repeatedly time out.

A3: Recovery time under broker failure

...