Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

If the number of tokens decreases below 0 (K <= 0), a new operation with N partition mutations has to wait for R * ( -K + 1 ).

...

Controller Mutations Quota Manager

We propose to introduce a new quota manager called `PartitionMutationsQuotaManager` ControllerMutationsQuotaManager` which implements the algorithm described above. The new quota manager will be configured by two dynamic configurations: 'controller.partition.mutations.the new quota types: `controller_mutations_rate` and `controller.partition._mutations._burst`. While the configuration could be set by broker, we envision to have it set only has a cluster wide default for two reasons:

  1. It is effectively used only by the broker which hosts the controller.
  2. It does not make sense to have different values in the cluster. 

Handling of new Clients

Handling of new Clients

For new clients, the new `QUOTA_VIOLATED` error code will be used to inform client that a For new clients, the new `QUOTA_VIOLATED` error code will be used to inform client that a topic has been rejected. The `throttle_time_ms` field be used to inform the client about how long it has been throttled:

...

NameTypeDefaultDescription
controller_partition_mutations_burst*LongLong.MaxValueThe maximum burst of partition mutations allowed at any given second.
controller_partition_mutations_rate*LongLong.MaxValue

The rate at which partition mutations are accepted for the create topics request, the create partitions request and the delete topics request.

They will be supported for <client-id>, <user> and <user, client-id> similar to the existing quotas. Defaults can be configured using the dynamic default properties at <client-id>, <user> and <user, client-id> levels.

...

* We keep the name intentionally generic to allow us to extend their coverage in the future. 

Broker Metrics

We propose

...

to expose the following new

...

metric in the

...

Kafka Broker:

Type
GroupNameTagsDefaultDescription
retry.quota.violation.exceptionBooleantrueWhether the `QuotaViolationException` must be automatically retried or not.

Broker Metrics

We propose to expose the following new metric in the Kafka Broker:

ControllerMutationsQuotaManageravailable-burst(
GroupNameTagsDescription
PartitionMutationsQuotaManagerremaining-tokens(user, client-id) - with the same rules used by existing quota metricsThe number of remaining tokens burst available in the bucket. <= 0 indicates that partition mutations are throttled. 

...

%. 100% means that the burst is fully available. 0% means that the burst is exhausted and thus mutations are not allowed.
ControllerMutationsQuotaManagerrate(user, client-id) - with the same rules used by existing quota metricsThe current rate.

Admin API

As mentioned, we propose to introduce a new retryable `QuotaViolatedException` exception which will be given back to the called caller when a topic is rejected due to throttling. The new exception maps to the new `QUOTA_VIOLATED_ERROR` error.

Code Block
languagejava
linenumberstrue
/**
 * Exception thrown if an operation on a resource violate the quota.
 */
public class QuotaViolationException extends RetryableException {
	private int throttleTimeMs; 
	
    public QuotaViolationException(int throttleTimeMs, String message) {
        super(message);
		this.throttleTimeMs = throttleTimeMs;
    }

    public QuotaViolationException(int throttleTimeMs, String message, Throwable cause) {
        super(message, cause);
		this.throttleTimeMs = throttleTimeMs;
    }

	public int throttleTimeMs() {
		return this.throttleTimeMs;
    }
}

Kafka Topic Command

We propose to disable the automatic try of the QuotaViolatedException for the `kafka-topics.sh` command in order to not have the command blocked until the retry period is exhausted. 

Known Limitations

Auto Topic Creation

As of writing this proposal, the auto topic creation feature uses Zookeeper to create topics. Therefore, using the feature is not compatible with this proposal at the moment. KIP-590 (not adopted yet) will change this to use a regular CreateTopicsRequest instead of writing to Zookeeper directly. The request will be sent as a "cluster admin" therefore auto topic creations will be accounted for that user. We could improve the accounting in the future KIP if the need arise.

Alter Topics via Zookeeper

The proposal do not support the now deprecated way to create, expand or delete topics via Zookeeper because it is not possible to control nor reject changes written to Zookeeper directly.

Compatibility, Deprecation, and Migration Plan

Compatibility with Old Clients

  • By default, there is no impact on existing users since the throttling must be enabled.
  • If the throttling is enabled in a cluster used by old clients, the old clients will be transparently throttled. If the client is throttled, its request will timeout and thus will be retry by the existing mechanism. If the retries fail, the application will receive a TimeoutException which is a retryable exception. Handling TimeoutException and more generally retryable Exceptions is something that clients should already do thus impact is minimal. 

Compatibility after Upgrading the Admin Client to the new version

  • By default, the upgrade should be transparent since the Admin Client will automatically retry QuotaViolationException and return it to the caller only if the retry timeout is reached. In this case, the caller must at minimum handle the RetryableException and retry. Handling retryable Exceptions is something that we can safely expect from clients.

...


The `CreateTopicsOptions`, `CreatePartitionsOptions`, and `DeleteTopicsOptions` will be extended to include a flag indicating if `QuotaViolatedException` should be automatically retried by the AdminClient or not.

Code Block
languagejava
linenumberstrue
/**
 * Options for {@link Admin#createTopics(Collection)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class CreateTopicsOptions extends AbstractOptions<CreateTopicsOptions> {

	private boolean retryQuotaViolatedException = true;

	/**
     * Set the retry QuotaViolatedException to indicate wether QuotaViolatedException
     * should be automatically retried or not.
	 */
	public CreateTopicsOptions retryQuotaViolatedException(boolean retry) {}

	/**
     * Returns true if the QuotaViolatedException should be automatically retried
	 * by the AdminClient.
	 */
	public boolean retryQuotaViolatedException(){}
}

/**
 * Options for {@link Admin#createPartitions(Map)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class CreatePartitionsOptions extends AbstractOptions<CreatePartitionsOptions> {

	private boolean retryQuotaViolatedException = true;

	/**
     * Set the retry QuotaViolatedException to indicate wether QuotaViolatedException
     * should be automatically retried or not.
	 */
	public CreateTopicsOptions retryQuotaViolatedException(boolean retry) {}

	/**
     * Returns true if the QuotaViolatedException should be automatically retried
	 * by the AdminClient.
	 */
	public boolean retryQuotaViolatedException(){}
}


/**
 * Options for {@link Admin#deleteTopics(Collection)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteTopicsOptions extends AbstractOptions<DeleteTopicsOptions> {

	private boolean retryQuotaViolatedException = true;

	/**
     * Set the retry QuotaViolatedException to indicate wether QuotaViolatedException
     * should be automatically retried or not.
	 */
	public CreateTopicsOptions retryQuotaViolatedException(boolean retry) {}

	/**
     * Returns true if the QuotaViolatedException should be automatically retried
	 * by the AdminClient.
	 */
	public boolean retryQuotaViolatedException(){}
}

Kafka Topic Command

We propose to disable the automatic try of the QuotaViolatedException for the `kafka-topics.sh` command in order to not have the command blocked until the retry period is exhausted. 

Known Limitations

Auto Topic Creation

As of writing this proposal, the auto topic creation feature uses Zookeeper to create topics. Therefore, using the feature is not compatible with this proposal at the moment. KIP-590 (not adopted yet) will change this to use a regular CreateTopicsRequest instead of writing to Zookeeper directly. The request will be sent as a "cluster admin" therefore auto topic creations will be accounted for that user. We could improve the accounting in the future KIP if the need arise.

Alter Topics via Zookeeper

The proposal do not support the now deprecated way to create, expand or delete topics via Zookeeper because it is not possible to control nor reject changes written to Zookeeper directly.

Compatibility, Deprecation, and Migration Plan

Compatibility with Old Clients

  • By default, there is no impact on existing users since the throttling must be enabled.
  • If the throttling is enabled in a cluster used by old clients, the old clients will be transparently throttled. If the client is throttled, its request will timeout and thus will be retry by the existing mechanism. If the retries fail, the application will receive a TimeoutException which is a retryable exception. Handling TimeoutException and more generally retryable Exceptions is something that clients should already do thus impact is minimal. 

Compatibility after Upgrading the Admin Client to the new version

  • By default, the upgrade should be transparent since the Admin Client will automatically retry QuotaViolationException and return it to the caller only if the retry timeout is reached. In this case, the caller must at minimum handle the RetryableException and retry. Handling retryable Exceptions is something that we can safely expect from clients.

Rejected Alternatives

Usage based Quota

We have considered using a usage based quota similarly to the request quota. The usage would mainly be measured as the CPU time taken by the operations. While this would the benefits to cover all the operations in the controller, we have decided against it for the following reasons:

  • In oder to make it work, we must have a way to associate the cost of an operation in the controller to a principal and/or a client id. Conceptually, this sounds a bit weird because topics are not associated to a unique principal or client id during their lifetime. A topic can be created by Principal A, then partitions can be added by Client 1, and finally it can be deleted by Principal B. This makes the accounting strange. That would work better if we would have a notion of tenant in Kafka.
  • Measuring the CPU does not take into account the cost associated to all the requests sent out to achieve an operation. 

Throttle the Execution instead of the Admission

...