...

Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-9915

...

With this configuration, suppose a client sends a request at time T with 7 topics of 80 partitions each, worth a total of 560 mutations. This brings the average rate R to 5.6 (560 / 100). Because the rate is above the defined quota, any subsequent mutation is rejected until the average rate gets back to the allowed quota. With our current quotas, the throttle time is computed as follows: (R - Q) / Q * S * W = (5.6 - 5) / 5 * 100 * 1 = 12 secs. In practice, however, the average rate won't drop back below the quota until the sample at time T is discarded. No new mutation is therefore accepted before S * W seconds, i.e. 100 secs in our case.
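The arithmetic above can be reproduced with a small sketch. The values Q = 5, S = 100 and W = 1 are read off the worked formula in the text. The class and method names are hypothetical and are not part of Kafka.

```java
public class RateQuotaThrottle {
    /**
     * Throttle time in seconds for the existing rate-based quota:
     * (R - Q) / Q * S * W, where R is the measured average rate, Q the quota,
     * S the number of samples and W the sample size in seconds.
     */
    public static double throttleTimeSeconds(double rate, double quota, int samples, double windowSizeSeconds) {
        if (rate <= quota) {
            return 0.0; // within the quota: no throttling
        }
        return (rate - quota) / quota * samples * windowSizeSeconds;
    }

    public static void main(String[] args) {
        int samples = 100;             // S
        double windowSizeSeconds = 1;  // W
        double quota = 5;              // Q
        // 7 topics x 80 partitions, all recorded in the sample at time T
        double rate = (7 * 80) / (samples * windowSizeSeconds); // R
        System.out.printf("R = %.1f, computed throttle = %.1f s%n",
                rate, throttleTimeSeconds(rate, quota, samples, windowSizeSeconds));
        // The single sample holding all 560 mutations only expires after S * W seconds.
        System.out.printf("sample at T expires after %.0f s%n", samples * windowSizeSeconds);
    }
}
```

The gap between the two printed values (12 s computed vs. 100 s actual) is the problem the rest of this section addresses.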

To overcome this, we propose to use a variation of the Token Bucket algorithm. The algorithm guarantees an average rate by accumulating tokens at the defined rate and allowing the accumulated tokens to be spent on operations. The burst is bounded by the maximum number of tokens the bucket can hold. Let's define:

  • K: The number of tokens in the bucket
  • B: The maximum number of tokens that can be accumulated in the bucket (burst)
  • R: The rate at which we accumulate tokens
  • T: The last time K got updated

At time now, we update the number of tokens in the bucket with:

  • K = min(K + (now - T) * R, B)

A partition mutation with N partitions is admitted iff K >= 0, in which case K is updated to K - N. Otherwise, the partition mutation request is rejected. The variation here is that we allow the number of tokens to become negative, so that a topic with any number of partitions can be accommodated. The throttle time is defined as: -K / R.
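The throttle time follows from the refill rule. The short derivation below assumes no other mutation is recorded while the client waits. While K is negative, the cap B is never reached, so K grows linearly at rate R. It crosses the admission threshold 0 after -K / R seconds.

```latex
\begin{aligned}
K(T+\Delta) &= \min\bigl(K(T) + R\,\Delta,\; B\bigr) = K(T) + R\,\Delta \quad \text{for } K(T) + R\,\Delta \le B,\\
K(T+\Delta) \ge 0 &\iff \Delta \ge \frac{-K(T)}{R} \qquad (\text{valid since } 0 \le B).
\end{aligned}
```

For example, K(T) = -60 and R = 5 give a throttle time of 12 seconds.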

In order to reuse our existing configuration, the Token Bucket is configured based on Q, S and W as follows:

  • R = Q
  • B = Q * S * W

Our previous example works as follows: R = 5 and B = 500. Starting from a full bucket (K = B = 500), the burst of 560 mutations brings the number of tokens to -60. The throttle time is -(-60) / 5 = 12s.
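The following is a minimal, standalone sketch of the token bucket variation described above. It is not the proposed `TokenBucket` `MeasurableStat`. The class `TokenBucketSketch` and its methods are hypothetical, and the bucket is assumed to start full. It applies the refill rule, the "admit iff K >= 0" check, the throttle time -K / R, and the configuration R = Q, B = Q * S * W.

```java
public class TokenBucketSketch {
    private final double rate;   // R: tokens added per second
    private final double burst;  // B: maximum number of tokens
    private double tokens;       // K: allowed to become negative
    private long lastUpdateMs;   // T

    /** Configures the bucket from the existing quota settings: R = Q and B = Q * S * W. */
    public TokenBucketSketch(double quota, int samples, double windowSizeSeconds, long nowMs) {
        this.rate = quota;
        this.burst = quota * samples * windowSizeSeconds;
        this.tokens = burst; // assumption: the bucket starts full
        this.lastUpdateMs = nowMs;
    }

    /** K = min(K + (now - T) * R, B), then T = now. */
    private void refill(long nowMs) {
        tokens = Math.min(tokens + (nowMs - lastUpdateMs) / 1000.0 * rate, burst);
        lastUpdateMs = nowMs;
    }

    /** Admits a mutation of N partitions iff K >= 0, then sets K = K - N. */
    public boolean tryAdmit(int partitions, long nowMs) {
        refill(nowMs);
        if (tokens < 0) {
            return false; // rejected: the client is throttled
        }
        tokens -= partitions;
        return true;
    }

    /** Throttle time -K / R, in milliseconds, or 0 when K >= 0. */
    public long throttleTimeMs(long nowMs) {
        refill(nowMs);
        return tokens >= 0 ? 0 : (long) Math.ceil(-tokens / rate * 1000);
    }

    public double tokens(long nowMs) {
        refill(nowMs);
        return tokens;
    }
}
```

With Q = 5, S = 100 and W = 1, the 560-partition burst is admitted and leaves -60 tokens. Further mutations are rejected until 12 seconds have elapsed.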

The Token Bucket will be implemented as a new `MeasurableStat` that will be used within a `Sensor` alongside the existing `Rate`. The quota will be enforced on the Token Bucket metric only. The `Sensor` will be updated to handle the verification of any Token Bucket metrics.

Controller Mutation Quota Manager

We propose to introduce a new quota manager called `ControllerMutationQuotaManager` which implements the algorithm described above. The new quota manager will be configured by the new quota type: `controller_mutation_rate`.

Handling of new Clients

For new clients, the new `THROTTLING_QUOTA_EXCEEDED` error code will be used to inform the client that a topic has been rejected. The `throttle_time_ms` field is used to inform the client about how long it has been throttled:

  • throttle_time_ms = max(<request throttle time>, <controller throttle time> - <waiting time in the purgatory if applied>)
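The formula above can be written as a one-line helper. The sketch below is hypothetical; it assumes the request throttle time is never negative, which keeps the result at or above zero. Taking the maximum is also how the client ends up throttled by the most constraining of the two quotas when both apply.

```java
public class ThrottleTimeSketch {
    /**
     * throttle_time_ms = max(<request throttle time>,
     *                        <controller throttle time> - <waiting time in the purgatory>)
     */
    public static long throttleTimeMs(long requestThrottleMs, long controllerThrottleMs, long purgatoryWaitMs) {
        // Time already spent waiting in the purgatory counts towards the controller throttle.
        return Math.max(requestThrottleMs, controllerThrottleMs - purgatoryWaitMs);
    }

    public static void main(String[] args) {
        System.out.println(throttleTimeMs(0, 12_000, 2_000));
        System.out.println(throttleTimeMs(15_000, 12_000, 0));
    }
}
```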

The channel will be muted as well when `throttle_time_ms > 0`. This is necessary to cope with misbehaving clients which would not honour the throttling time. This logic already exists and is used by the request quota, so we will reuse it.

We propose to update the implementation of our Rate metric (when used in conjunction with a quota). The burst would be allocated to all the previous windows, up to Q each, and the remaining part to the current window. The rationale is to let the average rate decrease regularly, similarly to what the token bucket does. While not perfect, this better accommodates bursty workloads. Let's look at our previous example with this change in mind. Instead of one sample worth 560, we will have 99 samples worth 5 and one last sample worth 65. The average rate is still 5.6, as the sum does not change. Waiting 12 secs (i.e. discarding 12 samples) brings the average rate back to the defined quota Q.
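The allocation can be illustrated with a hypothetical sketch over a plain array of samples, ordered oldest first. The sketch makes three assumptions:

  • Each previous sample may hold up to Q * W mutations.
  • Previous samples are topped up oldest first.
  • Waiting k seconds (with W = 1) expires the k oldest samples and appends empty ones.

It reproduces the 99 x 5 + 65 split and the return to Q after 12 samples.

```java
public class BurstAllocationSketch {
    /** Tops up each previous sample to quota * windowSizeSec; the remainder goes to the current (last) sample. */
    public static double[] allocate(double[] samples, double mutations, double quota, double windowSizeSec) {
        double[] out = samples.clone();
        double perSampleCap = quota * windowSizeSec;
        double remaining = mutations;
        for (int i = 0; i < out.length - 1 && remaining > 0; i++) {
            double room = Math.max(0, perSampleCap - out[i]);
            double taken = Math.min(room, remaining);
            out[i] += taken;
            remaining -= taken;
        }
        out[out.length - 1] += remaining;
        return out;
    }

    /** Average rate over the whole window: sum of samples / (S * W). */
    public static double averageRate(double[] samples, double windowSizeSec) {
        double sum = 0;
        for (double s : samples) {
            sum += s;
        }
        return sum / (samples.length * windowSizeSec);
    }

    /** Expires the `elapsed` oldest samples and replaces them with empty samples at the end. */
    public static double[] advance(double[] samples, int elapsed) {
        double[] out = new double[samples.length];
        System.arraycopy(samples, elapsed, out, 0, samples.length - elapsed);
        return out;
    }

    public static void main(String[] args) {
        double[] samples = allocate(new double[100], 560, 5, 1);
        System.out.println(samples[0] + " " + samples[98] + " " + samples[99]);
        System.out.println(averageRate(samples, 1));
        System.out.println(averageRate(advance(samples, 12), 1));
    }
}
```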

The rate is accumulated by the number of partitions N created or deleted.

Altogether, a mutation with N partitions is admitted iff R <= Q. Otherwise, the partition mutation request is rejected. The throttle time is defined as: (R - Q) / Q * S * W seconds.

Handling of old Clients

For old clients, we can't gracefully reject any requests. Therefore, we propose to always accept them and to throttle the client by muting its channel. The `throttle_time_ms` field is used to inform the client about how long it has been throttled:

  • throttle_time_ms = max(<request throttle time>, <controller throttle time> - <waiting time in the purgatory if applied>)

The channel will be muted as well when `throttle_time_ms > 0`.

Co-Existence of Multiple Quotas

The new partition mutations quota can be used in conjunction with the existing request quota. When both are used, the client will be throttled by the most constraining of the two.

Handling of ValidateOnly

The CreateTopicsRequest and CreatePartitionsRequest can be used to only validate their content by setting the flag `ValidateOnly`. As validation does not generate load on the controller, we plan not to apply the quota in this particular case. We will clarify this in the documentation.

Changes to Kafka Admin Client

Automatic Retry

A new retryable `ThrottlingQuotaExceededException` will be introduced on the client side. By default, the admin client will retry this error until `default.api.timeout.ms` is reached, honouring the throttling time. We will extend the retry mechanism already in place to support this. Once `default.api.timeout.ms` has been reached, the topics which were throttled will return the `ThrottlingQuotaExceededException` to the caller. Any other errors will be given back to the caller as today. Because the new exception is retryable, it should not break users' code when they upgrade: users are already expected to handle retryable exceptions.
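The default retry behaviour can be sketched with simulated time. The `Attempt` interface and `retryUntilTimeout` method are hypothetical stand-ins and are not the admin client's internals. An attempt returns 0 when accepted, or the throttle time in milliseconds when rejected. The loop waits for the throttle time before retrying and gives up when the next attempt would fall past the timeout, which plays the role of `default.api.timeout.ms`.

```java
public class AdminRetrySketch {
    /** One simulated attempt: returns 0 on success, otherwise the throttle time in ms. */
    @FunctionalInterface
    public interface Attempt {
        long run(long nowMs);
    }

    /** Returns true if the operation eventually succeeds, false if the timeout is reached first. */
    public static boolean retryUntilTimeout(Attempt attempt, long startMs, long apiTimeoutMs) {
        long deadlineMs = startMs + apiTimeoutMs;
        long nowMs = startMs;
        while (true) {
            long throttleMs = attempt.run(nowMs);
            if (throttleMs <= 0) {
                return true; // the mutation was accepted
            }
            long nextAttemptMs = nowMs + throttleMs; // honour the throttle time before retrying
            if (nextAttemptMs >= deadlineMs) {
                return false; // timeout reached: the caller would get ThrottlingQuotaExceededException
            }
            nowMs = nextAttemptMs; // simulated clock: nothing actually sleeps
        }
    }

    public static void main(String[] args) {
        // Hypothetical broker that throttles the topic until t = 12 s.
        Attempt throttledFor12s = nowMs -> Math.max(0, 12_000 - nowMs);
        System.out.println(retryUntilTimeout(throttledFor12s, 0, 60_000));
        System.out.println(retryUntilTimeout(throttledFor12s, 0, 10_000));
    }
}
```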

Manual Retry

To complement this default behavior, we propose to add a new configuration to the admin client that allows users to opt out of the automatic retry mechanism. This gives applications fine-grained control.

Public Interfaces

Protocol

The following new error code will be introduced:

  • THROTTLING_QUOTA_EXCEEDED: It will be used when the defined throttling quota is exceeded but only for the above RPCs.

While discussing the naming of the error, we also considered using LIMIT_QUOTA_EXCEEDED in the future when a limit quota is exceeded. A limit is a final error, whereas a throttling error can be retried.

In order to distinguish the new clients from the old ones, we will bump the version of the following requests/responses:

  • `CreateTopicsRequest` and `CreateTopicsResponse` to version 6.
  • `CreatePartitionsRequest` and `CreatePartitionsResponse` to version 3.
  • `DeleteTopicsRequest` and `DeleteTopicsResponse` to version 5.

Starting from the bumped version, the new `THROTTLING_QUOTA_EXCEEDED` error will be used. It won't be used for older versions.
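The version bumps listed above decide, per request, whether a rejected topic can be reported with `THROTTLING_QUOTA_EXCEEDED` or whether the broker must accept the request and only mute the channel. The helper below is a hypothetical sketch of that check. The string keys are illustrative and are not Kafka's `ApiKeys`.

```java
import java.util.Map;

public class ThrottlingErrorSupport {
    // First request version that understands THROTTLING_QUOTA_EXCEEDED (from the version bumps above).
    private static final Map<String, Integer> FIRST_VERSION = Map.of(
            "CreateTopics", 6,
            "CreatePartitions", 3,
            "DeleteTopics", 5);

    /** True if the topic may be rejected with THROTTLING_QUOTA_EXCEEDED; false means accept and mute the channel. */
    public static boolean canReturnThrottlingError(String api, int version) {
        Integer first = FIRST_VERSION.get(api);
        if (first == null) {
            throw new IllegalArgumentException("Not covered by the controller mutation quota: " + api);
        }
        return version >= first;
    }

    public static void main(String[] args) {
        System.out.println(canReturnThrottlingError("CreateTopics", 5));
        System.out.println(canReturnThrottlingError("CreateTopics", 6));
    }
}
```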

We will add the `ErrorMessage` field to the `DeleteTopicsResponse` as follows:

```js
{
  "apiKey": 20,
  "type": "response",
  "name": "DeleteTopicsResponse",
  // Version 1 adds the throttle time.
  //
  // Starting in version 2, on quota violation, brokers send out responses before throttling.
  //
  // Starting in version 3, a TOPIC_DELETION_DISABLED error code may be returned.
  //
  // Version 4 is the first flexible version.
  //
  // Version 5 adds the ErrorMessage field.
  "validVersions": "0-5",
  "flexibleVersions": "4+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DeletableTopicResult", "versions": "0+",
      "about": "The results for each topic we tried to delete.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
        "about": "The topic name" },
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The deletion error, or 0 if the deletion succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "5+", "nullableVersions": "5+", "ignorable": true,
        "about": "The error message, or null if there was no error." }
    ]}
  ]
}
```

New Broker Configurations

We propose to introduce the following new configurations in the Kafka broker:

| Name | Type | Default | Description |
|---|---|---|---|
| controller.quota.window.num | Int | 11 | The number of samples to retain in memory for controller mutations quotas |
| controller.quota.window.size.seconds | Int | 1 | The time span of each sample for controller mutations quotas |
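Since the bucket is configured with B = Q * S * W, these defaults (S = 11, W = 1) also determine the burst. The quota Q = 5 below is only an illustrative value, and the helper names are hypothetical. The sketch computes the resulting burst and the throttle time the 560-partition example would incur, starting from a full bucket.

```java
public class DefaultBurstSketch {
    /** B = Q * S * W. */
    public static double burst(double quota, int windowNum, int windowSizeSeconds) {
        return quota * windowNum * windowSizeSeconds;
    }

    /** Throttle time in seconds after n partitions drain a full bucket: -(B - n) / R with R = Q, or 0. */
    public static double throttleAfterBurstSeconds(double quota, int windowNum, int windowSizeSeconds, int partitions) {
        double remaining = burst(quota, windowNum, windowSizeSeconds) - partitions;
        return remaining >= 0 ? 0 : -remaining / quota;
    }

    public static void main(String[] args) {
        // Defaults: controller.quota.window.num = 11, controller.quota.window.size.seconds = 1
        System.out.println(burst(5, 11, 1));
        System.out.println(throttleAfterBurstSeconds(5, 11, 1, 560));
    }
}
```

With these defaults, a quota of 5 yields a burst of 55 tokens. The 560-partition request would leave -505 tokens, i.e. a throttle time of 101 seconds.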

New Quota Types

We propose to introduce the following new quota type in the Kafka broker:

| Name | Type | Default | Description |
|---|---|---|---|
| controller_mutation_rate* | Double | Long.MaxValue | The rate at which mutations are accepted for the create topics request, the create partitions request and the delete topics request. The rate is accumulated by the number of partitions created or deleted. |

They will be supported for <client-id>, <user> and <user, client-id> similar to the existing quotas. Defaults can be configured using the dynamic default properties at <client-id>, <user> and <user, client-id> levels.

* We keep the name intentionally generic to allow us to extend their coverage in the future.

New Broker Metrics

We propose to expose the following new metrics in the Kafka broker:

| Group | Name | Tags | Description |
|---|---|---|---|
| ControllerMutation | rate | (user, client-id) - with the same rules used by existing quota metrics | The current rate. |
| ControllerMutation | tokens | (user, client-id) - with the same rules used by existing quota metrics | The remaining tokens in the bucket. A value < 0 indicates that throttling is applied. |
| ControllerMutation | throttle-time | (user, client-id) - with the same rules used by existing quota metrics | Tracking average throttle-time per user/client-id. |

New TokenBucket Metric

As mentioned, we propose to introduce a new metric named `TokenBucket` which implements the token bucket algorithm.

```java
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;

/**
 * The {@link TokenBucket} is a {@link MeasurableStat} implementing a token bucket algorithm
 * that is usable within a {@link org.apache.kafka.common.metrics.Sensor}.
 */
public class TokenBucket implements MeasurableStat {
    public TokenBucket() {
        this(TimeUnit.SECONDS);
    }

    public TokenBucket(TimeUnit unit) {
        // Implementation omitted
    }

    @Override
    public double measure(final MetricConfig config, final long timeMs) {
        // Implementation omitted
    }

    @Override
    public void record(final MetricConfig config, final double value, final long timeMs) {
        // Implementation omitted
    }
}
```

Admin API

As mentioned, we propose to introduce a new retryable `ThrottlingQuotaExceededException` which will be given back to the caller when a topic is rejected due to throttling. The new exception maps to the new `THROTTLING_QUOTA_EXCEEDED` error.

...

The `CreateTopicsOptions`, `CreatePartitionsOptions`, and `DeleteTopicsOptions` will be extended to include a flag indicating whether `ThrottlingQuotaExceededException` should be automatically retried by the AdminClient.

```java
// All three classes live in the org.apache.kafka.clients.admin package.
import org.apache.kafka.common.annotation.InterfaceStability;

/**
 * Options for {@link Admin#createTopics(Collection)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class CreateTopicsOptions extends AbstractOptions<CreateTopicsOptions> {

    private boolean retryOnQuotaViolation = true;

    /**
     * Set to true if quota violation should be automatically retried.
     */
    public CreateTopicsOptions retryOnQuotaViolation(boolean retryOnQuotaViolation) {
        this.retryOnQuotaViolation = retryOnQuotaViolation;
        return this;
    }

    /**
     * Returns true if quota violation should be automatically retried.
     */
    public boolean shouldRetryOnQuotaViolation() {
        return retryOnQuotaViolation;
    }
}

/**
 * Options for {@link Admin#createPartitions(Map)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class CreatePartitionsOptions extends AbstractOptions<CreatePartitionsOptions> {

    private boolean retryOnQuotaViolation = true;

    /**
     * Set to true if quota violation should be automatically retried.
     */
    public CreatePartitionsOptions retryOnQuotaViolation(boolean retryOnQuotaViolation) {
        this.retryOnQuotaViolation = retryOnQuotaViolation;
        return this;
    }

    /**
     * Returns true if quota violation should be automatically retried.
     */
    public boolean shouldRetryOnQuotaViolation() {
        return retryOnQuotaViolation;
    }
}

/**
 * Options for {@link Admin#deleteTopics(Collection)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteTopicsOptions extends AbstractOptions<DeleteTopicsOptions> {

    private boolean retryOnQuotaViolation = true;

    /**
     * Set to true if quota violation should be automatically retried.
     */
    public DeleteTopicsOptions retryOnQuotaViolation(boolean retryOnQuotaViolation) {
        this.retryOnQuotaViolation = retryOnQuotaViolation;
        return this;
    }

    /**
     * Returns true if quota violation should be automatically retried.
     */
    public boolean shouldRetryOnQuotaViolation() {
        return retryOnQuotaViolation;
    }
}
```

Kafka Topic Command

We propose to disable the automatic retry of the `ThrottlingQuotaExceededException` for the `kafka-topics.sh` command, so that the command is not blocked until the retry period is exhausted.

...

  • By default, the upgrade should be transparent, since the Admin Client will automatically retry `ThrottlingQuotaExceededException` and return it to the caller only if the retry timeout is reached. In this case, the caller must at minimum handle the `RetryableException` and retry. Handling retryable exceptions is something that we can safely expect from clients.

Rejected Alternatives

...

Update our existing Rate to behave like the Token Bucket

...

We have tried to modify our existing Rate to behave like the Token Bucket. That works, but the Rate would no longer provide the same observability guarantees: it would not behave like it does today, which is counter-intuitive from an operations perspective. Therefore, we have preferred using a new metric alongside the Rate to enforce the quota.

Our initial idea was to use the Token Bucket algorithm in lieu of the existing Rate-based quota. While it is a bit easier to understand due to its explicit handling of the burst, we finally decided against it in order to remain consistent with the existing quota.

Usage based Quota

We have considered using a usage-based quota, similar to the request quota. The usage would mainly be measured as the CPU time taken by the operations. While this would have the benefit of covering all the operations in the controller, we have decided against it for the following reasons:

...