Status

Current state: Discussion

Discussion thread: here

JIRA: KAFKA-6028

Released: <Kafka Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP addresses a fundamental weakness in Kafka's quota design.

Kafka currently throttles clients that violate quotas by delaying their responses. In other words, a client cannot know that it has been throttled until it receives the response. This works as long as the throttle time is short and the clients are cooperative, but it can break down when the delay is long. Consider the following case for producers (the same defect can affect consumers):

  1. A producer sends a ProduceRequest.
  2. The broker decides the producer needs to be throttled for X seconds.
  3. The producer hits its request timeout before receiving the ProduceResponse
  4. The producer disconnects and retries the ProduceRequest.
  5. The second ProduceRequest gets throttled again (possibly for an even longer duration than the previous throttle).
  6. The producer times out again, disconnects and retries.
  7. The pattern continues indefinitely, putting more and more load on the cluster.

This scenario is not far-fetched. In fact, we have seen this problem on multiple occasions when a MapReduce job tries to push data to a Kafka cluster.

The fundamental problem is that a client that violates its quota sees only a timeout. Clients therefore have no way to distinguish a quota violation from other timeout scenarios (such as network partitions).

Public Interfaces

No public interface changes are needed. We propose only a behavior change on the broker side.

Proposed Changes

We propose the following changes:

  1. After the broker processes a client request and decides that the client is to be throttled for time X, the broker will not hold the response as it does today but will return it immediately. The throttle time field of the response will be set to X.
  2. The broker will then mute the channel corresponding to this client for time X. (The broker can use a delayed queue to unmute the channel.)
  3. When a client receives a response with throttle time X, it should refrain from sending any further requests for time X. (The usual idle timeout, i.e., connections.max.idle.ms, should be ignored during the throttle time X.)
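The broker-side muting in step 2 could be sketched roughly as follows, using java.util.concurrent.DelayQueue as the delayed queue. This is only an illustration under stated assumptions: the class and method names (ThrottledChannel, ChannelThrottler, mute, unmuteExpired, isMuted) are hypothetical and do not correspond to existing Kafka broker code, and a set of channel ids stands in for the real muting of a socket channel.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical entry in the delayed queue: one muted channel and its unmute deadline.
final class ThrottledChannel implements Delayed {
    final String channelId;
    private final long unmuteAtNanos;

    ThrottledChannel(String channelId, long throttleTimeMs) {
        this.channelId = channelId;
        this.unmuteAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(throttleTimeMs);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining time until the channel may be unmuted; <= 0 means expired.
        return unit.convert(unmuteAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

// Hypothetical helper: tracks which channels are muted and when to unmute them.
final class ChannelThrottler {
    private final Set<String> muted = new HashSet<>();
    private final DelayQueue<ThrottledChannel> pending = new DelayQueue<>();

    // Call right after the response carrying throttleTimeMs has been sent.
    // A muted channel delivers no further requests, so it is not muted twice.
    synchronized void mute(String channelId, long throttleTimeMs) {
        if (throttleTimeMs <= 0) {
            return; // not throttled, nothing to do
        }
        muted.add(channelId);
        pending.add(new ThrottledChannel(channelId, throttleTimeMs));
    }

    // Call periodically (for example from a network thread's poll loop).
    synchronized void unmuteExpired() {
        ThrottledChannel expired;
        // poll() returns null while the head of the queue has not yet expired.
        while ((expired = pending.poll()) != null) {
            muted.remove(expired.channelId);
        }
    }

    synchronized boolean isMuted(String channelId) {
        return muted.contains(channelId);
    }
}
```

In this sketch the request handlers never see a request from a muted channel; the network layer would consult isMuted before reading from the socket.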

Older client implementations (written before this KIP) will ignore the throttle time field and send the next request immediately after the broker responds. The broker is still protected, because it mutes the channel for time X: the next request will not be processed until the channel is unmuted. Although this does not prevent the socket buffers from being utilized, the broker's request handlers are not recruited to handle the throttled client's request if the client has not backed off long enough to honor the throttle time. Since this subsequent request is not actually handled until the broker unmutes the channel, the client can hit request.timeout.ms and reconnect. However, this is no worse than the current state.
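For a client that does honor the throttle time field, the back-off described in step 3 could look roughly like the sketch below. It is a minimal illustration, not part of the Kafka client API: ThrottleBackoff and all of its methods are hypothetical names, and times are passed in explicitly as milliseconds so that the logic is easy to follow.

```java
// Hypothetical client-side helper that tracks the throttle deadline for one connection.
final class ThrottleBackoff {
    private long throttledUntilMs = 0L;

    // Record the throttle time field of a response received at nowMs.
    void onResponse(long throttleTimeMs, long nowMs) {
        if (throttleTimeMs > 0) {
            throttledUntilMs = Math.max(throttledUntilMs, nowMs + throttleTimeMs);
        }
    }

    // True once the throttle time has fully elapsed.
    boolean canSend(long nowMs) {
        return nowMs >= throttledUntilMs;
    }

    // How long the client should still wait before sending the next request.
    long remainingMs(long nowMs) {
        return Math.max(0L, throttledUntilMs - nowMs);
    }

    // The idle timeout (connections.max.idle.ms) is ignored while throttled.
    boolean shouldCloseIdle(long lastActivityMs, long maxIdleMs, long nowMs) {
        return canSend(nowMs) && nowMs - lastActivityMs > maxIdleMs;
    }
}
```

A sender loop would check canSend before writing each request and otherwise wait for remainingMs, instead of discovering the throttle through a request timeout.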

Compatibility, Deprecation, and Migration Plan

The change is fully backwards compatible.

Test Plan

Set the quota to a very low value and verify that the clients neither time out nor send more requests to the broker before the throttle time has passed.

Rejected Alternatives

One potential solution to the above problem is to set a very high request.timeout.ms on the client side. This is not ideal because the request timeout is not supposed to be set in response to a throttling condition.
