...

We propose to introduce a second-level quota for the `CreateTopicsRequest`, the `CreatePartitionsRequest` and the `DeleteTopicsRequest` in the Kafka API layer that limits the number of partition mutations allowed per second. The number of partition mutations is defined as the number of partitions of a topic to be created, the number of partitions to be added to an existing topic, or the number of partitions of a topic to be deleted. We chose to express the quota in partition mutations because the load on the controller is highly correlated with the number of created or deleted partitions.
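The definition above can be made concrete with a small helper that counts the partition mutations charged for each kind of operation. This is a minimal sketch; the class `PartitionMutations` and its methods are hypothetical and not part of Kafka.

```java
/**
 * Hypothetical helper counting the partition mutations charged for one operation.
 */
final class PartitionMutations {
    private PartitionMutations() {}

    // CreateTopics: every partition of the new topic is a mutation.
    static int forCreateTopic(int numPartitions) {
        return numPartitions;
    }

    // CreatePartitions: only the partitions added on top of the existing ones count.
    static int forCreatePartitions(int currentCount, int newTotalCount) {
        if (newTotalCount < currentCount) {
            throw new IllegalArgumentException("partitions can only be added, not removed");
        }
        return newTotalCount - currentCount;
    }

    // DeleteTopics: every partition of the deleted topic is a mutation.
    static int forDeleteTopic(int numPartitions) {
        return numPartitions;
    }
}
```

For instance, growing a topic from 3 to 10 partitions is charged 7 mutations, not 10.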

The broker hosting the controller receives all the above requests (this is already the case today) and accepts partition mutations at a rate of R partitions per second with a maximum burst of B partitions. Operations are accepted, each consuming its N partition mutations, until the burst is exhausted; all the remaining operations are rejected with a `QUOTA_VIOLATED` error. The complete algorithm is formalised below.

The reason for explicitly defining a burst B is to allow several mutations with a small or medium number of partitions within the same request. Typically, applications send a single request with all their topics when they start, which would not work without an explicit burst.
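To illustrate why the burst matters, the sketch below processes the topics of a single request in order against the tokens currently available, assuming the admission rule of the throttling algorithm (a topic is admitted while K > 0 and is then charged its N partitions, so K may go negative). Refill is ignored because all topics arrive at the same instant. `BurstExample` and `Outcome` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class BurstExample {
    // Hypothetical per-topic result of a request.
    enum Outcome { ACCEPTED, QUOTA_VIOLATED }

    /**
     * Processes the topics of one request in order. A topic is accepted while
     * tokens > 0 and is then charged its full partition count; once the tokens
     * are exhausted, every remaining topic is rejected.
     */
    static List<Outcome> process(List<Integer> partitionsPerTopic, long availableTokens) {
        List<Outcome> outcomes = new ArrayList<>();
        long tokens = availableTokens;
        for (int partitions : partitionsPerTopic) {
            if (tokens > 0) {
                tokens -= partitions; // may become negative
                outcomes.add(Outcome.ACCEPTED);
            } else {
                outcomes.add(Outcome.QUOTA_VIOLATED);
            }
        }
        return outcomes;
    }
}
```

For example, with B = 10 tokens and a request creating five topics of 3 partitions each, the first four topics are accepted (K goes 10, 7, 4, 1, -2) and the fifth is rejected with `QUOTA_VIOLATED`.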

Throttling Algorithm

We propose to use a variation of the Token Bucket algorithm to achieve this behavior:

...

A partition mutation with N partitions is admitted iff K > 0, and then updates K = K - N. The partition mutation request is rejected otherwise. The variation here is that we allow the number of tokens to become negative in order to accommodate any number of partitions for a topic without restricting it.

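If the number of tokens drops to 0 or below (K <= 0), a new operation with N partition mutations is rejected and has to wait until K becomes positive again. Since tokens are replenished at R per second, this takes -K / R seconds.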
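The token bucket variant can be sketched as follows. This is an illustrative, single-threaded sketch under stated assumptions, not the broker implementation: the class name `PartitionMutationBucket` is hypothetical, time is passed in explicitly to keep it deterministic, the bucket is assumed to start full, and the returned wait is rounded up by one millisecond so that K is strictly positive once it has elapsed.

```java
/**
 * Hypothetical token bucket for partition mutations (single-threaded sketch).
 * R = ratePerSec, B = burst, K = tokens.
 */
final class PartitionMutationBucket {
    private final double ratePerSec; // R: partition mutations replenished per second
    private final double burst;      // B: maximum number of tokens in the bucket
    private double tokens;           // K: current tokens, allowed to become negative
    private long lastUpdateMs;

    PartitionMutationBucket(double ratePerSec, double burst, long nowMs) {
        if (ratePerSec <= 0 || burst <= 0) {
            throw new IllegalArgumentException("rate and burst must be positive");
        }
        this.ratePerSec = ratePerSec;
        this.burst = burst;
        this.tokens = burst; // assumption: the bucket starts full
        this.lastUpdateMs = nowMs;
    }

    // Adds R tokens per elapsed second, never exceeding B.
    private void refill(long nowMs) {
        long elapsedMs = Math.max(0, nowMs - lastUpdateMs);
        tokens = Math.min(burst, tokens + ratePerSec * elapsedMs / 1000.0);
        lastUpdateMs = Math.max(lastUpdateMs, nowMs);
    }

    /**
     * Admits an operation with n partition mutations iff K > 0 and then sets K = K - n.
     * Returns 0 when admitted, otherwise the throttle time in milliseconds.
     */
    long tryAcquire(int n, long nowMs) {
        refill(nowMs);
        if (tokens > 0) {
            tokens -= n;
            return 0;
        }
        // Rejected: K reaches 0 after -K / R seconds; one extra millisecond makes it strictly positive.
        return (long) Math.floor(-tokens * 1000.0 / ratePerSec) + 1;
    }

    // Value that a metric such as remaining-tokens could report.
    double remainingTokens(long nowMs) {
        refill(nowMs);
        return tokens;
    }
}
```

With R = 10 and B = 10, admitting 5 and then 8 mutations at t = 0 leaves K = -3, so the next operation is rejected with a throttle time of 301 ms (3 / 10 seconds, plus the rounding margin).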

Partition Mutations Quota Manager

...

  • throttle_time_ms = max(<request throttle time>, min(<throttle time of each rejected topic in the request>) - <waiting time in the purgatory, if applicable>)
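The throttle time formula above can be expressed as a small function. This is a sketch: the name `ThrottleTime.compute` is hypothetical, the case where no topic was rejected is assumed to fall back to the request throttle time alone, and the result is clamped at zero.

```java
import java.util.List;
import java.util.OptionalInt;

final class ThrottleTime {
    private ThrottleTime() {}

    /**
     * throttle_time_ms = max(requestThrottleMs, min(rejectedTopicThrottleMs) - purgatoryWaitMs).
     * Assumption: without rejected topics, only the request throttle time applies.
     */
    static int compute(int requestThrottleMs, List<Integer> rejectedTopicThrottleMs, int purgatoryWaitMs) {
        OptionalInt minRejected = rejectedTopicThrottleMs.stream()
                .mapToInt(Integer::intValue)
                .min();
        int topicThrottleMs = minRejected.isPresent()
                ? minRejected.getAsInt() - purgatoryWaitMs
                : 0;
        // Never report a negative throttle time.
        return Math.max(0, Math.max(requestThrottleMs, topicThrottleMs));
    }
}
```

Taking the maximum with the request throttle time means the client always waits for the more constraining of the two values.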

The channel will be muted as well when `throttle_time_ms > 0`. This is necessary to cope with misbehaving clients that do not honour the throttle time. This logic already exists for the request quota, so we will reuse it.

Handling of old Clients

For old clients, we cannot gracefully reject individual topics in a request. Therefore, we propose to always accept their requests and to throttle the client by muting its channel. The throttle time will be computed as the time the client should have waited, and the `throttle_time_ms` field will be populated accordingly.
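For old clients, the same token accounting can be applied without ever rejecting. The sketch below, assuming a bucket that starts full and using a hypothetical class `OldClientThrottling`, records the mutations unconditionally and returns the time the client should have waited, which would be used both to mute the channel and to populate `throttle_time_ms`.

```java
/**
 * Hypothetical accounting for old clients: operations are always accepted,
 * and the resulting token debt is converted into a throttle time.
 */
final class OldClientThrottling {
    private final double ratePerSec; // R
    private final double burst;      // B
    private double tokens;           // K, may become negative
    private long lastUpdateMs;

    OldClientThrottling(double ratePerSec, double burst, long nowMs) {
        if (ratePerSec <= 0 || burst <= 0) {
            throw new IllegalArgumentException("rate and burst must be positive");
        }
        this.ratePerSec = ratePerSec;
        this.burst = burst;
        this.tokens = burst; // assumption: the bucket starts full
        this.lastUpdateMs = nowMs;
    }

    /**
     * Records n partition mutations unconditionally and returns the time, in ms,
     * until K is back to 0, i.e. ceil(-K / R) seconds expressed in milliseconds.
     */
    long recordAndComputeThrottleMs(int n, long nowMs) {
        long elapsedMs = Math.max(0, nowMs - lastUpdateMs);
        tokens = Math.min(burst, tokens + ratePerSec * elapsedMs / 1000.0);
        lastUpdateMs = Math.max(lastUpdateMs, nowMs);
        tokens -= n; // never rejected
        return tokens >= 0 ? 0 : (long) Math.ceil(-tokens * 1000.0 / ratePerSec);
    }
}
```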

Co-Existence of Multiple Quotas

The new partition mutations quota can be used in conjunction with the existing request quota. When both are used, the client will be throttled by the most constraining of the two.

Changes to Kafka Admin Client

...

To complement this default behavior, we propose to add a new configuration to the admin client that allows users to opt out of the automatic retry mechanism. This gives applications fine-grained control.
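The automatic retry and the proposed opt-out can be sketched as a small wrapper. Everything here is hypothetical: `SimulatedQuotaViolation` is a stand-in for the proposed `QuotaViolationException` so that the example is self-contained, `QuotaRetry.call` is not an Admin Client API, and the `retryOnQuotaViolation` flag models the proposed `retry.quota.violation.exception` setting.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the proposed QuotaViolationException.
class SimulatedQuotaViolation extends RuntimeException {
    private final int throttleTimeMs;

    SimulatedQuotaViolation(int throttleTimeMs, String message) {
        super(message);
        this.throttleTimeMs = throttleTimeMs;
    }

    int throttleTimeMs() {
        return throttleTimeMs;
    }
}

final class QuotaRetry {
    private QuotaRetry() {}

    /**
     * Runs the operation and, when retryOnQuotaViolation is true, retries after the
     * throttle time carried by the exception until timeoutMs would be exceeded.
     * When retries are disabled, the exception is given back to the caller immediately.
     */
    static <T> T call(Supplier<T> operation, boolean retryOnQuotaViolation, long timeoutMs) {
        long deadlineMs = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                return operation.get();
            } catch (SimulatedQuotaViolation e) {
                long nowMs = System.currentTimeMillis();
                if (!retryOnQuotaViolation || nowMs + e.throttleTimeMs() > deadlineMs) {
                    throw e;
                }
                try {
                    // Honour the throttle time before retrying.
                    Thread.sleep(e.throttleTimeMs());
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }
}
```

Waiting for the throttle time carried by the exception avoids hammering the controller, while the deadline bounds the total time before the error is given back to the caller.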

...

TODO

Public Interfaces

Protocol

...

Starting from the bumped version, the new `QUOTA_VIOLATED` error will be used. It won't be used for older versions.

Configuration

...

We propose to introduce the following new configurations in the Kafka Broker:

Name | Type | Default | Update Mode | Description
quota.partition.mutations.burst | Long | Long.MaxValue | cluster-wide | The maximum burst of partition mutations allowed at any given second.
quota.partition.mutations.rate | Long | Long.MaxValue | cluster-wide | The rate at which partition mutations are accepted for the create topics request, the create partitions request and the delete topics request.
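The defaults in the table can be illustrated with a small parser for the two proposed keys. This is a hypothetical sketch (`PartitionMutationQuotaConfig` is not a Kafka class) that reads the settings from a plain map, falls back to `Long.MAX_VALUE` when a key is absent, and rejects non-positive values, which is an assumption rather than a documented constraint.

```java
import java.util.Map;

/**
 * Hypothetical holder for the two proposed broker settings.
 */
final class PartitionMutationQuotaConfig {
    static final String BURST = "quota.partition.mutations.burst";
    static final String RATE = "quota.partition.mutations.rate";

    final long burst;
    final long rate;

    PartitionMutationQuotaConfig(long burst, long rate) {
        // Assumption: non-positive values are invalid.
        if (burst <= 0 || rate <= 0) {
            throw new IllegalArgumentException("burst and rate must be positive");
        }
        this.burst = burst;
        this.rate = rate;
    }

    // Missing keys fall back to the proposed default of Long.MAX_VALUE.
    static PartitionMutationQuotaConfig from(Map<String, String> props) {
        long burst = Long.parseLong(props.getOrDefault(BURST, String.valueOf(Long.MAX_VALUE)));
        long rate = Long.parseLong(props.getOrDefault(RATE, String.valueOf(Long.MAX_VALUE)));
        return new PartitionMutationQuotaConfig(burst, rate);
    }

    // True when neither setting was changed, i.e. throttling has not been enabled.
    boolean usesDefaults() {
        return burst == Long.MAX_VALUE && rate == Long.MAX_VALUE;
    }
}
```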

We propose to introduce the following new configuration in the Admin Client:

Name | Type | Default | Description
retry.quota.violation.exception | Boolean | true | Whether the `QuotaViolationException` must be automatically retried or not.

Metrics

...

We propose to expose the following new metric in the Kafka Broker:

Group | Name | Description
PartitionMutationsQuotaManager | remaining-tokens | The number of remaining tokens in the bucket. A value <= 0 indicates that partition mutations are throttled.

Public API

As mentioned, we propose to introduce a new retryable `QuotaViolationException` which will be given back to the caller when a topic is rejected due to throttling.

```java
import org.apache.kafka.common.errors.RetryableException;

/**
 * Exception thrown if an operation on a resource violates the quota.
 */
public class QuotaViolationException extends RetryableException {
    private static final long serialVersionUID = 1L;

    // Time, in milliseconds, that the client should wait before retrying.
    private final int throttleTimeMs;

    public QuotaViolationException(int throttleTimeMs, String message) {
        super(message);
        this.throttleTimeMs = throttleTimeMs;
    }

    public QuotaViolationException(int throttleTimeMs, String message, Throwable cause) {
        super(message, cause);
        this.throttleTimeMs = throttleTimeMs;
    }

    public int throttleTimeMs() {
        return this.throttleTimeMs;
    }
}
```

Limitations

  • The proposal does not support the old ways of altering topics via ZooKeeper, as it is not possible to control or reject changes made via ZooKeeper. As these ways are deprecated anyway, we felt that this is an acceptable limitation.
  • Similarly, the proposal is not compatible with automatic topic creation, for the same reasons.

Compatibility, Deprecation, and Migration Plan


Compatibility with Old Clients

  • By default, there is no impact on existing users since throttling must be explicitly enabled.
  • If throttling is enabled in a cluster used by old clients, the old clients will be transparently throttled. A throttled client's request may time out and will then be retried by the existing retry mechanism. If the retries fail, the application will receive a `TimeoutException`, which is a retryable exception. Clients should already handle `TimeoutException`, and retryable exceptions more generally, so the impact is minimal.

Compatibility after Upgrading the Admin Client to the new version

  • By default, the upgrade should be transparent since the Admin Client will automatically retry on `QuotaViolationException` and return it to the caller only if the retry timeout is reached. In this case, the caller must at a minimum handle the `RetryableException` and retry. Handling retryable exceptions is something that we can safely expect from clients.

Rejected Alternatives
