...

In the case of a producer, the broker appends the request batch to the log but does not return a response immediately. Instead, the response is placed in a DelayedOperationPurgatory for a certain period of time before it is returned to the client. It is important that the append is done before delaying the client, to avoid holding large batches in the broker heap and running out of memory.

In the case of a consumer, the action is very similar to a delayed fetch request. The request needs to be delayed before performing the log read to avoid memory pressure.
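The producer-side ordering described above (append first, then delay only the small response) can be sketched as follows. This is a minimal illustration, not the broker implementation: it uses `java.util.concurrent.DelayQueue` as a stand-in for the DelayedOperationPurgatory, and `ThrottledResponse`, `ProduceThrottleSketch`, `handleProduce`, and `pollReady` are hypothetical names. The delay is taken as an input because how it is computed is not covered in this section.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical response holder: it keeps only small metadata, never the batch itself.
final class ThrottledResponse implements Delayed {
    final String clientId;
    final long baseOffset;
    private final long readyAtNanos;

    ThrottledResponse(String clientId, long baseOffset, long delayMs) {
        this.clientId = clientId;
        this.baseOffset = baseOffset;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

// Hypothetical stand-in for the broker's produce path.
final class ProduceThrottleSketch {
    private final List<byte[]> log = new ArrayList<>();
    // Assumption: a DelayQueue plays the role of the purgatory in this sketch.
    private final DelayQueue<ThrottledResponse> delayed = new DelayQueue<>();

    void handleProduce(String clientId, byte[] batch, long delayMs) {
        long baseOffset = log.size();
        log.add(batch); // 1. append immediately, so the batch is not held while the client waits
        delayed.put(new ThrottledResponse(clientId, baseOffset, delayMs)); // 2. delay only the response
    }

    // Returns a response whose delay has expired, or null if none is ready yet.
    ThrottledResponse pollReady() {
        return delayed.poll();
    }

    int logSize() {
        return log.size();
    }
}
```

In this sketch the batch reaches the log before any waiting begins, so only the lightweight response object is retained for the duration of the delay.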

...

Tooling/Monitoring Changes

Upon implementation, we will start exposing the following metrics to JMX. Since these are new metrics there are no backward compatibility issues. These metrics will be documented prior to the release.

  • per-client byte rate metrics
  • per-client metric to indicate throttle times

Client Status Code

In the current response protocol, there is no way to return the quota status back to the client. How do clients know if they are being throttled or not? Our proposed solution is to add a new field in the response that indicates the quota status. This will require us to increment the protocol version for both producer and consumer. Clients that send V0 requests will not receive the quota status field in the response. On the client side (producer and consumer), we can have a metric that exposes whether the client was throttled during the last measured window or not.

This work item does not block the core implementation of quotas but IMO is very useful to have. 

The clients can expose the following metrics over a time window:

  • Max request throttle time - This is the largest throttle time seen for a request. If not throttled, it will simply expose 0.
  • Avg request throttle time - Exposes the average throttle time for each request during a window.
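As a rough illustration of the two client-side metrics listed above, the sketch below tracks the maximum and average throttle time over a single window. The class name `ThrottleTimeWindow` and its methods are hypothetical, and the policy for when a window ends is left to the caller via `reset()`.

```java
// Hypothetical per-window accumulator for request throttle times (in milliseconds).
final class ThrottleTimeWindow {
    private long maxMs = 0;
    private long totalMs = 0;
    private long count = 0;

    // Record the throttle time reported for one request; 0 means "not throttled".
    synchronized void record(long throttleTimeMs) {
        maxMs = Math.max(maxMs, throttleTimeMs);
        totalMs += throttleTimeMs;
        count++;
    }

    // Largest throttle time seen in the window; 0 if nothing was throttled.
    synchronized long maxThrottleTimeMs() {
        return maxMs;
    }

    // Average throttle time per request in the window; 0.0 if no requests were recorded.
    synchronized double avgThrottleTimeMs() {
        return count == 0 ? 0.0 : (double) totalMs / count;
    }

    // Start a new window.
    synchronized void reset() {
        maxMs = 0;
        totalMs = 0;
        count = 0;
    }
}
```

Recording 0 for unthrottled requests keeps the average a per-request figure, which matches the description of the Avg metric above.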

 The response protocol can define a top-level field called "throttleTime". Example:

Code Block
// Current fetch response
public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));


// Proposed fetch response
public static final Schema FETCH_RESPONSE_V1 = new Schema(new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V0)), new Field("throttleTime", INT32, "Amount of time the request was throttled if at all"));
 
// Current produce response
public static final Schema PRODUCE_RESPONSE_V0 = new Schema(new Field("responses",
                                                                new ArrayOf(new Schema(new Field("topic", STRING),
                                                                                       new Field("partition_responses",
                                                                                                 new ArrayOf(new Schema(new Field("partition",
                                                                                                                                  INT32),
                                                                                                                        new Field("error_code",
                                                                                                                                  INT16),
                                                                                                                        new Field("base_offset",
                                                                                                                                  INT64))))))));

// Proposed produce response
public static final Schema PRODUCE_RESPONSE_V1 = new Schema(new Field("responses",
                                                                new ArrayOf(new Schema(new Field("topic", STRING),
                                                                                       new Field("partition_responses",
                                                                                                 new ArrayOf(new Schema(new Field("partition",
                                                                                                                                  INT32),
                                                                                                                        new Field("error_code",
                                                                                                                                  INT16),
                                                                                                                        new Field("base_offset",
                                                                                                                                  INT64))))))),
                                                            new Field("throttleTime", INT32, "Amount of time the request was throttled if at all"));

NOTE: This was briefly discussed on the mailing list and was never concluded. One thing that we did rule out was to overload the error code field because being throttled isn't really an error and it can get really confusing.
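On the client side, reading the new field has to be gated on the response version, since V0 responses do not carry it. The sketch below is one hedged way to express that. It assumes the field is serialized as a 4-byte big-endian integer directly after the "responses" array, following the field order in the proposed schemas above; `ThrottleTimeReader` and `readThrottleTimeMs` are hypothetical names, not part of any existing client API.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for version-gated parsing of the proposed throttle time field.
final class ThrottleTimeReader {
    // Assumption: `buffer` is positioned just after the "responses" array has been read.
    static int readThrottleTimeMs(short apiVersion, ByteBuffer buffer) {
        if (apiVersion < 1) {
            // V0 responses have no throttle time field, so report "not throttled".
            return 0;
        }
        // ByteBuffer is big-endian by default, so getInt() reads a 4-byte INT32.
        return buffer.getInt();
    }
}
```

Returning 0 for V0 keeps callers uniform: a client that cannot learn its throttle time is handled the same way as one that was not throttled.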

Compatibility, Deprecation, and Migration Plan

...