...

The broker hosting the controller receives all the above requests (already the case today) and can accept partition mutations at a rate of R partitions per second with a maximum burst of B partitions. Operations worth N partition mutations are accepted until the burst is exhausted. All remaining operations are rejected with a `THROTTLING_QUOTA_EXCEEDED` error. The complete algorithm is formalised below.
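
The admission rule described above can be illustrated with a small token-bucket style sketch. It is not the formal algorithm of this proposal: the class name `PartitionMutationBucket`, its methods, and the choice to let the balance go negative when an operation is larger than the remaining burst are all assumptions made for illustration.

```java
/**
 * Hypothetical sketch of burst-limited admission for partition mutations.
 * The class and method names are illustrative, not Kafka APIs.
 */
final class PartitionMutationBucket {
    private final double ratePerSec; // R: partitions credited back per second
    private final double burst;      // B: maximum credit that can accumulate
    private double credits;          // remaining burst; may go negative (assumption)
    private long lastRefillMs;

    PartitionMutationBucket(double ratePerSec, double burst, long nowMs) {
        this.ratePerSec = ratePerSec;
        this.burst = burst;
        this.credits = burst;
        this.lastRefillMs = nowMs;
    }

    /** Accepts an operation worth {@code partitions} mutations if any burst remains. */
    boolean tryAccept(int partitions, long nowMs) {
        refill(nowMs);
        if (credits <= 0) {
            // Burst exhausted: a broker would answer THROTTLING_QUOTA_EXCEEDED here.
            return false;
        }
        // Assumption: the whole operation is charged, even if it pushes the balance below zero.
        credits -= partitions;
        return true;
    }

    /** Credits back R partitions per elapsed second, capped at the burst B. */
    private void refill(long nowMs) {
        double elapsedSec = Math.max(0, nowMs - lastRefillMs) / 1000.0;
        credits = Math.min(burst, credits + elapsedSec * ratePerSec);
        lastRefillMs = Math.max(lastRefillMs, nowMs);
    }
}
```

With R = 2 and B = 5, operations worth 3 and then 4 partitions are both accepted in this sketch, leaving a balance of -2. Further operations are rejected until enough time has passed for the balance to become positive again.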

...

Handling of new Clients

For new clients, the new `THROTTLING_QUOTA_EXCEEDED` error code will be used to inform the client that a topic has been rejected. The `throttle_time_ms` field is used to inform the client about how long it has been throttled:

...

Public Interfaces

Protocol

The following new error code will be introduced:

  • THROTTLING_QUOTA_EXCEEDED: It will be used when the defined throttling quota is exceeded but only for the above RPCs.

While discussing the naming of the error, we also considered using LIMIT_QUOTA_EXCEEDED in the future for cases where a limit quota is exceeded. A limit error is final whereas a throttling error can be retried.

In order to be able to distinguish the new clients from the old ones, we will bump the version of the following requests/responses:

  • `CreateTopicsRequest` and `CreateTopicsResponse` to version 6.
  • `CreatePartitionsRequest` and `CreatePartitionsResponse` to version 3.
  • `DeleteTopicsRequest` and `DeleteTopicsResponse` to version 5.

Starting from the bumped version, the new `THROTTLING_QUOTA_EXCEEDED` error will be used. It won't be used for older versions.
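
As a rough illustration of this version gating, the sketch below records the first version of each RPC at which the new error may be returned. The enum `ThrottledApi` and its method are hypothetical names introduced for this example only. What a broker does for older versions is not shown here.

```java
/**
 * Hypothetical lookup of the first request version that may receive
 * THROTTLING_QUOTA_EXCEEDED. Names are illustrative, not Kafka APIs.
 */
enum ThrottledApi {
    CREATE_TOPICS(6),
    CREATE_PARTITIONS(3),
    DELETE_TOPICS(5);

    private final int firstVersionWithNewError;

    ThrottledApi(int firstVersionWithNewError) {
        this.firstVersionWithNewError = firstVersionWithNewError;
    }

    /** True if a request of this version is new enough to be sent the new error code. */
    boolean usesThrottlingQuotaExceeded(int requestVersion) {
        return requestVersion >= firstVersionWithNewError;
    }
}
```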

We will add the `ErrorMessage` field to the `DeleteTopicsResponse` as follows:

Code Block
languagejs
linenumberstrue
{
  "apiKey": 20,
  "type": "response",
  "name": "DeleteTopicsResponse",
  // Version 1 adds the throttle time.
  //
  // Starting in version 2, on quota violation, brokers send out responses before throttling.
  //
  // Starting in version 3, a TOPIC_DELETION_DISABLED error code may be returned.
  //
  // Version 4 is the first flexible version.
  //
  // Version 5 adds the ErrorMessage field.
  "validVersions": "0-5",
  "flexibleVersions": "4+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DeletableTopicResult", "versions": "0+",
      "about": "The results for each topic we tried to delete.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
        "about": "The topic name" },
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The deletion error, or 0 if the deletion succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "5+", "nullableVersions": "5+", "ignorable": true,
        "about": "The error message, or null if there was no error." }
    ]}
  ]
}

...

They will be supported for <client-id>, <user> and <user, client-id> similar to the existing quotas. Defaults can be configured using the dynamic default properties at <client-id>, <user> and <user, client-id> levels.

* We keep the name intentionally generic to allow us to extend their coverage in the future.

The new quota works similarly to the already existing quotas. It can be set or changed by using the `kafka-configs.sh` tool.

For instance, the following command defines a quota of 10 controller mutations per second for (user=user1, client-id=clientA):

Code Block
languagebash
> bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'controller_mutations_rate=10' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Updated config for entity: user-principal 'user1', client-id 'clientA'.

The following command lists the quota of (user=user1, client-id=clientA):

Code Block
languagebash
> bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Configs for user-principal 'user1', client-id 'clientA' are controller_mutations_rate=10,... (other quotas are listed as well)

Broker Metrics

We propose to expose the following new metric in the Kafka Broker:

...

As mentioned, we propose to introduce a new retryable `ThrottlingQuotaExceededException` exception which will be given back to the caller when a topic is rejected due to throttling. The new exception maps to the new `THROTTLING_QUOTA_EXCEEDED` error.

Code Block
languagejava
linenumberstrue
/**
 * Exception thrown if an operation on a resource exceeds the throttling quota.
 */
public class ThrottlingQuotaExceededException extends RetryableException {
    // How long the client has been throttled, in milliseconds.
    private final int throttleTimeMs;

    public ThrottlingQuotaExceededException(int throttleTimeMs, String message) {
        super(message);
        this.throttleTimeMs = throttleTimeMs;
    }

    public ThrottlingQuotaExceededException(int throttleTimeMs, String message, Throwable cause) {
        super(message, cause);
        this.throttleTimeMs = throttleTimeMs;
    }

    public int throttleTimeMs() {
        return this.throttleTimeMs;
    }
}
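
Because the exception is retryable and carries the throttle time, a caller could wait for that duration before trying again. The following is a minimal sketch of that idea, not part of the proposed API: `ThrottledException` is a stand-in for the exception above so that the example compiles without Kafka on the classpath, and `ThrottleAwareRetry`, `maxAttempts` and the injected `sleeper` are hypothetical.

```java
import java.util.function.LongConsumer;
import java.util.function.Supplier;

/** Stand-in for the proposed exception (assumption: same throttleTimeMs() accessor). */
class ThrottledException extends RuntimeException {
    private final int throttleTimeMs;

    ThrottledException(int throttleTimeMs, String message) {
        super(message);
        this.throttleTimeMs = throttleTimeMs;
    }

    int throttleTimeMs() {
        return throttleTimeMs;
    }
}

/** Hypothetical helper that retries an operation after the advertised throttle time. */
final class ThrottleAwareRetry {
    private ThrottleAwareRetry() { }

    static <T> T call(Supplier<T> operation, int maxAttempts, LongConsumer sleeper) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        ThrottledException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.get();
            } catch (ThrottledException e) {
                last = e;
                // Wait for the advertised throttle time before the next attempt, if any.
                if (attempt < maxAttempts) {
                    sleeper.accept(e.throttleTimeMs());
                }
            }
        }
        // All attempts were throttled: surface the last error to the caller.
        throw last;
    }
}
```

The sleeper is passed in so that the waiting strategy stays under the caller's control. A caller that wants to block could pass a function that wraps `Thread.sleep`.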

...