...

Tooling/Monitoring Changes

Upon implementation, we will start exposing the following metrics through JMX. Since these are new metrics, there are no backward compatibility concerns. These metrics will be documented prior to the release.

  • per-client byte rate metrics
  • per-client metric to indicate if it is throttled or not
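The two metrics above could be exposed through JMX roughly as follows. This is a minimal sketch using the standard platform MBean server; the MBean interface, attribute names, and the ObjectName pattern (kafka.server:type=ClientQuotas,client-id=...) are illustrative assumptions, not the final names, which will be documented prior to the release.

Code Block

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class ClientQuotaMetrics {

    // Hypothetical management interface for the two per-client metrics.
    public interface ClientMetricsMBean {
        double getByteRate();     // per-client byte rate over the measured window
        boolean isThrottled();    // whether the client is currently throttled
    }

    public static class ClientMetrics implements ClientMetricsMBean {
        private volatile double byteRate;
        private volatile boolean throttled;

        public double getByteRate() { return byteRate; }
        public boolean isThrottled() { return throttled; }

        // Called by the quota manager after each window is measured.
        public void record(double rate, boolean throttled) {
            this.byteRate = rate;
            this.throttled = throttled;
        }
    }

    // Registers one MBean per client id, e.g.
    // kafka.server:type=ClientQuotas,client-id=clientA (name pattern assumed).
    public static ClientMetrics register(String clientId) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("kafka.server:type=ClientQuotas,client-id=" + clientId);
        ClientMetrics metrics = new ClientMetrics();
        server.registerMBean(metrics, name);
        return metrics;
    }
}
```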

Client Status Code

In the current response protocol, there is no way to return the quota status back to the client. How do clients know if they are being throttled or not? One solution is to add a new field in the response that indicates the quota status. This will require us to increment the protocol version for both producer and consumer. Clients that send V0 requests will not receive the quota status flag in the response. On the client side (producer and consumer), we can have a metric that exposes whether the client was throttled during the last measured window or not. 

This work item does not block the core implementation of quotas but IMO is very useful to have. 

The response protocol can define a top-level field called "isThrottled". Example:

Code Block
// Current fetch response
public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));


// Proposed fetch response
public static final Schema FETCH_RESPONSE_V1 = new Schema(new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V0)), new Field("isThrottled", INT8, "Was the request throttled or not"));
 
// Current produce response
public static final Schema PRODUCE_RESPONSE_V0 = new Schema(new Field("responses",
                                                                new ArrayOf(new Schema(new Field("topic", STRING),
                                                                                       new Field("partition_responses",
                                                                                                 new ArrayOf(new Schema(new Field("partition",
                                                                                                                                  INT32),
                                                                                                                        new Field("error_code",
                                                                                                                                  INT16),
                                                                                                                        new Field("base_offset",
                                                                                                                                  INT64))))))));

// Proposed produce response
public static final Schema PRODUCE_RESPONSE_V1 = new Schema(new Field("responses",
                                                                new ArrayOf(new Schema(new Field("topic", STRING),
                                                                                       new Field("partition_responses",
                                                                                                 new ArrayOf(new Schema(new Field("partition",
                                                                                                                                  INT32),
                                                                                                                        new Field("error_code",
                                                                                                                                  INT16),
                                                                                                                        new Field("base_offset",
                                                                                                                                  INT64))))))),
                                                            new Field("isThrottled", INT8, "Was the request throttled or not"));
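On the client side, the proposed INT8 field would feed the per-client throttle metric mentioned above. The sketch below assumes the isThrottled byte is the last field in the decoded response body (a real client would read it through the Schema classes); the helper class and method names are hypothetical.

Code Block

```java
import java.nio.ByteBuffer;

public class ThrottleFlag {
    private volatile boolean throttledLastWindow;

    // Assumption: the V1 schemas above append isThrottled as the final INT8
    // field of the response, so it is the last byte of the serialized body.
    // A non-zero value means the broker throttled the request.
    public void onResponse(ByteBuffer response) {
        byte flag = response.get(response.limit() - 1);
        throttledLastWindow = (flag != 0);
    }

    // Backs the client-side metric: was the client throttled in the last window?
    public boolean throttledLastWindow() {
        return throttledLastWindow;
    }
}
```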

NOTE: This was briefly discussed on the mailing list and was never concluded. One thing that we did rule out was to overload the error code field because being throttled isn't really an error and it can get really confusing.

Dynamic Configuration

This part of the proposal focuses on a mechanism to change client based configuration on the fly. This proposal will reuse pieces of the TopicConfigManager. Similar to topic configs, we can add a class called ClientConfigManager. This class will listen for ZooKeeper notifications on a designated path and apply the changes on the brokers.

ZNode Structure

Code Block
Configs are stored here
/config/clients/<client_id>

ZNode internally can look like this. We shall only have znodes for clients whose quota has been overridden.
{"version": x, "config": {"fetch-quota": "10M", "X1": "Y1", "X2": "Y2", ...}}
 
All config changes will be sequential znodes under
/config/clients/config_change_XX
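The broker-side handling of a change notification could look roughly like the sketch below. This is only an illustration of the intended behavior: in the real ClientConfigManager the config map would be parsed from the znode JSON read from /config/clients/<client_id> via the ZooKeeper client, and the class, method, and key names here (processNotification, quotaFor) are hypothetical.

Code Block

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ClientConfigManager {
    // In-broker store of per-client overrides, keyed by client id.
    private final Map<String, Map<String, String>> overrides = new ConcurrentHashMap<>();

    // Invoked when a sequential config_change_XX znode fires; the notification
    // payload names the client whose config changed, and the broker re-reads
    // that client's config znode (stubbed here as a plain map).
    public void processNotification(String clientId, Map<String, String> config) {
        if (config == null || config.isEmpty())
            overrides.remove(clientId);   // override deleted: fall back to the default quota
        else
            overrides.put(clientId, config);
    }

    // Quota lookup at request time: per-client override if present, else the default.
    public String quotaFor(String clientId, String key, String defaultValue) {
        Map<String, String> cfg = overrides.get(clientId);
        return (cfg != null && cfg.containsKey(key)) ? cfg.get(key) : defaultValue;
    }
}
```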

Here, all the client specific information is retained under the /config/clients path. The notification znode is intentionally separate from the one used by TopicConfigManager. This is to avoid processing each other's changes and, more importantly, for safety (we can't accidentally change topic configs instead of client configs, etc.).

We can add tooling within AdminUtils to update client configuration on the fly.

NOTE: This brings a nice consistency with how we do dynamic topic based configs currently. There is some overlap with KIP-5, but that is more focused on internal broker configs whereas this is entity based (topics and clients). Additionally, IMO KIP-5 is already a very large feature and it may be easier to tackle these issues separately.

Compatibility, Deprecation, and Migration Plan

Rejected Alternatives

Topic Based Quotas

We initially considered doing topic based quotas. This was rejected in favor of client based quotas since it seemed more natural to throttle clients than topics. Multiple producers/consumers can publish to or fetch from a single topic, which makes it hard to eliminate the interference of badly behaved clients. Topic based quotas are also harder to reason about when clients are accessing multiple topics (e.g. a wildcard consumer): if the quota for one topic is violated, how do we handle requests for the other topics?

...