...

This KIP proposes to introduce a new version of the fetch request with a new parameter "response_max_bytes" to limit the size of the fetch response and solve the above problem.

...

  • New fetch request (v.3) with a global response size limit (see the sketch after this list)
  • New client-side config parameter fetch.response.max.bytes - global limit on the client's fetch response size
  • New replication config parameter replica.fetch.response.max.bytes - limit used by the replication thread
  • New inter-broker protocol version "0.11.0-IV0" - starting from this version brokers will use fetch request v.3 for replication
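
Conceptually, the v.3 request is the existing fetch request plus one new top-level field. The sketch below is only an illustrative Java model using this KIP's field names (response_max_bytes per request, partition_max_bytes per partition); it is not the actual wire schema or the Kafka request classes.

    // Simplified, illustrative model of the proposed fetch request v.3.
    // Field names follow this KIP; the real request classes look different.
    public class FetchRequestV3 {
        public static class PartitionData {
            public String topic;
            public int partition;
            public long fetchOffset;
            public int partitionMaxBytes;   // existing per-partition limit
        }
        public int replicaId;               // -1 for ordinary consumers
        public int maxWaitMs;
        public int minBytes;
        public int responseMaxBytes;        // new: global response size limit, 0 = "no limit"
        public java.util.List<PartitionData> partitions;  // processed in this order
    }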

...

The proposed changes are quite straightforward. The new fetch request processes partitions in the order they appear in the request.

If the response_max_bytes parameter is zero ("no limit"), the request is processed exactly as before.

Otherwise, for each partition except the first one, the server fetches up to the corresponding partition limit partition_max_bytes, but no more than the remaining response limit submitted by the client.

For the first partition, the server always fetches at least message.max.bytes. Empty message sets will be returned for all partitions that did not fit into the response limit. This is done to ensure that the request always makes progress while keeping the total response size bounded.

This algorithm provides the following guarantees:

  • FetchRequest with non-zero response_max_bytes always makes progress - if the server has messages, at least one message is returned irrespective of the response_max_bytes parameter; for all other partitions the server sends empty message sets.
  • FetchRequest response size will not exceed max(response_max_bytes, message.max.bytes)
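
To make this concrete, here is a minimal sketch of the limiting loop, assuming placeholder types and helpers (PartitionData, readFromLog(), messageMaxBytes()) rather than the real broker internals:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative sketch of the server-side limiting loop described above.
    class FetchLimitSketch {
        static class PartitionData {
            long fetchOffset;
            int partitionMaxBytes;                      // per-partition limit from the request
        }

        // Placeholder: read up to limitBytes from the log of one partition.
        static byte[] readFromLog(PartitionData p, int limitBytes) { return new byte[0]; }

        // Placeholder for the broker's message.max.bytes setting.
        static int messageMaxBytes() { return 1_000_000; }

        static List<byte[]> handleFetch(List<PartitionData> partitions, int responseMaxBytes) {
            List<byte[]> response = new ArrayList<>();
            if (responseMaxBytes == 0) {                // 0 = "no limit": behave exactly as before
                for (PartitionData p : partitions)
                    response.add(readFromLog(p, p.partitionMaxBytes));
                return response;
            }
            int remaining = responseMaxBytes;
            boolean first = true;
            for (PartitionData p : partitions) {
                int limit;
                if (first) {
                    // First partition: allow at least message.max.bytes so the request
                    // makes progress even when responseMaxBytes is very small.
                    limit = Math.max(remaining, messageMaxBytes());
                    first = false;
                } else {
                    // Other partitions: per-partition limit, capped by what is left.
                    limit = Math.min(p.partitionMaxBytes, remaining);
                }
                // Partitions that no longer fit get an empty message set.
                byte[] records = limit > 0 ? readFromLog(p, limit) : new byte[0];
                remaining = Math.max(0, remaining - records.length);
                response.add(records);
            }
            return response;
        }
    }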

...

...

 

Since the new fetch request processes partitions in order and stops fetching data when the response limit is hit, the client should use some kind of partition shuffling to ensure fairness.

...

In this scenario the client won't get any messages from C and D until it catches up with A and B.

The solution is to continue fetching from the first empty partition in a round-robin fashion, or to perform a random shuffle of the partitions.

Round-robin shuffling seems to be more "fair" and predictable, so we decided to deploy it in ReplicaFetcherThread and in the Java Consumer API.
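
A minimal sketch of that client-side rotation, with plain strings standing in for topic-partitions (this is not the actual Consumer or ReplicaFetcherThread code):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Set;

    // Before the next fetch, rotate the partition order so that the first partition
    // that came back empty last time is fetched first.
    class RoundRobinFetchOrder {
        static List<String> nextOrder(List<String> lastOrder, Set<String> emptyInLastResponse) {
            List<String> next = new ArrayList<>(lastOrder);
            for (int i = 0; i < next.size(); i++) {
                if (emptyInLastResponse.contains(next.get(i))) {
                    Collections.rotate(next, -i);       // first empty partition becomes the head
                    break;
                }
            }
            return next;
        }
    }

For example, with partitions ordered [A, B, C, D] and C and D empty in the previous response, the next fetch would be sent in the order [C, D, A, B], so C and D are no longer starved.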

 

Compatibility, Deprecation, and Migration Plan

The client setting max.partition.fetch.bytes will be deprecated in favour of fetch.response.max.bytes. The former setting will be used only for old requests (when the server side doesn't support the new one).

The replication setting replica.fetch.max.bytes will be deprecated in favour of replica.fetch.response.max.bytes. The former setting will be used for inter-broker protocol versions older than "0.11.0-IV0".

The new fetch request is designed to work properly even if fetch.response.max.bytes is less than the message size. This way we can ignore custom per-partition maximums, since they are mostly set to accommodate a custom message size.

Old fetch requests will be processed on the server exactly as before this KIP.
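
The fallback can be pictured as a simple version check on the client; the sketch below is illustrative only (the version constant and helper strings are assumptions, not the real client code):

    // Illustrative choice between the old and the new fetch request based on the
    // highest fetch version the broker supports.
    class FetchRequestChooser {
        static final int FETCH_V3 = 3;

        static String describeFetch(int brokerMaxFetchVersion,
                                     int maxPartitionFetchBytes,    // deprecated per-partition setting
                                     int fetchResponseMaxBytes) {   // new global limit from this KIP
            if (brokerMaxFetchVersion >= FETCH_V3) {
                return "fetch v3, response_max_bytes=" + fetchResponseMaxBytes;
            }
            return "fetch v2 (old), max.partition.fetch.bytes=" + maxPartitionFetchBytes;
        }
    }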

...

The new fetch request is designed to work properly even if response_max_bytes is less than the message size. If response_max_bytes is zero, the new request behaves exactly like the old one.

So we could even make this KIP completely transparent for users by setting the default for both fetch.response.max.bytes and replica.fetch.response.max.bytes to zero.

However, since clients like ReplicaFetcherThread and the Java Consumer are ready for the new fetch request, we decided to enable the following defaults:

fetch.response.max.bytes = 50MB

replica.fetch.response.max.bytes = 10MB
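
For example, a consumer that wants a tighter limit than the 50MB default would only need to override the new property (property names as proposed in this KIP; the bootstrap address and group id are placeholders):

    import java.util.Properties;

    public class ConsumerConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
            props.put("group.id", "example-group");             // placeholder group id
            // New global fetch response limit from this KIP (default 50MB); use 10MB here.
            props.put("fetch.response.max.bytes", Integer.toString(10 * 1024 * 1024));
            // Deprecated per-partition limit, only used when the broker
            // does not support the new fetch request:
            // props.put("max.partition.fetch.bytes", Integer.toString(1024 * 1024));
        }
    }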

 

Rejected Alternatives

Some discussed/rejected alternatives:

  1. Together with the addition of the global response limit, deprecate per-partition limits. Rejected since the per-partition limit can be useful for Kafka Streams (see the mailing list discussion).
  2. Do random partition shuffling on the server side. Pros: ensures fairness without client-side modifications. Cons: non-deterministic behaviour on the server side; round-robin can be easily implemented on the client side.