Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

* We keep the name intentionally generic to allow us to extend their coverage in the future.

The new name works similarly to the already existing quota. It can be set or changed by using the `kafka-configs.sh` tool.

For instance, the above command defines a quota of 10 controller mutations per secs for the (user=user1, client-id=clientA):

Code Block
languagebash
> bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'controller_mutations_rate=10' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Updated config for entity: user-principal 'user1', client-id 'clientA'.

The above commands list the quota of (user=user1, client-id=clientA):

Code Block
languagebash
> bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Configs for user-principal 'user1', client-id 'clientA' are controller_mutations_rate=10,... (other quotas are listed as well)

Broker Metrics

We propose to expose the following new metric in the Kafka Broker:

...

Broker Metrics

We propose to expose the following new metric in the Kafka Broker:

GroupNameTagsDescription
ControllerMutationsQuotaManagerrate(user, client-id) - with the same rules used by existing quota metricsThe current rate.

Admin API

As mentioned, we propose to introduce a new retryable `QuotaViolatedException` exception which will be given back to the caller when a topic is rejected due to throttling. The new exception maps to the new `THROTTLING_QUOTA_EXCEEDED` error.

Code Block
languagejava
linenumberstrue
/**
 * Exception thrown if an operation on a resource exceeds the throttling quota.
 */
public class ThrottlineQuotaExceededException extends RetryableException {
	private int throttleTimeMs; 
	
    public QuotaViolationException(int throttleTimeMs, String message) {
        super(message);
		this.throttleTimeMs = throttleTimeMs;
    }

    public QuotaViolationException(int throttleTimeMs, String message, Throwable cause) {
        super(message, cause);
		this.throttleTimeMs = throttleTimeMs;
    }

	public int throttleTimeMs() {
		return this.throttleTimeMs;
    }
}


The `CreateTopicsOptions`, `CreatePartitionsOptions`, and `DeleteTopicsOptions` will be extended to include a flag indicating if `QuotaViolatedException` should be automatically retried by the AdminClient or not

Admin API

As mentioned, we propose to introduce a new retryable `QuotaViolatedException` exception which will be given back to the caller when a topic is rejected due to throttling. The new exception maps to the new `THROTTLING_QUOTA_EXCEEDED` error.

Code Block
languagejava
linenumberstrue
/**
 * ExceptionOptions thrownfor if an operation on a resource exceeds the throttling quota{@link Admin#createTopics(Collection)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ThrottlineQuotaExceededExceptionCreateTopicsOptions extends RetryableExceptionAbstractOptions<CreateTopicsOptions> {

	private int throttleTimeMs; 
	boolean retryQuotaViolatedException = true;

	/**
    public QuotaViolationException(int throttleTimeMs, String message) {* Set the retry QuotaViolatedException to indicate wether QuotaViolatedException
     * should  super(message);
		this.throttleTimeMs = throttleTimeMs;
    }

be automatically retried or not.
	 */
	public CreateTopicsOptions retryQuotaViolatedException(boolean retry) {}

	/**
    public QuotaViolationException(int throttleTimeMs, String message, Throwable cause) {
        super(message, cause);
		this.throttleTimeMs = throttleTimeMs;
    }

	public int throttleTimeMs() {
		return this.throttleTimeMs;
    }
}

The `CreateTopicsOptions`, `CreatePartitionsOptions`, and `DeleteTopicsOptions` will be extended to include a flag indicating if `QuotaViolatedException` should be automatically retried by the AdminClient or not.

Code Block
languagejava
linenumberstrue
/**
 * Options for {@link Admin#createTopics(Collection)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class CreateTopicsOptions extends AbstractOptions<CreateTopicsOptions> * Returns true if the QuotaViolatedException should be automatically retried
	 * by the AdminClient.
	 */
	public boolean retryQuotaViolatedException(){}
}

/**
 * Options for {@link Admin#createPartitions(Map)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class CreatePartitionsOptions extends AbstractOptions<CreatePartitionsOptions> {

	private boolean retryQuotaViolatedException = true;

	/**
     * Set the retry QuotaViolatedException to indicate wether QuotaViolatedException
     * should be automatically retried or not.
	 */
	public CreateTopicsOptions retryQuotaViolatedException(boolean retry) {}

	/**
     * Returns true if the QuotaViolatedException should be automatically retried
	 * by the AdminClient.
	 */
	public boolean retryQuotaViolatedException(){}
}


/**
 * Options for {@link Admin#createPartitionsAdmin#deleteTopics(MapCollection)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class CreatePartitionsOptionsDeleteTopicsOptions extends AbstractOptions<CreatePartitionsOptions>AbstractOptions<DeleteTopicsOptions> {

	private boolean retryQuotaViolatedException = true;

	/**
     * Set the retry QuotaViolatedException to indicate wether QuotaViolatedException
     * should be automatically retried or not.
	 */
	public CreateTopicsOptions retryQuotaViolatedException(boolean retry) {}

	/**
     * Returns true if the QuotaViolatedException should be automatically retried
	 * by the AdminClient.
	 */
	public boolean retryQuotaViolatedException(){}
}


/**
 * Options for {@link Admin#deleteTopics(Collection)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteTopicsOptions extends AbstractOptions<DeleteTopicsOptions> {

	private boolean retryQuotaViolatedException = true;

	/**
     * Set the retry QuotaViolatedException to indicate wether QuotaViolatedException
     * should be automatically retried or not.
	 */
	public CreateTopicsOptions retryQuotaViolatedException(boolean retry) {}

	/**
     * Returns true if the QuotaViolatedException should be automatically retried
	 * by the AdminClient.
	 */
	public boolean retryQuotaViolatedException(){}
}

Kafka Topic Command

...

Kafka Topic Command

We propose to disable the automatic try of the QuotaViolatedException for the `kafka-topics.sh` command in order to not have the command blocked until the retry period is exhausted. 

Kafka Config Command

The new name works similarly to the already existing quota. It can be set or changed by using the `kafka-configs.sh` tool. When the new quota API is used, an old broker that does not support the new name will reject it with a INVALID_REQUEST error.

For instance, the above command defines a quota of 10 controller mutations per secs for the (user=user1, client-id=clientA):

Code Block
languagebash
> bin/kafka-configs.sh --bootstrap-server ... --alter --add-config 'controller_mutations_rate=10' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Updated config for entity: user-principal 'user1', client-id 'clientA'.

The above commands list the quota of (user=user1, client-id=clientA):

Code Block
languagebash
> bin/kafka-configs.sh --bootstrap-server ... --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Configs for user-principal 'user1', client-id 'clientA' are controller_mutations_rate=10,... (other quotas are listed as well)

Known Limitations

Auto Topic Creation

...