
Status

Current state: Draft [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: here 

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The public APIs to create topics, create partitions, and delete topics are heavy operations in the Kafka Controller and thus have a direct impact on its overall load. In the worst case, a misbehaving client could, intentionally or unintentionally, overload the controller, which could affect the health of the whole cluster. For instance, electing a new partition leader could be delayed significantly, resulting in a partition that is not available. This issue arises particularly in shared clusters where clients can create and delete partitions at will.

We ran some basic performance experiments to understand the effect of rapidly creating or deleting a high number of topics on a 9-broker cluster:

  • Our first experiment, creating 5000 topics with 4 partitions and a replication factor of 3 at a rate of 10 requests/sec with 50 topics per request, showed that the controller took approx. 2 minutes to process all the events queued up by the topic creations.
  • Our second experiment, deleting those 5000 topics with a single request, showed that the controller took approx. 2 minutes to process all the events queued up by the topic deletions.

In both cases, this means that any cluster event queued up after these events waits approx. 2 minutes in the queue before being processed.

In order to prevent a cluster from being harmed by a high number of concurrent topic and/or partition creations or topic deletions, we propose to introduce a new quota limiting the admission of these operations. All operations exceeding the quota will be rejected with an explicit error. The new quota is a second-level quota which complements the existing request quota.

Proposed Changes

Changes to Kafka Brokers

We propose to introduce a second-level quota for the `CreateTopicsRequest`, the `CreatePartitionsRequest` and the `DeleteTopicsRequest` in the Kafka API layer that limits the number of partition mutations allowed per second. The number of partition mutations is defined as the number of partitions of a topic to be created, the number of partitions to be added to an existing topic, or the number of partitions of a topic to be deleted. We chose to express the quota in partition mutations because the load on the controller is highly correlated with the number of created or deleted partitions.

The broker hosting the controller receives all of the above requests (already the case today) and can accept partition mutations at a rate of R partitions per second, with a maximum burst worth of B partitions. If a client sends a request with several topics, topics worth of up to B partitions in total are accepted until B is exhausted; all the remaining topics are rejected with a `QUOTA_VIOLATED` error. Once the client has dipped into the burst, it has to wait until enough tokens have been accumulated again, which takes at least <number of partitions of the smallest remaining topic> / R seconds.

The reason to explicitly define a burst B is to allow a burst of several mutations with a small or medium number of partitions within the same request. Typically, applications tend to send one request with all their topics when they start. This would not work without an explicit burst.
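For illustration, assume B = 500 and a full bucket. A single request creating three topics with respectively 100, 250 and 200 partitions would have its first two topics accepted (100 + 250 <= 500) and its third topic rejected with `QUOTA_VIOLATED`, because only 150 tokens remain at that point. The numbers are purely illustrative.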

Throttling Algorithm

We propose to use the Token Bucket algorithm to achieve this behavior:

Let's define:

  • K: The number of tokens in the bucket
  • B: The maximum burst - The maximum number of tokens that can be accumulated in the bucket
  • R: The rate at which we accumulate tokens
  • T: The last time K got updated

At the time now, we update the number of tokens in the bucket with:

  • K = min(K + (now - T) * R, B)
  • T = now

A partition mutation with N partitions is admitted iff K >= N, in which case K is updated to K - N. The mutation is rejected otherwise.
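A minimal sketch of this logic in Java is shown below. The class and method names are illustrative only and are not part of the proposal:

public class TokenBucket {
    private final double rate;   // R: tokens (partition mutations) accumulated per second
    private final double burst;  // B: maximum number of tokens that the bucket can hold
    private double tokens;       // K: current number of tokens
    private long lastUpdateMs;   // T: last time the bucket was refilled

    public TokenBucket(double rate, double burst, long nowMs) {
        this.rate = rate;
        this.burst = burst;
        this.tokens = burst;
        this.lastUpdateMs = nowMs;
    }

    // Refill the bucket: K = min(K + (now - T) * R, B), then T = now.
    private void refill(long nowMs) {
        tokens = Math.min(tokens + (nowMs - lastUpdateMs) / 1000.0 * rate, burst);
        lastUpdateMs = nowMs;
    }

    // Try to admit a mutation of N partitions: deduct the tokens and return true
    // if K >= N, otherwise return false and leave the bucket untouched.
    public synchronized boolean tryAdmit(int partitions, long nowMs) {
        refill(nowMs);
        if (tokens >= partitions) {
            tokens -= partitions;
            return true;
        }
        return false;
    }
}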

Partition Mutations Quota Manager

We propose to introduce a new quota manager called `PartitionMutationsQuotaManager` which implements the algorithm described above. The new quota manager will be configured by two dynamic configurations: `controller.partition.mutations.rate` and `controller.partition.mutations.burst`. While the configuration could be set per broker, we envision having it set only as a cluster-wide default (an example follows the list below) for two reasons:

  1. It is effectively used only by the broker which hosts the controller.
  2. It does not make sense to have different values in the cluster. 
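For illustration, the cluster-wide default could be set with the existing dynamic configuration mechanism, for instance via the Admin client. The snippet below is a sketch assuming the configuration names proposed above; the bootstrap server and values are placeholders:

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class SetPartitionMutationsQuota {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // An empty broker name denotes the cluster-wide dynamic default for all brokers.
            ConfigResource clusterDefault = new ConfigResource(ConfigResource.Type.BROKER, "");
            AlterConfigOp setRate = new AlterConfigOp(
                new ConfigEntry("controller.partition.mutations.rate", "10"), AlterConfigOp.OpType.SET);
            AlterConfigOp setBurst = new AlterConfigOp(
                new ConfigEntry("controller.partition.mutations.burst", "500"), AlterConfigOp.OpType.SET);

            admin.incrementalAlterConfigs(Collections.singletonMap(
                clusterDefault, Arrays.asList(setRate, setBurst))).all().get();
        }
    }
}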

Handling of new Clients

For new clients, the new `QUOTA_VIOLATED` error code will be used to inform the client that a topic has been rejected. The `throttle_time_ms` field will be used to inform the client about how long it has been throttled:

  • throttle_time_ms = max(<request throttle time>, min(<throttle time of each topic in the request>) - <waiting time in the purgatory if applied>)

The channel will be muted as well when `throttle_time_ms > 0`. This is necessary to cope with misbehaving clients which would not honour the throttling time.
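As a hypothetical illustration of the formula: if the request-level quota yields a throttle time of 100 ms, the rejected topics in the request would need to wait 500 ms and 2000 ms respectively for enough tokens to accumulate, and the request already spent 50 ms in the purgatory, then throttle_time_ms = max(100, min(500, 2000) - 50) = 450 ms.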

Handling of old Clients

For old clients, we can't gracefully reject any topics in the requests. Therefore, we propose to always accept them and to throttle the client by muting its channel. The throttle time will be computed as the time the client should have waited, and the `throttle_time_ms` field will be populated accordingly.

Changes to Kafka Admin Client

Automatic Retry

A new retryable `QuotaViolationException` will be introduced on the client side. By default, the admin client will retry this error until `default.api.timeout.ms` is reached, while also honouring the throttling time. We will extend the retry mechanism already in place to support this. Once `default.api.timeout.ms` has been reached, the topics which are still throttled will return the `QuotaViolationException` to the caller. Any other errors will be given back to the caller as today. As the new exception is a retryable exception, it should not break users' code when they upgrade, as handling retryable exceptions is already something they should do.

Manual Retry

To complement this default behavior, we propose to add a new configuration to the admin client that allows users to opt out of the automatic retry mechanism. This gives applications fine-grained control.
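As a sketch of how an application could opt out and handle the throttling itself, assuming the `retry.quota.violation.exception` configuration and the `QuotaViolationException` proposed in this KIP (topic name, partition count and bootstrap server are placeholders):

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicWithManualRetry {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Opt out of the automatic retry proposed in this KIP.
        props.put("retry.quota.violation.exception", "false");

        try (Admin admin = Admin.create(props)) {
            CreateTopicsResult result = admin.createTopics(
                Collections.singleton(new NewTopic("my-topic", 4, (short) 3)));
            try {
                result.values().get("my-topic").get();
            } catch (ExecutionException e) {
                // QuotaViolationException is the new exception proposed in this KIP.
                if (e.getCause() instanceof QuotaViolationException) {
                    QuotaViolationException qve = (QuotaViolationException) e.getCause();
                    // The application decides how to back off before retrying.
                    System.out.println("Topic rejected, throttled for " + qve.throttleTimeMs() + " ms");
                }
            }
        }
    }
}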

Changes to Kafka Streams

TODO

Public Interfaces

Protocol

In order to be able to distinguish the new clients from the old ones, we will bump the version of the following requests/responses:

  • `CreateTopicsRequest` and `CreateTopicsResponse`
  • `CreatePartitionsRequest` and `CreatePartitionsResponse`
  • `DeleteTopicsRequest` and `DeleteTopicsResponse`

Starting from the bumped version, the new `QUOTA_VIOLATED` error will be used. It won't be used for older versions.

Configuration

  • `controller.partition.mutations.rate` - the rate R of partition mutations accepted per second
  • `controller.partition.mutations.burst` - the maximum burst B of partition mutations that can be accepted at once
  • `retry.quota.violation.exception` (default true) - whether a `QuotaViolationException` should be automatically retried by the admin client or not


Metrics

  1. Remaining Burst - the number of tokens (K) currently available in the bucket

Public API

As mentioned, we propose to introduce a new retryable `QuotaViolationException` exception which will be given back to the caller when a topic is rejected due to throttling.

/**
 * Exception thrown if an operation on a resource violates the quota.
 */
public class QuotaViolationException extends RetryableException {
    private final int throttleTimeMs;

    public QuotaViolationException(int throttleTimeMs, String message) {
        super(message);
        this.throttleTimeMs = throttleTimeMs;
    }

    public QuotaViolationException(int throttleTimeMs, String message, Throwable cause) {
        super(message, cause);
        this.throttleTimeMs = throttleTimeMs;
    }

    public int throttleTimeMs() {
        return this.throttleTimeMs;
    }
}

Compatibility, Deprecation, and Migration Plan

autotopic creations

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
