...

The reason to explicitly define a burst B is to allow a burst of several mutations with a small or medium number of partitions within the same request. Typically, applications tend to send one request to create all the topics that they need. This would not work without an explicit burst.

Throttling Algorithm

We propose to leverage our existing rate-based throttling mechanism to implement the behavior explained above, but we will update its implementation to better cope with bursty workloads.

Let's take an example to illustrate how this works. Let's say that we want to guarantee an average rate R=5 mutations/sec while allowing a burst B=500 mutations. This translates to the following parameters for our current rate based quota:

  • Quota = 5
  • Samples = B / R = 100
  • Time Window = 1s (the default) 

Suppose a client sends a request to create 7 topics with 80 partitions each at time T. This brings the average rate to 5.6 (7 * 80 / 100), which is above the quota, so any subsequent request is rejected until the rate gets back to R. In theory, the client must wait 12 seconds ((5.6 - 5) / 5 * 100) to get back to R.

In practice, due to the sparse samples (one sample worth 560), the rate won't decrease until that sample is dropped, and that will take 101 seconds. To overcome this, we propose to update the implementation of our Rate metric to spread the burst evenly among the previous windows. So instead of having one sample equal to 560 in the last window, we will have 100 samples equal to 5.6. With this, dropping 12 samples brings the rate below the quota.

Altogether, a mutation with N partitions is admitted iff Average Rate < Q. The partition mutation request is rejected otherwise. The client must wait (Average Rate - Quota) / Quota * # Samples seconds.

Our current rate-based quota implementation accepts any burst and then computes the throttle time needed to bring the rate back to the allowed quota. We can effectively define a burst by configuring the number of samples kept appropriately, as the average rate is computed as the sum of all samples divided by the overall time window. While this works well with workloads that generate multiple samples, we have found that it does not work well when the workload is bursty, like ours, in conjunction with a large window. The issue is that a single large sample can hold the average above the quota until it is discarded. Let's take an example to illustrate this behavior.

Let's imagine that we want to guarantee an average rate R = 5 mutations/sec while accepting a burst B = 500 mutations. This can be achieved by using the following parameters for our current rate-based quota:

  • Q (Quota) = 5
  • S (Samples) = B / R = 100
  • W (Time Window) = 1s (the default)

With this configuration, if a client sends a request with 7 topics with 80 partitions each at time T, worth a total of 560 mutations, it brings the average rate R to 5.6 (560 / 100). As the rate is above the defined quota, any subsequent mutation is rejected until the average rate gets back to the allowed quota. For our current quotas, the throttle time is computed as follows: (R - Q) / Q * S * W = (5.6 - 5) / 5 * 100 * 1 = 12 secs. In practice, the average rate won't go back down until the sample at time T is discarded, so no new mutation will be accepted before S * W, which is 100 secs in our case.
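The effect of a single large sample can be reproduced with a toy model of a windowed rate. This is only a sketch under simplifying assumptions: the class `SparseSampleRate` and its methods are hypothetical and are not Kafka's actual `Rate` metric; the rate is simply the sum of the retained samples divided by S * W.

```java
import java.util.TreeMap;

/** Toy windowed rate: one sample per window, rate = sum of retained samples / (S * W). */
final class SparseSampleRate {
    private final TreeMap<Long, Double> samples = new TreeMap<>(); // window index -> recorded value
    private final int numSamples; // S
    private final int windowSec;  // W

    SparseSampleRate(int numSamples, int windowSec) {
        this.numSamples = numSamples;
        this.windowSec = windowSec;
    }

    /** Adds the value to the window that contains nowSec. */
    void record(long nowSec, double value) {
        samples.merge(nowSec / windowSec, value, Double::sum);
    }

    /** Average rate over the S most recent windows, including the current one. */
    double rate(long nowSec) {
        long oldestRetained = nowSec / windowSec - numSamples + 1;
        double sum = 0;
        for (double v : samples.tailMap(oldestRetained, true).values()) {
            sum += v;
        }
        return sum / (numSamples * windowSec);
    }
}
```

With S = 100 and W = 1, recording 560 at T = 0 keeps the rate at 5.6 for every T up to 99; it only drops once the sample leaves the retained windows at T = 100, even though the computed throttle time was 12 secs.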

To overcome this, we initially thought about using the token bucket algorithm. The algorithm guarantees an average rate by accumulating tokens at the defined rate and allowing the accumulated tokens to be spent on operations. The burst is defined by the maximum number of tokens which can be accumulated. As the algorithm does not rely on any sampling, the nature of the workload does not matter at all. In comparison, our current quota mechanism can be seen as the opposite, as it borrows tokens from the future and then throttles to wait until they are paid back. Using the token bucket algorithm would require introducing another mechanism, which is not something that we are willing to do, in order to remain consistent in Kafka. Instead, we propose to slightly modify our current rate-based mechanism to get closer to the idea of the token bucket algorithm.
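For reference, the token bucket idea described above can be sketched as follows. This is a generic illustration, not something this proposal implements: the class `TokenBucket` and its method names are hypothetical, and the caller passes the current time explicitly to keep the example deterministic.

```java
/** Generic token bucket: tokens accrue at ratePerSec, capped at capacity (the burst). */
final class TokenBucket {
    private final double ratePerSec; // average rate R
    private final double capacity;   // maximum burst B
    private double tokens;
    private long lastRefillMs;

    TokenBucket(double ratePerSec, double capacity, long nowMs) {
        this.ratePerSec = ratePerSec;
        this.capacity = capacity;
        this.tokens = capacity; // start with a full bucket
        this.lastRefillMs = nowMs;
    }

    /** Adds the tokens accumulated since the last refill, never exceeding the capacity. */
    private void refill(long nowMs) {
        double elapsedSec = (nowMs - lastRefillMs) / 1000.0;
        tokens = Math.min(capacity, tokens + elapsedSec * ratePerSec);
        lastRefillMs = nowMs;
    }

    /** Spends cost tokens if available and returns true; otherwise leaves the bucket untouched. */
    boolean tryAcquire(double cost, long nowMs) {
        refill(nowMs);
        if (tokens < cost) {
            return false;
        }
        tokens -= cost;
        return true;
    }
}
```

With R = 5 and B = 500, a burst of 500 mutations is admitted immediately, and afterwards 5 more mutations become available every second, regardless of how the workload is distributed over time.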

We propose to update the implementation of our Rate metric (when used in conjunction with a quota) to allocate the burst to all the previous windows, up to Q each, and to allocate the remaining part to the current window. The rationale behind this is to allow the average rate to decrease regularly, similarly to what the token bucket does. While not perfect, it better accommodates bursty workloads. Let's look at our previous example with this change in mind. Instead of having one sample worth 560, we will have 99 samples worth 5 and one last sample worth 65. The average rate is still 5.6 as the sum does not change. Waiting 12 secs (or discarding 12 samples) brings the average rate back to the defined quota Q.
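The allocation described above can be sketched as follows. The class `BurstSpreader` is hypothetical, and the sketch assumes that the previous windows are empty when the burst is recorded and that they are filled from the oldest to the newest; the proposal does not specify either point.

```java
/** Spreads a burst over the retained windows: up to Q per previous window, the rest in the current one. */
final class BurstSpreader {
    /**
     * Returns the samples after recording the burst.
     * Index 0 is the oldest retained window; index numSamples - 1 is the current window.
     */
    static double[] spread(double burst, double quota, int numSamples) {
        double[] samples = new double[numSamples];
        double remaining = burst;
        for (int i = 0; i < numSamples - 1; i++) {
            samples[i] = Math.min(quota, remaining); // previous windows receive at most Q each
            remaining -= samples[i];
        }
        samples[numSamples - 1] = remaining; // whatever is left stays in the current window
        return samples;
    }

    /** Average rate once the given number of oldest samples has been discarded. */
    static double rateAfterDropping(double[] samples, int dropped, int windowSec) {
        double sum = 0;
        for (int i = dropped; i < samples.length; i++) {
            sum += samples[i];
        }
        return sum / (samples.length * windowSec);
    }
}
```

For the example, `spread(560, 5, 100)` yields 99 samples of 5 and a last sample of 65. The rate is 5.6 right after the burst and reaches 5.0 once 12 samples have been discarded.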

Altogether, a mutation with N partitions is admitted iff R <= Q. The partition mutation request is rejected otherwise. The throttle time is defined as: (R - Q) / Q * S * W seconds.
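The admission rule and the throttle time formula translate directly into code. The sketch below is illustrative only; the class `MutationThrottle` and its method names are hypothetical.

```java
/** Admission check and throttle time for the rate-based quota described above. */
final class MutationThrottle {
    /** A mutation is admitted iff the average rate R does not exceed the quota Q. */
    static boolean isAdmitted(double rate, double quota) {
        return rate <= quota;
    }

    /** Throttle time in seconds: (R - Q) / Q * S * W, or 0 when the rate is within the quota. */
    static double throttleTimeSec(double rate, double quota, int numSamples, int windowSec) {
        if (rate <= quota) {
            return 0.0;
        }
        return (rate - quota) / quota * numSamples * windowSec;
    }
}
```

With R = 5.6, Q = 5, S = 100 and W = 1, the throttle time evaluates to approximately 12 seconds, matching the example.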

Controller Mutations Quota Manager

We propose to introduce a new quota manager called `ControllerMutationsQuotaManager` which implements the algorithm described above. The new quota manager will be configured by the new quota types: `controller_mutations_rate` and `controller_mutations_burst`.

Handling of new Clients

For new clients, the new `QUOTA_VIOLATED` error code will be used to inform the client that a topic has been rejected. The `throttle_time_ms` field will be used to inform the client about how long it has been throttled:

  • throttle_time_ms = max(<request throttle time>, min(<throttle time of rejected topics in the request>) - <waiting time in the purgatory if applied>)
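The formula above can be sketched as a small helper. The class `ThrottleTimeCalculator` and its parameter names are hypothetical; the sketch also assumes that a request without any rejected topic falls back to the request throttle time alone, which the proposal does not state explicitly.

```java
import java.util.Collection;
import java.util.Collections;

/** Computes throttle_time_ms for a response, following the formula above. */
final class ThrottleTimeCalculator {
    /**
     * @param requestThrottleMs        throttle time imposed by the request quota
     * @param rejectedTopicThrottlesMs throttle time of each rejected topic in the request
     * @param purgatoryWaitMs          time already spent waiting in the purgatory, if any
     */
    static int throttleTimeMs(int requestThrottleMs,
                              Collection<Integer> rejectedTopicThrottlesMs,
                              int purgatoryWaitMs) {
        if (rejectedTopicThrottlesMs.isEmpty()) {
            return requestThrottleMs; // assumption: no rejected topic means no extra throttling
        }
        int minTopicThrottleMs = Collections.min(rejectedTopicThrottlesMs);
        return Math.max(requestThrottleMs, minTopicThrottleMs - purgatoryWaitMs);
    }
}
```

Taking the minimum over the rejected topics gives the earliest time at which a retry of at least one topic could succeed, and subtracting the purgatory wait avoids counting time the client has already spent waiting.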

The channel will be muted as well when `throttle_time_ms > 0`. This is necessary to cope with misbehaving clients which would not honour the throttling time. This logic already exists and is used by the request quota, so we will reuse it.

Handling of old Clients

For old clients, we can't gracefully reject any topic requests. Therefore, we propose to always accept them and to throttle the client by muting its channel. The throttle time will be computed as the time the client should have waited. The `throttle_time_ms` field will be populated accordingly. For `CreateTopicsResponse` and `CreatePartitionsResponse`, we will also populate the `ErrorMessage` to explain that they have been throttled, if they were.

Co-Existence of Multiple Quotas

The new partition mutations quota can be used in conjunction with the existing request quota. When both are used, the client will be throttled by the most constraining of the two.

Handling of ValidateOnly

The CreateTopicsRequest and CreatePartitionsRequest offer the possibility to only validate their content by setting the flag `ValidateOnly`. As the validation does not generate load on the controller, we plan to not apply the quota in this particular case. We will clarify this in the documentation.

Changes to Kafka Admin Client

Automatic Retry

A new retryable `QuotaViolatedException` will be introduced on the client side. By default, the admin client will retry this error until `default.api.timeout.ms` is reached and will also honour the throttling time. We will extend the retry mechanism already in place to support this. Once `default.api.timeout.ms` has been reached, the topics which were throttled will return the `QuotaViolatedException` to the caller. Any other errors will be given back to the caller as they are today. As the new exception is retryable, it should not break users' code when they upgrade, as handling retryable exceptions is already something that they should do.
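The retry behavior can be pictured with the simplified loop below. It is not the admin client's actual retry mechanism: the class `ThrottledRetry` is hypothetical, the attempt is modelled as a supplier returning 0 on success or the throttle time in milliseconds when throttled, and elapsed time is approximated by the sum of the throttle waits. Giving up as soon as the next wait would exceed the timeout is also an assumption of this sketch.

```java
import java.util.function.IntSupplier;
import java.util.function.LongConsumer;

/** Simplified retry loop that honours the throttle time until a timeout is reached. */
final class ThrottledRetry {
    /**
     * @param attempt   returns 0 on success, or the throttle time in ms when the call was throttled
     * @param timeoutMs overall budget, playing the role of default.api.timeout.ms
     * @param sleeper   waits for the given number of ms (injected so the loop can be tested)
     * @return true if the call eventually succeeded, false if the budget was exhausted
     */
    static boolean callWithRetry(IntSupplier attempt, long timeoutMs, LongConsumer sleeper) {
        long elapsedMs = 0;
        while (true) {
            int throttleMs = attempt.getAsInt();
            if (throttleMs <= 0) {
                return true; // not throttled: done
            }
            if (elapsedMs + throttleMs > timeoutMs) {
                return false; // the caller would receive the quota exception here
            }
            sleeper.accept(throttleMs); // honour the throttle time before retrying
            elapsedMs += throttleMs;
        }
    }
}
```

In a real client the sleeper would block (for example with `Thread.sleep`) or reschedule the call, and the elapsed time would be measured with a clock rather than accumulated.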

Manual Retry

To complement this default behavior, we propose to add a new configuration to the admin client to allow users to opt out of the automatic retry mechanism. This gives applications fine-grained control.

Public Interfaces

Protocol

In order to be able to distinguish the new clients from the old ones, we will bump the version of the following requests/responses:

  • `CreateTopicsRequest` and `CreateTopicsResponse` to version 6.
  • `CreatePartitionsRequest` and `CreatePartitionsResponse` to version 3.
  • `DeleteTopicsRequest` and `DeleteTopicsResponse` to version 5.

Starting from the bumped version, the new `QUOTA_VIOLATED` error will be used. It won't be used for older versions.

We will add the `ErrorMessage` field to the `DeleteTopicsResponse` as follows:

```js
{
  "apiKey": 20,
  "type": "response",
  "name": "DeleteTopicsResponse",
  // Version 1 adds the throttle time.
  //
  // Starting in version 2, on quota violation, brokers send out responses before throttling.
  //
  // Starting in version 3, a TOPIC_DELETION_DISABLED error code may be returned.
  //
  // Version 4 is the first flexible version.
  //
  // Version 5 adds the ErrorMessage field.
  "validVersions": "0-5",
  "flexibleVersions": "4+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DeletableTopicResult", "versions": "0+",
      "about": "The results for each topic we tried to delete.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
        "about": "The topic name" },
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The deletion error, or 0 if the deletion succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "5+", "nullableVersions": "5+", "ignorable": true,
        "about": "The error message, or null if there was no error." }
    ]}
  ]
}
```

New Broker Configurations

We propose to introduce the following new configurations in the Kafka broker:

| Name | Type | Default | Description |
|---|---|---|---|
| controller.quota.window.num | Int | 11 | The number of samples to retain in memory for controller mutations quotas |
| controller.quota.window.size.seconds | Int | 1 | The time span of each sample for controller mutations quotas |

New Quota Types

We propose to introduce the following new quota types in the Kafka Broker:

| Name | Type | Default | Description |
|---|---|---|---|
| controller_mutations_rate* | Long | Long.MaxValue | The rate at which mutations are accepted for the create topics request, the create partitions request and the delete topics request. |

They will be supported for <client-id>, <user> and <user, client-id> similar to the existing quotas. Defaults can be configured using the dynamic default properties at <client-id>, <user> and <user, client-id> levels.

* We keep the name intentionally generic to allow us to extend its coverage in the future.

Broker Metrics

We propose to expose the following new metric in the Kafka Broker:

| Group | Name | Tags | Description |
|---|---|---|---|
| ControllerMutationsQuotaManager | rate | (user, client-id) - with the same rules used by existing quota metrics | The current rate. |

Admin API

As mentioned, we propose to introduce a new retryable `QuotaViolatedException` exception which will be given back to the caller when a topic is rejected due to throttling. The new exception maps to the new `QUOTA_VIOLATED` error.

```java
import org.apache.kafka.common.errors.RetriableException;

/**
 * Exception thrown if an operation on a resource violates the quota.
 */
public class QuotaViolatedException extends RetriableException {
    private final int throttleTimeMs;

    public QuotaViolatedException(int throttleTimeMs, String message) {
        super(message);
        this.throttleTimeMs = throttleTimeMs;
    }

    public QuotaViolatedException(int throttleTimeMs, String message, Throwable cause) {
        super(message, cause);
        this.throttleTimeMs = throttleTimeMs;
    }

    /** Returns the time, in milliseconds, for which the client has been throttled. */
    public int throttleTimeMs() {
        return this.throttleTimeMs;
    }
}
```


The `CreateTopicsOptions`, `CreatePartitionsOptions`, and `DeleteTopicsOptions` will be extended to include a flag indicating whether `QuotaViolatedException` should be automatically retried by the AdminClient.

```java
/**
 * Options for {@link Admin#createTopics(Collection)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class CreateTopicsOptions extends AbstractOptions<CreateTopicsOptions> {

    private boolean retryQuotaViolatedException = true;

    /**
     * Sets whether QuotaViolatedException should be automatically retried or not.
     */
    public CreateTopicsOptions retryQuotaViolatedException(boolean retry) {
        this.retryQuotaViolatedException = retry;
        return this;
    }

    /**
     * Returns true if the QuotaViolatedException should be automatically retried
     * by the AdminClient.
     */
    public boolean retryQuotaViolatedException() {
        return retryQuotaViolatedException;
    }
}

/**
 * Options for {@link Admin#createPartitions(Map)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class CreatePartitionsOptions extends AbstractOptions<CreatePartitionsOptions> {

    private boolean retryQuotaViolatedException = true;

    /**
     * Sets whether QuotaViolatedException should be automatically retried or not.
     */
    public CreatePartitionsOptions retryQuotaViolatedException(boolean retry) {
        this.retryQuotaViolatedException = retry;
        return this;
    }

    /**
     * Returns true if the QuotaViolatedException should be automatically retried
     * by the AdminClient.
     */
    public boolean retryQuotaViolatedException() {
        return retryQuotaViolatedException;
    }
}

/**
 * Options for {@link Admin#deleteTopics(Collection)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteTopicsOptions extends AbstractOptions<DeleteTopicsOptions> {

    private boolean retryQuotaViolatedException = true;

    /**
     * Sets whether QuotaViolatedException should be automatically retried or not.
     */
    public DeleteTopicsOptions retryQuotaViolatedException(boolean retry) {
        this.retryQuotaViolatedException = retry;
        return this;
    }

    /**
     * Returns true if the QuotaViolatedException should be automatically retried
     * by the AdminClient.
     */
    public boolean retryQuotaViolatedException() {
        return retryQuotaViolatedException;
    }
}
```

Kafka Topic Command

We propose to disable the automatic retry of the `QuotaViolatedException` for the `kafka-topics.sh` command so that the command is not blocked until the retry period is exhausted.

Known Limitations

Auto Topic Creation

As of writing this proposal, the auto topic creation feature uses Zookeeper to create topics. Therefore, the feature is not compatible with this proposal at the moment. KIP-590 (not adopted yet) will change this to use a regular CreateTopicsRequest instead of writing to Zookeeper directly. The request will be sent as a "cluster admin", therefore auto topic creations will be accounted to that user. We could improve the accounting in a future KIP if the need arises.

Alter Topics via Zookeeper

The proposal does not support the now deprecated way to create, expand or delete topics via Zookeeper because it is not possible to control or reject changes written to Zookeeper directly.

How to Define the Quota?

While we cannot provide an exact formula to compute the ideal quota for a cluster, here are some general ideas to find it.

  • The first step consists in finding the actual limits of your cluster. To achieve this, the best approach is to run workloads that create topics, create partitions and delete topics in isolation, with different rates and numbers of partitions, and to measure the impact on the controller. One can use the number of events queued in the controller to see how busy it is. Ideally, the number of events must remain small.
  • Choose the quota based on the worst case and on the extra load the cluster can sustain.
  • Once the overall quota is defined, it can be divided among the tenants in the cluster.

Usually, all the tenants won't do operations at the same time, so we believe that it is safe to over-allocate the quota for each tenant.

It is also important to note that the capacity of the controller does not scale with the number of nodes in the cluster. In fact, it is rather the opposite, as the controller has more work to do with more nodes, so the quota may need to be slightly decreased when the cluster grows.

Compatibility, Deprecation, and Migration Plan

...