...

Author: Gokul Ramanan Subramanian

Contributors: Stanislav Kozlovski, Alexandre Dupriez, Tom Bentley, Colin McCabe, Ismael Juma, Boyang Chen

Status

Current state: Voting

...

We did some performance experiments to understand the effect of increasing the number of partitions. See Appendix A1 for producer performance, and A2 for topic creation and deletion times. These consistently indicate that having a large number of partitions can lead to a malfunctioning cluster.

Topic creation policy plugins, specified via the create.topic.policy.class.name configuration, can partially help solve this problem by rejecting requests that would result in a large number of partitions. However, these policies cannot produce a replica assignment that respects the partition limits; they can only accept or reject a request in its entirety. Therefore, we need a more native solution for partition limit-aware replica assignment. (See the rejected alternatives for more details on why the policy approach does not work.)
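
For concreteness, the following is a minimal sketch of such a policy plugin, using the existing org.apache.kafka.server.policy.CreateTopicPolicy interface; the class name and threshold are hypothetical, not part of this proposal. Note that validate can only accept or reject the request wholesale; it has no way to produce an alternative replica assignment.

Code Block
languagejava
import java.util.Map;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.server.policy.CreateTopicPolicy;

// Hypothetical policy: rejects topics that request too many partitions.
// It can only accept or reject; it cannot rewrite the replica assignment.
public class PartitionCountPolicy implements CreateTopicPolicy {
    private static final int MAX_PARTITIONS_PER_TOPIC = 1000; // hypothetical threshold

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed for this sketch.
    }

    @Override
    public void validate(RequestMetadata metadata) throws PolicyViolationException {
        Integer numPartitions = metadata.numPartitions(); // may be null if explicit assignments were given
        if (numPartitions != null && numPartitions > MAX_PARTITIONS_PER_TOPIC) {
            throw new PolicyViolationException("Topic " + metadata.topic() + " requests "
                + numPartitions + " partitions, above the limit of " + MAX_PARTITIONS_PER_TOPIC);
        }
    }

    @Override
    public void close() {
        // Nothing to clean up.
    }
}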

In order to prevent a cluster from entering a bad state due to a large number of topic partitions, we propose having two configurations: (a) max.broker.partitions, to limit the number of partitions per broker, and (b) max.partitions, to limit the number of partitions in the cluster overall. These act as guard rails, ensuring that the cluster never operates with more partitions than it can handle.

...

  • These limits are cluster-wide. This is obviously true for max.partitions, which is meant to apply at the cluster level. However, we choose this for max.broker.partitions too, instead of supporting different values for each broker. This is in alignment with the current recommendation to run homogeneous Kafka clusters, where all brokers have the same specifications (CPU, RAM, disk, etc.).
  • If both max.partitions and max.broker.partitions are specified, then the more restrictive of the two applies. It is possible for a request to be rejected because it would breach the max.partitions limit without any broker breaching the max.broker.partitions limit, and vice versa. For example, with 10 brokers, max.partitions=500, and max.broker.partitions=100, a request that brings the cluster to 600 single-replica partitions is rejected by max.partitions even though the total per-broker capacity (10 × 100 = 1000) is not exhausted. (A sketch after this list illustrates how the two checks interact.)
  • These limits can be changed at runtime, without restarting brokers. This provides greater flexibility. See the "Rejected alternatives" section for why we did not go with read-only configuration.
  • These limits won't apply to topics created via auto topic creation (currently possible via Metadata API requests) until KIP-590. With KIP-590, auto-topic creation will leverage the CreateTopics API, and will have the same behavior as the creation of any other topic.
  • These limits do not apply to internal topics (i.e. __consumer_offsets and __transaction_state), which usually are not configured with too many partitions. This ensures that internal Kafka behaviors do not break because of partition limits. The partitions of these internal topics also do not count towards the limits.
  • These limits do not apply when creating topics or partitions, or reassigning partitions via the ZooKeeper-based admin tools. This is unfortunate, because it does create a backdoor to bypass these limits. However, we leave this out of scope here given that ZooKeeper will eventually be deprecated from Kafka.
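
To make the interplay of the two limits concrete, here is a minimal sketch with hypothetical names (neither PartitionLimitCheck nor wouldExceed is actual broker code) of a request being validated against both limits. Whichever check fails first causes the rejection, so the more restrictive limit always wins.

Code Block
languagejava
import java.util.Map;

// Hypothetical sketch of how the two limits interact; not the actual broker code.
public final class PartitionLimitCheck {
    private final int maxPartitions;        // cluster-wide limit (max.partitions)
    private final int maxBrokerPartitions;  // per-broker limit (max.broker.partitions)

    public PartitionLimitCheck(int maxPartitions, int maxBrokerPartitions) {
        this.maxPartitions = maxPartitions;
        this.maxBrokerPartitions = maxBrokerPartitions;
    }

    /**
     * @param clusterPartitionCount current number of partitions in the cluster
     * @param partitionsToAdd       partitions the request would add overall
     * @param currentPerBroker      current partition count per broker id
     * @param addedPerBroker        partitions the request would add per broker id
     * @return true if the request must be rejected under either limit
     */
    public boolean wouldExceed(int clusterPartitionCount, int partitionsToAdd,
                               Map<Integer, Integer> currentPerBroker,
                               Map<Integer, Integer> addedPerBroker) {
        // The cluster-wide limit can be hit even when no broker exceeds
        // max.broker.partitions, and vice versa.
        if (clusterPartitionCount + partitionsToAdd > maxPartitions)
            return true;
        for (Map.Entry<Integer, Integer> e : addedPerBroker.entrySet()) {
            int current = currentPerBroker.getOrDefault(e.getKey(), 0);
            if (current + e.getValue() > maxBrokerPartitions)
                return true;
        }
        return false;
    }
}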

...

Code Block
languagebash
./kafka-configs.sh --bootstrap-server $SERVERS --alter --add-config max.broker.partitions=4000 --entity-type brokers --entity-default
./kafka-configs.sh --bootstrap-server $SERVERS --alter --add-config max.partitions=200000 --entity-type brokers --entity-default
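
Because these are dynamic configs, they could equivalently be set programmatically. Below is a sketch using the standard Java AdminClient, assuming the two proposed configs are accepted as dynamic cluster-wide broker configs; the bootstrap address is a placeholder.

Code Block
languagejava
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class SetPartitionLimits {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder bootstrap address.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // An empty resource name targets the cluster-wide default for all
            // brokers, mirroring --entity-type brokers --entity-default above.
            ConfigResource allBrokers = new ConfigResource(ConfigResource.Type.BROKER, "");
            Map<ConfigResource, Collection<AlterConfigOp>> updates = Map.of(allBrokers, List.of(
                new AlterConfigOp(new ConfigEntry("max.broker.partitions", "4000"), AlterConfigOp.OpType.SET),
                new AlterConfigOp(new ConfigEntry("max.partitions", "200000"), AlterConfigOp.OpType.SET)));
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}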

...

CreateTopics, CreatePartitions, and AlterPartitionReassignments APIs will throw PolicyViolationException, and correspondingly the POLICY_VIOLATION(44) error code, if it is not possible to satisfy the request while respecting the max.broker.partitions or max.partitions limits. This applies to Metadata requests only in case auto-topic creation is enabled, and only post KIP-590, which will modify the Metadata API to call CreateTopics. We will bump up the version of these APIs by one for new clients.

The actual exception will contain the values of max.broker.partitions and max.partitions in order to make it easy for users to understand why their request got rejected. Create topic policy plugins can also (but are not required to) throw these exceptions in case they want to enforce limits on their own.
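
From a client's perspective, the rejection surfaces like any other policy violation. The following sketch shows how it could be handled when creating a topic with the standard Java AdminClient; the bootstrap address, topic name, and partition count are illustrative placeholders.

Code Block
languagejava
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.PolicyViolationException;

public class CreateTopicWithLimits {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // Illustrative topic name, partition count, and replication factor.
            NewTopic topic = new NewTopic("events", 5000, (short) 3);
            try {
                admin.createTopics(List.of(topic)).all().get();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof PolicyViolationException) {
                    // Per this KIP, the message includes the configured limits.
                    System.err.println("Rejected by partition limits: " + e.getCause().getMessage());
                } else {
                    throw e;
                }
            }
        }
    }
}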

Proposed Changes

The following table shows the list of methods that will need to change in order to support the max.broker.partitions and max.partitions configurations. (We skip a few internal methods for the sake of simplicity.)

...

We will modify the core algorithm for replica assignment in the `AdminUtils.assignReplicasToBrokers` method. The modified algorithm will ensure that, as replicas are assigned to brokers iteratively one partition at a time, a broker is skipped if assigning the next partition to it would cause it to exceed the max.broker.partitions limit. If all brokers are skipped in a row, the algorithm terminates and throws PolicyViolationException. The check for max.partitions is much simpler, based purely on the total number of partitions that exist across all brokers.
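
For illustration, here is a simplified, hypothetical sketch of that skip-and-terminate behavior; the real change lives in `AdminUtils.assignReplicasToBrokers` (written in Scala), which additionally preserves rack awareness and the existing round-robin placement with replica shifting.

Code Block
languagejava
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.errors.PolicyViolationException;

// Simplified, hypothetical sketch of limit-aware round-robin assignment;
// not the actual AdminUtils code.
public class LimitAwareAssignment {
    public static Map<Integer, List<Integer>> assign(List<Integer> brokers,
                                                     Map<Integer, Integer> partitionCounts,
                                                     int numPartitions,
                                                     int replicationFactor,
                                                     int maxBrokerPartitions) {
        Map<Integer, List<Integer>> assignment = new HashMap<>();
        int cursor = 0;
        for (int partition = 0; partition < numPartitions; partition++) {
            List<Integer> replicas = new ArrayList<>();
            int skippedInARow = 0;
            while (replicas.size() < replicationFactor) {
                // If every broker was skipped consecutively, the request cannot
                // be satisfied within max.broker.partitions: terminate.
                if (skippedInARow == brokers.size()) {
                    throw new PolicyViolationException(
                        "Cannot satisfy request without exceeding max.broker.partitions="
                            + maxBrokerPartitions);
                }
                int broker = brokers.get(cursor % brokers.size());
                cursor++;
                int count = partitionCounts.getOrDefault(broker, 0);
                // Skip brokers at their limit (or already holding a replica of this partition).
                if (count >= maxBrokerPartitions || replicas.contains(broker)) {
                    skippedInARow++;
                    continue;
                }
                skippedInARow = 0;
                replicas.add(broker);
                partitionCounts.put(broker, count + 1);
            }
            assignment.put(partition, replicas);
        }
        return assignment;
    }
}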

...