...

Tooling/Monitoring Changes

Upon implementation, we will start exposing the following metrics to JMX. Since these are new metrics there are no backward compatibility issues. These metrics will be documented prior to the release.

  • per-client byte rate metrics
  • per-client metric to indicate throttle times

Client Status Code

In the current response protocol, there is no way to return the quota status to the client, so clients cannot tell whether they are being throttled. Our proposed solution is to add a new field, 'throttleTimeMs', to the response to indicate the quota status. This requires incrementing the protocol version of both the produce and fetch requests. Clients that send the current version (0) of those requests will not receive the new field in the response. On the client side (producer and consumer), we can expose the following metrics over a time window:

  • Max request throttle time - This is the largest throttle time seen for a request. If not throttled, it will simply expose 0.
  • Avg request throttle time - Exposes the average throttle time for each request during a window.
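
As an illustration of the two client-side metrics above, here is a minimal sketch of a sliding-window tracker. The class name ThrottleTimeWindow, its methods, and the choice to pass timestamps in explicitly are all assumptions made for this example; this is not the Kafka client's metrics implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical helper (not a Kafka class): tracks throttle times seen within a time window. */
final class ThrottleTimeWindow {
    private final long windowMs;
    // Each sample is {timestampMs, throttleTimeMs}, oldest first.
    private final Deque<long[]> samples = new ArrayDeque<>();

    ThrottleTimeWindow(long windowMs) {
        this.windowMs = windowMs;
    }

    /** Records the throttleTimeMs value read from one response. */
    void record(long nowMs, int throttleTimeMs) {
        samples.addLast(new long[] {nowMs, throttleTimeMs});
        expire(nowMs);
    }

    /** Largest throttle time in the window; 0 if nothing was throttled. */
    long max(long nowMs) {
        expire(nowMs);
        long max = 0;
        for (long[] s : samples) {
            max = Math.max(max, s[1]);
        }
        return max;
    }

    /** Average throttle time per request in the window; 0.0 if the window is empty. */
    double avg(long nowMs) {
        expire(nowMs);
        if (samples.isEmpty()) {
            return 0.0;
        }
        long sum = 0;
        for (long[] s : samples) {
            sum += s[1];
        }
        return (double) sum / samples.size();
    }

    // Drops samples that are at least windowMs old.
    private void expire(long nowMs) {
        while (!samples.isEmpty() && samples.peekFirst()[0] <= nowMs - windowMs) {
            samples.removeFirst();
        }
    }
}
```

A client would call record(...) once per produce or fetch response and read max(...) and avg(...) when reporting metrics. Unthrottled requests are recorded with a throttle time of 0, so they lower the average.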

The response protocol can define a top-level field called "throttleTimeMs".

Code Block
// Current fetch response
public static final Schema FETCH_RESPONSE_V0 = new Schema(
    new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));

// Proposed fetch response
public static final Schema FETCH_RESPONSE_V1 = new Schema(
    new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V0)),
    new Field("throttleTimeMs", INT32, "Amount of time in milliseconds the request was throttled if at all"));

// Current produce response
public static final Schema PRODUCE_RESPONSE_V0 = new Schema(
    new Field("responses", new ArrayOf(new Schema(
        new Field("topic", STRING),
        new Field("partition_responses", new ArrayOf(new Schema(
            new Field("partition", INT32),
            new Field("error_code", INT16),
            new Field("base_offset", INT64))))))));

// Proposed produce response
public static final Schema PRODUCE_RESPONSE_V1 = new Schema(
    new Field("responses", new ArrayOf(new Schema(
        new Field("topic", STRING),
        new Field("partition_responses", new ArrayOf(new Schema(
            new Field("partition", INT32),
            new Field("error_code", INT16),
            new Field("base_offset", INT64))))))),
    new Field("throttleTimeMs", INT32, "Amount of time in milliseconds the request was throttled if at all"));
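
To show how the version bump keeps old clients working, the sketch below serializes a deliberately simplified response and appends the INT32 throttleTimeMs field only for version 1 and later. The class ThrottleTimeCodec and the stand-in body (an INT32 count followed by INT32 entries in place of the real "responses" array) are assumptions for illustration; this is not the Kafka protocol code.

```java
import java.nio.ByteBuffer;

/** Hypothetical codec (not a Kafka class) showing a version-gated trailing field. */
final class ThrottleTimeCodec {

    /**
     * Writes a simplified response: INT32 entry count, the INT32 entries,
     * and, for version >= 1 only, a trailing INT32 throttleTimeMs.
     */
    static ByteBuffer write(short version, int[] entries, int throttleTimeMs) {
        int size = 4 + 4 * entries.length + (version >= 1 ? 4 : 0);
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(entries.length);
        for (int e : entries) {
            buf.putInt(e);
        }
        if (version >= 1) {
            buf.putInt(throttleTimeMs);
        }
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    /** Reads throttleTimeMs; version 0 responses carry no such field, so 0 is returned. */
    static int readThrottleTimeMs(short version, ByteBuffer buf) {
        int count = buf.getInt();
        buf.position(buf.position() + 4 * count); // skip the entries
        return version >= 1 ? buf.getInt() : 0;
    }
}
```

Because the new field is appended at the top level after the existing fields, a version 0 response has exactly the same bytes as before, which is why clients that keep sending version 0 requests are unaffected.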

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
    Once the brokers are upgraded, existing users will receive a default quota. Users that exceed it will be throttled, and their produce and fetch latencies will increase. Users that require more than the default quota will need to have overrides configured in advance to avoid getting throttled.
  • If we are changing behavior how will we phase out the older behavior?
    1. Set "inter.broker.protocol.version" to that of the previous release (0.8.2).
    2. Perform a rolling upgrade on the brokers so that they can all decipher the latest version of the Fetch response as described above.
    3. Change the "inter.broker.protocol.version" config to that of the current release (0.9?).
    4. Perform a second rolling restart of the brokers so that the new protocol version takes effect.
 

...