...

In the case of a consumer, the action is very similar to a delayed fetch request. The request needs to be delayed before reading from the log, so that the fetched data is not held in memory for the duration of the delay, which would create memory pressure.

Here's how the code might look inside the Broker (in KafkaApis, ReplicaManager or helper class). This is just for illustration purposes.

...

Kafka server has decided to migrate its metrics to Kafka Metrics (KM). For more details read this. Kafka Metrics maintains a Sensor for each measured metric. A Sensor can be configured with a "Quota", which is a bound on the minimum or maximum value of a metric. If recording a value causes a bound to be exceeded, a QuotaViolationException is thrown. We will add a bytes-in/bytes-out rate sensor per clientId, configured with 'N' 1-second windows. This is already supported in MetricConfig. We can make 'N' configurable and use a good default (10). This lets us track the rate per client at one-second granularity. Having small windows makes the quota system very responsive. A quota will also be configured when creating these metrics. For default clients, the quota used will simply be the default quota value. For clients with overridden quotas, we can use the custom value.
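To make the windowing idea concrete, here is a minimal standalone sketch of a rate tracker built from 'N' 1-second windows with an upper-bound quota. It does not use the Kafka Metrics classes; WindowedRateQuota and QuotaViolation are hypothetical names, and averaging over the full N-second span is an assumption made for simplicity rather than the behaviour of the real Rate stat.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch of a per-client byte-rate tracker; not the Kafka Metrics API. */
final class WindowedRateQuota {
    /** Stand-in for QuotaViolationException in this sketch. */
    static final class QuotaViolation extends RuntimeException {
        final double observedRate;
        QuotaViolation(double observedRate, double bound) {
            super("rate " + observedRate + " B/s exceeds quota " + bound + " B/s");
            this.observedRate = observedRate;
        }
    }

    private static final long WINDOW_MS = 1000L;   // each window covers one second
    private final int numWindows;                  // 'N' in the text, e.g. 10
    private final double quotaBytesPerSec;         // upper bound on the rate
    // Each entry is {windowIndex, bytesRecordedInThatWindow}, oldest first.
    private final Deque<long[]> windows = new ArrayDeque<>();

    WindowedRateQuota(int numWindows, double quotaBytesPerSec) {
        if (numWindows <= 0) throw new IllegalArgumentException("numWindows must be positive");
        this.numWindows = numWindows;
        this.quotaBytesPerSec = quotaBytesPerSec;
    }

    /** Average bytes/sec over the last N one-second windows (assumption: full span). */
    double rate(long nowMs) {
        expire(nowMs);
        long total = 0;
        for (long[] w : windows) total += w[1];
        return total / (double) numWindows;
    }

    /** Records the value first, then throws if the resulting rate exceeds the quota. */
    void record(long bytes, long nowMs) {
        expire(nowMs);
        long idx = nowMs / WINDOW_MS;
        long[] last = windows.peekLast();
        if (last != null && last[0] == idx) {
            last[1] += bytes;
        } else {
            windows.addLast(new long[] {idx, bytes});
        }
        double r = rate(nowMs);
        if (r > quotaBytesPerSec) throw new QuotaViolation(r, quotaBytesPerSec);
    }

    /** Drops windows that have fallen out of the N-window span ending at nowMs. */
    private void expire(long nowMs) {
        long oldestKept = nowMs / WINDOW_MS - numWindows + 1;
        while (!windows.isEmpty() && windows.peekFirst()[0] < oldestKept) {
            windows.removeFirst();
        }
    }
}
```

With N = 10 and a quota of 100 B/s, recording 500 bytes and then 400 bytes one second later stays under the bound (90 B/s), while a further 200 bytes in the following second pushes the average to 110 B/s and triggers the violation.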

In order to fully support quotas, we need to make some changes to the metrics package as defined below. 

...

Configuration Management

We need a mechanism to configure the default quotas and the per-user overrides. There is general agreement that we eventually need dynamic configs per-user client to fully operationalize quotas but for the purposes of this proposal, we will proceed with static configs.

Code Block
// Default bytes-out rate per consumer.
quota.consumer.default=2M
// Default bytes-in rate per producer.
quota.producer.default=2M
 
// Overrides
quota.producer.override="clientA:4M;clientB:10M"
quota.consumer.override="clientC:3M;clientD:5M"
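As an illustration of how a broker could interpret these static values, the sketch below parses a default such as 2M and an override string such as "clientA:4M;clientB:10M", then resolves the effective quota for a clientId. QuotaConfigParser and its methods are hypothetical names, and treating K/M/G as powers of 1024 is an assumption; the proposal does not define the unit suffix.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper for the static quota configs; not part of Kafka. */
final class QuotaConfigParser {

    /** Parses values like "2M" into bytes. Assumption: K/M/G are powers of 1024. */
    static long parseBytes(String text) {
        String s = text.trim().toUpperCase(Locale.ROOT);
        if (s.isEmpty()) throw new IllegalArgumentException("empty quota value");
        long multiplier = 1L;
        char last = s.charAt(s.length() - 1);
        if (last == 'K') multiplier = 1L << 10;
        else if (last == 'M') multiplier = 1L << 20;
        else if (last == 'G') multiplier = 1L << 30;
        if (multiplier != 1L) s = s.substring(0, s.length() - 1);
        return Long.parseLong(s.trim()) * multiplier;
    }

    /** Parses "clientA:4M;clientB:10M" (optionally wrapped in double quotes). */
    static Map<String, Long> parseOverrides(String raw) {
        Map<String, Long> result = new HashMap<>();
        String s = raw.trim();
        if (s.length() >= 2 && s.startsWith("\"") && s.endsWith("\"")) {
            s = s.substring(1, s.length() - 1);
        }
        for (String entry : s.split(";")) {
            if (entry.trim().isEmpty()) continue;
            int sep = entry.lastIndexOf(':');
            if (sep <= 0) throw new IllegalArgumentException("bad override entry: " + entry);
            String clientId = entry.substring(0, sep).trim();
            result.put(clientId, parseBytes(entry.substring(sep + 1)));
        }
        return result;
    }

    /** Returns the override for clientId if one exists, otherwise the default. */
    static long quotaFor(String clientId, Map<String, Long> overrides, long defaultQuota) {
        Long override = overrides.get(clientId);
        return override != null ? override : defaultQuota;
    }
}
```

Under these assumptions, a producer with clientId clientA resolves to 4M, while any client not named in the override string falls back to the 2M default.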

There is a separate discussion for dynamic configs per-user that isn't fully fleshed out. If it makes sense and people agree on a final design, we can model quotas using it.

 

Tooling/Monitoring Changes

Upon implementation, we will start exposing per-client byte rate metrics to JMX. Since these are new metrics, there are no backward compatibility issues. These metrics will be documented prior to the release.

Client Status Code

In the current response protocol, there is no way to return the quota status back to the client. How do clients know if they are being throttled or not? One solution is to add a new field in the response that indicates the quota status. This will require us to increment the protocol version for both producer and consumer. Clients that send V0 requests will not receive the quota status flag in the response. On the client side (producer and consumer), we can have a metric that exposes whether the client was throttled during the last measured window or not. 

This work item does not block the core implementation of quotas but IMO is very useful to have. 

The response protocol can define a top-level field called "isThrottled". Example:

Code Block
// Current fetch response
public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));


// Proposed fetch response
public static final Schema FETCH_RESPONSE_V1 = new Schema(new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V0)), new Field("isThrottled", INT8, "Was the request throttled or not"));
 
// Current produce response
public static final Schema PRODUCE_RESPONSE_V0 = new Schema(new Field("responses",
                                                                new ArrayOf(new Schema(new Field("topic", STRING),
                                                                                       new Field("partition_responses",
                                                                                                 new ArrayOf(new Schema(new Field("partition",
                                                                                                                                  INT32),
                                                                                                                        new Field("error_code",
                                                                                                                                  INT16),
                                                                                                                        new Field("base_offset",
                                                                                                                                  INT64))))))));


// Proposed produce response
public static final Schema PRODUCE_RESPONSE_V1 = new Schema(new Field("responses",
                                                                new ArrayOf(new Schema(new Field("topic", STRING),
                                                                                       new Field("partition_responses",
                                                                                                 new ArrayOf(new Schema(new Field("partition",
                                                                                                                                  INT32),
                                                                                                                        new Field("error_code",
                                                                                                                                  INT16),
                                                                                                                        new Field("base_offset",
                                                                                                                                  INT64))))))),
                                                            new Field("isThrottled", INT8, "Was the request throttled or not"));
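The version gating described above can be sketched without the Kafka protocol classes. In the simplified example below, the already-serialized "responses" payload is treated as an opaque byte array and the INT8 flag is appended only for version 1 and later, so V0 clients see an unchanged response. ThrottleFlagCodec and its method names are hypothetical, and the real schemas would be serialized through the Schema/Struct machinery rather than by hand.

```java
import java.nio.ByteBuffer;

/** Hypothetical illustration of version-gated encoding; not the Kafka protocol code. */
final class ThrottleFlagCodec {

    /**
     * Appends the isThrottled flag (one byte, 1 = throttled) after the serialized
     * responses, but only when the request version is 1 or higher.
     */
    static byte[] encode(byte[] responses, int version, boolean throttled) {
        int size = responses.length + (version >= 1 ? 1 : 0);
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.put(responses);
        if (version >= 1) {
            buf.put((byte) (throttled ? 1 : 0));
        }
        return buf.array();
    }

    /**
     * Reads the flag back. V0 responses carry no flag, so the client reports
     * "not throttled" for them in this sketch.
     */
    static boolean isThrottled(byte[] encoded, int responsesLength, int version) {
        if (version < 1) return false;
        return encoded[responsesLength] != 0;
    }
}
```

A client-side metric could then be updated from the decoded flag after each response; how that metric is named and windowed is left open here.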

NOTE: This was briefly discussed on the mailing list, but no conclusion was reached. One thing that we did rule out was overloading the error code field, because being throttled isn't really an error and treating it as one would be confusing.

Compatibility, Deprecation, and Migration Plan

...