
Status

Current state: Draft [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: here 

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The public APIs to create topics, create partitions, and delete topics are heavy operations for the Kafka Controller and thus have a direct impact on its overall load. In the worst case, a misbehaving client could, intentionally or unintentionally, overload the controller, which could affect the health of the whole cluster. For instance, the election of a new partition leader could be delayed significantly, leaving the partition unavailable. This issue arises particularly in shared clusters where clients can create and delete partitions at will.

We ran some basic performance experiments on a 9-broker cluster to understand the effect of rapidly creating or deleting a high number of topics:

  • Our first experiment, which consisted of creating 5000 topics with 4 partitions each and a replication factor of 3, sent at a rate of 10 requests/sec with 50 topics per request, showed that the controller took approx. 2 minutes to process all the events queued up by the topic creations.
  • Our second experiment, which consisted of deleting the 5000 topics with a single request, showed that the controller took approx. 2 minutes to process all the events queued up by the topic deletions.

In both cases, any cluster event queued up after these events has to wait approx. 2 minutes in the queue before being processed.

In order to prevent a cluster from being harmed by a high number of concurrent topic and/or partition creations or topic deletions, we propose to introduce a new quota limiting the admission of these operations. All operations exceeding the quota will be rejected with an explicit error. The new quota is a second-level quota which complements the existing request quota.

Proposed Changes

Changes to Kafka Brokers

We propose to introduce a second-level quota for the `CreateTopicsRequest`, the `CreatePartitionsRequest` and the `DeleteTopicsRequest` in the Kafka API layer that limits the number of partition mutations allowed per second. The number of partition mutations is defined as the number of partitions of a topic to be created, the number of partitions to be added to an existing topic, or the number of partitions of a topic to be deleted. We chose to express the quota in partition mutations because the load on the controller is highly correlated with the number of created or deleted partitions.
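The definition of a partition mutation can be illustrated with a minimal sketch. The class `PartitionMutations` and its methods are hypothetical helpers written for this page, not part of Kafka; they only encode the three counting rules stated above.

```java
import java.util.Map;

/** Hypothetical helper: counts partition mutations as defined in this proposal. */
public final class PartitionMutations {
    private PartitionMutations() {}

    /** CreateTopics: each partition of the new topic is one mutation. */
    public static int forCreateTopic(int numPartitions) {
        return numPartitions;
    }

    /** CreatePartitions: only the partitions added to the existing topic count. */
    public static int forCreatePartitions(int currentCount, int newTotalCount) {
        if (newTotalCount < currentCount) {
            throw new IllegalArgumentException("the partition count of a topic cannot be decreased");
        }
        return newTotalCount - currentCount;
    }

    /** DeleteTopics: each partition of the deleted topic is one mutation. */
    public static int forDeleteTopic(int currentCount) {
        return currentCount;
    }

    /** Total mutations carried by one CreateTopics request (topic name -> partition count). */
    public static int forCreateTopicsRequest(Map<String, Integer> numPartitionsByTopic) {
        int total = 0;
        for (int n : numPartitionsByTopic.values()) {
            total += forCreateTopic(n);
        }
        return total;
    }
}
```

For example, each request of the first experiment (50 topics with 4 partitions each) amounts to 200 partition mutations, and the 5000 topics to 20,000.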

The broker hosting the controller receives all of the above requests (this is already the case today) and can accept partition mutations at a rate of R partitions per second with a maximum burst of B partitions. Operations worth N partition mutations are accepted until the burst is exhausted; the remaining ones are rejected with a `QUOTA_VIOLATED` error. The complete algorithm is formalized below.

The reason for explicitly defining a burst B is to allow several mutations with a small or medium number of partitions to be accepted within the same request. Typically, applications send a single request with all their topics when they start, which would not work without an explicit burst.

Throttling Algorithm

We propose to use a variation of the Token Bucket algorithm to achieve this behavior:

Let's define:

  • K: The number of tokens in the bucket
  • B: The maximum burst - The maximum number of tokens that can be accumulated in the bucket
  • R: The rate at which we accumulate tokens
  • T: The last time K got updated

At the time now, we update the number of tokens in the bucket with:

  • K = min(K + (now - T) * R, B), after which T is set to now

An operation with N partition mutations is admitted if and only if K > 0, in which case K is updated to K - N; otherwise, it is rejected. The variation here is that K is allowed to become negative, so that a topic with any number of partitions can be admitted without being restricted by B.

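If the number of tokens has dropped to 0 or below (K <= 0), a new operation with N partition mutations has to wait (-K + N) / R seconds, i.e. the time the bucket needs to accumulate enough tokens to cover both the current deficit (-K) and the N new mutations.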
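The token bucket variant can be sketched in Java as follows. This is a minimal illustration under stated assumptions, not the `PartitionMutationsQuotaManager` implementation: the class name `PartitionMutationBucket` is hypothetical, the bucket is assumed to start full (K = B), time is passed in explicitly in milliseconds, and the throttle time of a rejected operation is (-K + N) / R converted to milliseconds.

```java
/**
 * Hypothetical sketch of the token bucket variant: K may become negative so
 * that a topic with any number of partitions can be admitted.
 */
public final class PartitionMutationBucket {
    private final double rate;    // R, partition mutations per second
    private final double burst;   // B, maximum number of tokens
    private double tokens;        // K
    private long lastUpdateMs;    // T

    public PartitionMutationBucket(double rate, double burst, long nowMs) {
        if (rate <= 0 || burst <= 0) {
            throw new IllegalArgumentException("R and B must be positive");
        }
        this.rate = rate;
        this.burst = burst;
        this.tokens = burst;      // assumption: the bucket starts full
        this.lastUpdateMs = nowMs;
    }

    // K = min(K + (now - T) * R, B), then T = now.
    private void refill(long nowMs) {
        double elapsedSec = (nowMs - lastUpdateMs) / 1000.0;
        tokens = Math.min(tokens + elapsedSec * rate, burst);
        lastUpdateMs = nowMs;
    }

    /**
     * Tries to admit an operation worth n partition mutations.
     * Returns 0 if it is admitted, otherwise the throttle time in milliseconds.
     */
    public long tryAcquire(int n, long nowMs) {
        refill(nowMs);
        if (tokens > 0) {
            tokens -= n;          // may drive K below zero
            return 0;
        }
        // Rejected: wait (-K + N) / R seconds, expressed in milliseconds.
        return (long) Math.ceil((n - tokens) * 1000.0 / rate);
    }

    /** Current value of K, e.g. for a remaining-tokens metric. */
    public double remainingTokens(long nowMs) {
        refill(nowMs);
        return tokens;
    }
}
```

With R = 10 and B = 100, a first topic with 200 partitions is admitted immediately and drives K to -100. A 4-partition topic arriving right after is rejected with a throttle time of (100 + 4) / 10 = 10.4 seconds. Ten seconds later K is back to exactly 0, which still fails the K > 0 condition; the operation is admitted as soon as K becomes positive.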

Partition Mutations Quota Manager

We propose to introduce a new quota manager called `PartitionMutationsQuotaManager` which implements the algorithm described above. The new quota manager will be configured by two dynamic configurations: `controller.partition.mutations.rate` and `controller.partition.mutations.burst`. While the configurations could be set per broker, we envision setting them only as cluster-wide defaults for two reasons:

  1. It is effectively used only by the broker which hosts the controller.
  2. It does not make sense to have different values in the cluster. 

Handling of new Clients

For new clients, the new `QUOTA_VIOLATED` error code will be used to inform the client that a topic has been rejected. The `throttle_time_ms` field will be used to inform the client about how long it has been throttled:

  • throttle_time_ms = max(<request throttle time>, min(<throttle time of rejected topics in the request>) - <waiting time in the purgatory if applied>)
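The formula can be sketched as a small function. The class and parameter names are hypothetical, and returning only the request throttle time when no topic was rejected is an assumption, since the minimum over an empty set of rejected topics is undefined.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical helper computing the throttle_time_ms returned to new clients. */
public final class ThrottleTime {
    private ThrottleTime() {}

    /**
     * throttle_time_ms = max(request throttle time,
     *                        min(throttle time of rejected topics) - time spent in purgatory)
     * Assumption: when no topic was rejected, only the request quota applies.
     */
    public static int throttleTimeMs(int requestThrottleMs,
                                     List<Integer> rejectedTopicThrottleMs,
                                     int purgatoryWaitMs) {
        if (rejectedTopicThrottleMs.isEmpty()) {
            return requestThrottleMs;
        }
        int minRejectedMs = Collections.min(rejectedTopicThrottleMs);
        return Math.max(requestThrottleMs, minRejectedMs - purgatoryWaitMs);
    }
}
```

Taking the minimum over rejected topics lets the client retry as soon as the first of them could be admitted, while taking the maximum with the request throttle time ensures the client waits for the more constraining of the two quotas.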

The channel will be muted as well when `throttle_time_ms > 0`. This is necessary to cope with misbehaving clients which would not honour the throttle time. This logic already exists and is used by the request quota, so we will reuse it.

Handling of old Clients

For old clients, we can't gracefully reject any topic requests. Therefore, we propose to always accept them and to throttle the client by muting its channel. The throttle time will be set to the time the client would have had to wait, and the `throttle_time_ms` field will be populated accordingly. For `CreateTopicsResponse` and `CreatePartitionsResponse`, we will also populate the `ErrorMessage` field to explain that the client has been throttled, if it was.

Co-Existence of Multiple Quotas

The new partition mutations quota can be used in conjunction with the existing request quota. When both are used, the client will be throttled by the most constraining of the two.

Changes to Kafka Admin Client

Automatic Retry

A new retryable `QuotaViolationException` will be introduced on the client side. By default, the admin client will retry this error until `default.api.timeout.ms` is reached, honouring the throttle time between attempts. We will extend the retry mechanism already in place to support this. Once `default.api.timeout.ms` has been reached, the topics which were throttled will return the `QuotaViolationException` to the caller. Any other errors will be returned to the caller as today. As the new exception is retryable, it should not break users' code when they upgrade, since handling retryable exceptions is already something they should do.
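The automatic retry behaviour can be sketched as a loop that honours the throttle time and gives up at the API timeout. Everything here is hypothetical: `QuotaRetry`, `Clock`, and `callWithRetry` are illustrative names, the nested exception is a trimmed stand-in for the proposed exception (which extends a Kafka retriable exception), and this sketch chooses to give up as soon as waiting out the throttle time would cross the deadline.

```java
import java.util.function.Supplier;

/** Hypothetical sketch of the admin client's automatic retry on quota violations. */
public final class QuotaRetry {
    private QuotaRetry() {}

    /** Trimmed stand-in for the proposed exception. */
    public static final class QuotaViolationException extends RuntimeException {
        private final int throttleTimeMs;

        public QuotaViolationException(int throttleTimeMs, String message) {
            super(message);
            this.throttleTimeMs = throttleTimeMs;
        }

        public int throttleTimeMs() {
            return throttleTimeMs;
        }
    }

    /** Abstracts time so the retry policy can be exercised without sleeping. */
    public interface Clock {
        long nowMs();
        void sleepMs(long ms);
    }

    /**
     * Calls op until it succeeds, waiting for the returned throttle time between
     * attempts, and rethrows the last QuotaViolationException once the next
     * attempt would start after the deadline (default.api.timeout.ms).
     */
    public static <T> T callWithRetry(Supplier<T> op, long apiTimeoutMs, Clock clock) {
        long deadlineMs = clock.nowMs() + apiTimeoutMs;
        while (true) {
            try {
                return op.get();
            } catch (QuotaViolationException e) {
                long retryAtMs = clock.nowMs() + e.throttleTimeMs();
                if (retryAtMs >= deadlineMs) {
                    throw e; // timeout reached: surface the error to the caller
                }
                clock.sleepMs(e.throttleTimeMs());
            }
        }
    }
}
```

A production clock would wrap `System.currentTimeMillis()` and `Thread.sleep`, with appropriate handling of interruption.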

Manual Retry

To complement this default behavior, we propose to add a new configuration to the admin client to allow users to opt out of the automatic retry mechanism. This gives applications fine-grained control over retries.

Public Interfaces

Protocol

In order to distinguish new clients from old ones, we will bump the version of the following requests/responses:

  • `CreateTopicsRequest` and `CreateTopicsResponse` to version 6.
  • `CreatePartitionsRequest` and `CreatePartitionsResponse` to version 3.
  • `DeleteTopicsRequest` and `DeleteTopicsResponse` to version 5.

Starting from the bumped version, the new `QUOTA_VIOLATED` error will be used. It won't be used for older versions.
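The version gating can be sketched as a small lookup. The class, enum, and API name strings are hypothetical; the version numbers are those proposed in this KIP, and the fallback for older versions follows the handling described for old clients (accept the request and mute the channel).

```java
import java.util.Map;

/** Hypothetical sketch: how a broker could pick the behaviour on quota violation. */
public final class QuotaErrorVersions {
    /** What happens to a topic that exceeds the quota. */
    public enum Action { REJECT_WITH_QUOTA_VIOLATED, ACCEPT_AND_MUTE_CHANNEL }

    // First request version that may carry the QUOTA_VIOLATED error code.
    private static final Map<String, Short> FIRST_QUOTA_VIOLATED_VERSION = Map.of(
            "CreateTopics", (short) 6,
            "CreatePartitions", (short) 3,
            "DeleteTopics", (short) 5);

    private QuotaErrorVersions() {}

    public static Action onQuotaExceeded(String apiName, short requestVersion) {
        Short first = FIRST_QUOTA_VIOLATED_VERSION.get(apiName);
        if (first == null) {
            throw new IllegalArgumentException("API not subject to the partition mutations quota: " + apiName);
        }
        // New clients understand the explicit error; old clients do not, so their
        // request is accepted and the client is throttled by muting its channel.
        return requestVersion >= first ? Action.REJECT_WITH_QUOTA_VIOLATED : Action.ACCEPT_AND_MUTE_CHANNEL;
    }
}
```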

We will add the `ErrorMessage` field to the `DeleteTopicsResponse` as follows:

{
  "apiKey": 20,
  "type": "response",
  "name": "DeleteTopicsResponse",
  // Version 1 adds the throttle time.
  //
  // Starting in version 2, on quota violation, brokers send out responses before throttling.
  //
  // Starting in version 3, a TOPIC_DELETION_DISABLED error code may be returned.
  //
  // Version 4 is the first flexible version.
  //
  // Version 5 adds the ErrorMessage field.
  "validVersions": "0-5",
  "flexibleVersions": "4+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DeletableTopicResult", "versions": "0+",
      "about": "The results for each topic we tried to delete.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
        "about": "The topic name" },
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The deletion error, or 0 if the deletion succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "5+", "nullableVersions": "5+", "ignorable": true,
        "about": "The error message, or null if there was no error." }
    ]}
  ]
}

Broker Configuration

We propose to introduce the following new configurations in the Kafka Broker:

| Name | Type | Default | Update Mode | Description |
|---|---|---|---|---|
| quota.partition.mutations.burst | Long | Long.MaxValue | cluster-wide | The maximum burst of partition mutations (B), i.e. the maximum number of tokens that can accumulate in the bucket. |
| quota.partition.mutations.rate | Long | Long.MaxValue | cluster-wide | The rate (R), in partition mutations per second, at which partition mutations are accepted for the create topics request, the create partitions request and the delete topics request. |

Admin Client Configuration

We propose to introduce the following new configuration in the Admin Client:

| Name | Type | Default | Description |
|---|---|---|---|
| retry.quota.violation.exception | Boolean | true | Whether the `QuotaViolationException` must be automatically retried or not. |

Broker Metrics

We propose to expose the following new metric in the Kafka Broker:

| Group | Name | Description |
|---|---|---|
| PartitionMutationsQuotaManager | remaining-tokens | The number of remaining tokens in the bucket. A value <= 0 indicates that partition mutations are throttled. |

Public API

As mentioned, we propose to introduce a new retryable `QuotaViolationException` which will be returned to the caller when a topic is rejected due to throttling.

import org.apache.kafka.common.errors.RetriableException;

/**
 * Exception thrown if an operation on a resource violates the quota.
 */
public class QuotaViolationException extends RetriableException {
    private static final long serialVersionUID = 1L;

    // Time in milliseconds the client should wait before retrying.
    private final int throttleTimeMs;

    public QuotaViolationException(int throttleTimeMs, String message) {
        super(message);
        this.throttleTimeMs = throttleTimeMs;
    }

    public QuotaViolationException(int throttleTimeMs, String message, Throwable cause) {
        super(message, cause);
        this.throttleTimeMs = throttleTimeMs;
    }

    public int throttleTimeMs() {
        return this.throttleTimeMs;
    }
}

Kafka Topic Command

We propose to disable the automatic retry of `QuotaViolationException` for the `kafka-topics.sh` command so that the command is not blocked until the retry period is exhausted.

Known Limitations

  • The proposal does not support the old ways of altering topics via ZooKeeper, as it is not possible to control or reject changes made via ZooKeeper. As these ways are deprecated anyway, we consider this an acceptable limitation.
  • Similarly, the proposal is not compatible with auto topic creation, for the same reasons.

Compatibility, Deprecation, and Migration Plan

Compatibility with Old Clients

  • By default, there is no impact on existing users since throttling must be explicitly enabled.
  • If throttling is enabled in a cluster used by old clients, the old clients will be transparently throttled. If a client is throttled, its request will time out and will thus be retried by the existing mechanism. If the retries fail, the application will receive a TimeoutException, which is a retryable exception. Handling TimeoutException, and retryable exceptions more generally, is something that clients should already do, so the impact is minimal.

Compatibility after Upgrading the Admin Client to the new version

  • By default, the upgrade should be transparent since the Admin Client will automatically retry QuotaViolationException and return it to the caller only if the retry timeout is reached. In this case, the caller must at minimum handle the RetriableException and retry. Handling retryable exceptions is something that we can safely expect from clients.

Rejected Alternatives

Throttle the Execution instead of the Admission

The main alternative, diametrically opposed to this proposal, consists in throttling the execution of the requests inside the controller instead of throttling and rejecting them at admission in the API layer. Requests would always be accepted.

While this alternative would benefit from having zero client side changes, we believe that it is a sub-optimal solution for the clients for the following reasons:

  • By default, clients use a 30s request timeout, meaning that throttling the execution for longer periods would result in a TimeoutException in the client, which would then automatically retry the request. As it stands, the retried request would be rejected with a TopicExistsException, which is confusing. This is because topics are registered in ZooKeeper to notify the controller and are thus already created. Even without this, it feels wrong with our current API. This could work if we changed our API to be asynchronous. For instance, we could provide an API which allows clients to get the status of a topic (e.g. creating, ready, deleting, etc.) and continue to use the existing API only to trigger the creation. This has its own compatibility challenges.
  • Similarly, users of the `kafka-topics.sh` tool would be blocked until the retry time is exhausted and would end up with a confusing error message.
  • This approach would leave the number of pending requests in the system unbounded. We believe that it is better to have an explicit limit and to reject the extra work in order to properly protect the system.

Throttle the Admission but without an Explicit Error Code

Another alternative would be to apply the method used for old clients to new clients as well. We believe that it is a sub-optimal long-term solution for the following reasons:

  • Muting the socket to prevent a client from sending more requests works fine for a single connection. However, the AdminClient is used from several places (different instances of an application, command line tools, etc.) and thus does not always use the same connection. In this case, muting the socket is a sub-optimal solution.
  • We believe that having an explicit error code and message is better.


