...

  • `CreateTopicsRequest` and `CreateTopicsResponse` to version 6.
  • `CreatePartitionsRequest` and `CreatePartitionsResponse` to version 3.
  • `DeleteTopicsRequest` and `DeleteTopicsResponse` to version 5.

Starting from the bumped version, the new `QUOTA_VIOLATED` error will be used. It won't be used for older versions.
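
The version gating described above can be sketched as follows. This is a minimal illustration, not the broker implementation: the class `QuotaErrorVersions`, the enum `TopicApi`, and the method `mayReturnQuotaViolated` are hypothetical names. Only the version numbers come from the list above.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical helper: decides whether a request version is new enough
// to receive the QUOTA_VIOLATED error.
class QuotaErrorVersions {
    enum TopicApi { CREATE_TOPICS, CREATE_PARTITIONS, DELETE_TOPICS }

    // First version of each API that may carry QUOTA_VIOLATED (the bumped versions).
    private static final Map<TopicApi, Short> FIRST_VERSION = new EnumMap<>(TopicApi.class);
    static {
        FIRST_VERSION.put(TopicApi.CREATE_TOPICS, (short) 6);
        FIRST_VERSION.put(TopicApi.CREATE_PARTITIONS, (short) 3);
        FIRST_VERSION.put(TopicApi.DELETE_TOPICS, (short) 5);
    }

    // True if a request of this API and version may be answered with QUOTA_VIOLATED.
    static boolean mayReturnQuotaViolated(TopicApi api, short requestVersion) {
        return requestVersion >= FIRST_VERSION.get(api);
    }
}
```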

We will add the `ErrorMessage` field in the `DeleteTopicsResponse` as follows:

{
  "apiKey": 20,
  "type": "response",
  "name": "DeleteTopicsResponse",
  // Version 1 adds the throttle time.
  //
  // Starting in version 2, on quota violation, brokers send out responses before throttling.
  //
  // Starting in version 3, a TOPIC_DELETION_DISABLED error code may be returned.
  //
  // Version 4 is the first flexible version.
  //
  // Version 5 adds the ErrorMessage field.
  "validVersions": "0-5",
  "flexibleVersions": "4+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DeletableTopicResult", "versions": "0+",
      "about": "The results for each topic we tried to delete.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
        "about": "The topic name" },
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The deletion error, or 0 if the deletion succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "5+", "nullableVersions": "5+", "ignorable": true,
        "about": "The error message, or null if there was no error." }
    ]}
  ]
}

Configuration

We propose to introduce the following new configurations in the Kafka Broker:

| Name | Type | Default | Update Mode | Description |
|---|---|---|---|---|
| quota.partition.mutations.burst | Long | Long.MaxValue | cluster-wide | The maximum burst of partition mutations allowed at any given second. |
| quota.partition.mutations.rate | Long | Long.MaxValue | cluster-wide | The rate at which partition mutations are accepted for the create topics request, the create partitions request and the delete topics request. |

We propose to introduce the following new configuration in the Admin Client:

| Name | Type | Default | Description |
|---|---|---|---|
| retry.quota.violation.exception | Boolean | true | Whether the `QuotaViolationException` must be automatically retried or not. |

Metrics

We propose to expose the following new metric in the Kafka Broker:

| Group | Name | Description |
|---|---|---|
| PartitionMutationsQuotaManager | remaining-tokens | The number of remaining tokens in the bucket. <= 0 indicates that partition mutations are throttled. |
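
To make the relationship between the rate, the burst and the `remaining-tokens` metric concrete, here is a minimal token bucket sketch. It is an illustration under stated assumptions, not the broker's quota manager: the class `PartitionMutationBucketSketch` and its methods are hypothetical. The sketch assumes that a request is admitted whenever the balance is positive and that the balance may then go negative, which is one way to read "<= 0 indicates that partition mutations are throttled".

```java
// Hypothetical token bucket: refills at `ratePerSec`, holds at most `burst` tokens.
class PartitionMutationBucketSketch {
    private final double ratePerSec; // stands in for quota.partition.mutations.rate
    private final double burst;      // stands in for quota.partition.mutations.burst
    private double tokens;           // what a remaining-tokens metric would report
    private long lastRefillMs;

    PartitionMutationBucketSketch(double ratePerSec, double burst, long nowMs) {
        if (ratePerSec <= 0 || burst <= 0) {
            throw new IllegalArgumentException("rate and burst must be positive");
        }
        this.ratePerSec = ratePerSec;
        this.burst = burst;
        this.tokens = burst; // start with a full bucket
        this.lastRefillMs = nowMs;
    }

    // Adds the tokens earned since the last refill, capped at the burst size.
    private void refill(long nowMs) {
        double elapsedSec = Math.max(0, nowMs - lastRefillMs) / 1000.0;
        tokens = Math.min(burst, tokens + elapsedSec * ratePerSec);
        lastRefillMs = Math.max(lastRefillMs, nowMs);
    }

    // Returns 0 if the mutations are admitted, otherwise a throttle time in ms
    // after which the balance is expected to be non-negative again.
    long tryAdmit(int mutations, long nowMs) {
        refill(nowMs);
        if (tokens > 0) {
            tokens -= mutations; // assumption: the balance may go negative
            return 0L;
        }
        return Math.max(1L, (long) Math.ceil(-tokens / ratePerSec * 1000.0));
    }

    double remainingTokens(long nowMs) {
        refill(nowMs);
        return tokens;
    }
}
```

With a rate of 5 and a burst of 10, a first request creating 15 partitions is admitted and leaves the balance at -5. In this sketch, later requests are rejected until the balance is positive again.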

Public API

As mentioned, we propose to introduce a new retryable `QuotaViolationException` exception which will be given back to the caller when an operation on a topic is rejected due to throttling.

/**
 * Exception thrown if an operation on a resource violates the quota.
 */
public class QuotaViolationException extends RetryableException {
    private final int throttleTimeMs;

    public QuotaViolationException(int throttleTimeMs, String message) {
        super(message);
        this.throttleTimeMs = throttleTimeMs;
    }

    public QuotaViolationException(int throttleTimeMs, String message, Throwable cause) {
        super(message, cause);
        this.throttleTimeMs = throttleTimeMs;
    }

    /** Returns the throttle time in milliseconds carried by this exception. */
    public int throttleTimeMs() {
        return this.throttleTimeMs;
    }
}

Kafka Topic Command

We propose to disable the automatic retry of the `QuotaViolationException` for the `kafka-topics.sh` command so that the command is not blocked until the retry period is exhausted.

Known Limitations

  • The proposal does not support the old ways to alter topics via ZooKeeper, as it is not possible to control or reject changes made via ZooKeeper. As these ways are deprecated anyway, we feel that this is an acceptable limitation.
  • Similarly, the proposal is not compatible with automatic topic creation, for the same reasons.

...

  • By default, the upgrade should be transparent since the Admin Client will automatically retry `QuotaViolationException` and return it to the caller only if the retry timeout is reached. In this case, the caller must at minimum handle the `RetryableException` and retry. Handling retryable exceptions is something that we can safely expect from clients.
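
A caller-side retry loop of the kind described above might look like the following sketch. It is not the Admin Client's implementation. The class `QuotaRetrySketch`, the method `callWithRetry` and the injected `sleeper` are hypothetical, and the nested `QuotaViolationException` is a local stand-in that extends `RuntimeException` so the example compiles without Kafka on the classpath.

```java
import java.util.function.LongConsumer;
import java.util.function.Supplier;

class QuotaRetrySketch {
    // Local stand-in for the proposed exception (the proposal extends RetryableException).
    static class QuotaViolationException extends RuntimeException {
        private final int throttleTimeMs;

        QuotaViolationException(int throttleTimeMs, String message) {
            super(message);
            this.throttleTimeMs = throttleTimeMs;
        }

        int throttleTimeMs() {
            return throttleTimeMs;
        }
    }

    // Runs the operation, waiting for the advertised throttle time between attempts.
    // Rethrows the last QuotaViolationException once maxAttempts is exhausted.
    static <T> T callWithRetry(Supplier<T> operation, int maxAttempts, LongConsumer sleeper) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        QuotaViolationException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.get();
            } catch (QuotaViolationException e) {
                last = e;
                if (attempt < maxAttempts) {
                    sleeper.accept(e.throttleTimeMs()); // back off as instructed by the broker
                }
            }
        }
        throw last;
    }
}
```

The sleeper is injected so that the wait can be replaced in tests. A real caller could pass a function that calls `Thread.sleep` and handles interruption.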

Rejected Alternatives

Throttle the Execution instead of the Admission

The main opposite alternative consists of throttling the internal execution of the requests in the controller instead of throttling and rejecting them during their admission in the API layer. Requests would always be accepted.

While this alternative would benefit from having zero client side changes, we believe that it is a sub-optimal solution for the clients for the following reasons:

  • By default, clients use a 30s request timeout, meaning that throttling the execution for a longer period would result in a TimeoutException in the client, which would automatically retry the request. As it stands, the retried request would be rejected with a TopicExistsException, which is confusing. This is due to the fact that topics are registered in ZooKeeper to notify the controller and thus are already created. Even without this, it feels wrong with our current API. This could work if we changed our API to be asynchronous. For instance, we could provide an API which allows clients to get the status of a topic (e.g. creating, ready, deleting, etc.) and continue to use the existing API to only trigger the creation. This has its own compatibility challenges.
  • Similarly, when users use the `kafka-topics.sh` tool, they would be blocked until the retry time is exhausted and would also end up with a confusing message.

Throttle the Admission but without an explicit Error Code

Another alternative would be to use the method used for old clients for the new clients as well. We believe that it is a sub-optimal long-term solution since having an explicit error code and message is better.