...

Altogether, a mutation with N partitions is admitted iff R <= Q; otherwise, the partition mutation request is rejected. The throttle time is defined as (R - Q) / Q * S * W seconds.
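The admission rule and the throttle-time formula above can be sketched as follows. This is only an illustration under stated assumptions: the class and method names are hypothetical, R, Q, S and W are passed through exactly as they are defined earlier in this document, and returning 0 for an admitted mutation is an assumption of the sketch rather than something the proposal specifies.

```java
/** Illustrative sketch only; the class and method names are hypothetical. */
final class MutationQuotaSketch {

    /** A mutation is admitted iff the current rate R does not exceed the quota Q. */
    static boolean isAdmitted(double r, double q) {
        return r <= q;
    }

    /**
     * Throttle time in seconds, computed as (R - Q) / Q * S * W.
     * Assumption of this sketch: an admitted mutation is not throttled (0 seconds).
     */
    static double throttleTimeSeconds(double r, double q, double s, double w) {
        if (q <= 0) {
            throw new IllegalArgumentException("quota Q must be positive");
        }
        if (isAdmitted(r, q)) {
            return 0.0;
        }
        return (r - q) / q * s * w;
    }
}
```

For example, with R = 15, Q = 10, S = 1 and W = 10, the mutation is rejected and `throttleTimeSeconds(15, 10, 1, 10)` evaluates to (15 - 10) / 10 * 1 * 10 = 5 seconds.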

Controller

...

Mutation Quota Manager

We propose to introduce a new quota manager called `ControllerMutationQuotaManager`, which implements the algorithm described above. The new quota manager will be configured by the new quota types `controller_mutation_rate` and `controller_mutations_burst`.

Handling of new Clients

For new clients, the new `THROTTLING_QUOTA_EXCEEDED` error code will be used to inform the client that a topic has been rejected. The `throttle_time_ms` field is used to inform the client about how long it has been throttled:

...

| Name | Type | Default | Description |
| --- | --- | --- | --- |
| controller_mutations_rate* | Double | Long.MaxValue | The rate at which mutations are accepted for the create topics request, the create partitions request and the delete topics request. The rate is accumulated by the number of partitions created or deleted. |

...

We propose to expose the following new metric in the Kafka Broker:

| Group | Name | Tags | Description |
| --- | --- | --- | --- |
| ControllerMutationQuotaManager | rate | (user, client-id) - with the same rules used by existing quota metrics | The current rate. |

...

As mentioned, we propose to introduce a new retryable `ThrottlingQuotaExceededException` exception, which will be given back to the caller when a topic is rejected due to throttling. The new exception maps to the new `THROTTLING_QUOTA_EXCEEDED` error.

...

The `CreateTopicsOptions`, `CreatePartitionsOptions`, and `DeleteTopicsOptions` will be extended to include a flag indicating whether `ThrottlingQuotaExceededException` should be automatically retried by the AdminClient.

```java
/**
 * Options for {@link Admin#createTopics(Collection)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class CreateTopicsOptions extends AbstractOptions<CreateTopicsOptions> {

    private boolean retryOnQuotaViolation = true;

    /**
     * Set to true if quota violation should be automatically retried.
     */
    public CreateTopicsOptions retryOnQuotaViolation(boolean retryOnQuotaViolation) {
        this.retryOnQuotaViolation = retryOnQuotaViolation;
        return this;
    }

    /**
     * Returns true if quota violation should be automatically retried.
     */
    public boolean shouldRetryOnQuotaViolation() {
        return retryOnQuotaViolation;
    }
}

/**
 * Options for {@link Admin#createPartitions(Map)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class CreatePartitionsOptions extends AbstractOptions<CreatePartitionsOptions> {

    private boolean retryOnQuotaViolation = true;

    /**
     * Set to true if quota violation should be automatically retried.
     */
    public CreatePartitionsOptions retryOnQuotaViolation(boolean retryOnQuotaViolation) {
        this.retryOnQuotaViolation = retryOnQuotaViolation;
        return this;
    }

    /**
     * Returns true if quota violation should be automatically retried.
     */
    public boolean shouldRetryOnQuotaViolation() {
        return retryOnQuotaViolation;
    }
}

/**
 * Options for {@link Admin#deleteTopics(Collection)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteTopicsOptions extends AbstractOptions<DeleteTopicsOptions> {

    private boolean retryOnQuotaViolation = true;

    /**
     * Set to true if quota violation should be automatically retried.
     */
    public DeleteTopicsOptions retryOnQuotaViolation(boolean retryOnQuotaViolation) {
        this.retryOnQuotaViolation = retryOnQuotaViolation;
        return this;
    }

    /**
     * Returns true if quota violation should be automatically retried.
     */
    public boolean shouldRetryOnQuotaViolation() {
        return retryOnQuotaViolation;
    }
}
```
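To illustrate how a caller might observe the flag, the sketch below models the two behaviours with local stand-ins. It is not the AdminClient implementation: `Options`, the nested `ThrottlingQuotaExceededException`, the `call` helper and the `maxAttempts` bound are all hypothetical names introduced here so that the example is self-contained, and a real client would presumably wait for the throttle time before retrying.

```java
import java.util.function.Supplier;

/** Illustrative sketch only; every type here is a hypothetical stand-in. */
final class RetryOnQuotaViolationSketch {

    /** Stand-in for the proposed retryable exception. */
    static final class ThrottlingQuotaExceededException extends RuntimeException {
        ThrottlingQuotaExceededException(String message) {
            super(message);
        }
    }

    /** Stand-in mirroring the flag proposed for the three options classes. */
    static final class Options {
        private boolean retryOnQuotaViolation = true;

        Options retryOnQuotaViolation(boolean retryOnQuotaViolation) {
            this.retryOnQuotaViolation = retryOnQuotaViolation;
            return this;
        }

        boolean shouldRetryOnQuotaViolation() {
            return retryOnQuotaViolation;
        }
    }

    /**
     * Runs the request. On a quota violation it retries while the flag is set
     * and attempts remain; otherwise the exception is given back to the caller.
     */
    static <T> T call(Supplier<T> request, Options options, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return request.get();
            } catch (ThrottlingQuotaExceededException e) {
                if (!options.shouldRetryOnQuotaViolation() || attempt >= maxAttempts) {
                    throw e;
                }
                // Waiting for the throttle time before the next attempt is omitted here.
            }
        }
    }
}
```

With the default options, a request that is throttled a few times is retried transparently; with `new Options().retryOnQuotaViolation(false)`, the first quota violation surfaces immediately, which is the behaviour a command-line tool would want in order to avoid blocking.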

Kafka Topic Command

We propose to disable the automatic retry of the `ThrottlingQuotaExceededException` for the `kafka-topics.sh` command so that the command is not blocked until the retry period is exhausted.

...