...

Currently, the only way for a client to limit fetch response size is the per-partition response limit max_bytes, taken from the config setting max.partition.fetch.bytes.

...

This KIP proposes to introduce a new version of the fetch request with a new parameter "response_max_bytes" that limits the size of the fetch response and solves the above problem.

...

The proposed changes are quite straightforward. We introduce FetchRequest v.3 with a new parameter response_max_bytes:

Fetch Request (Version: 3) => replica_id max_wait_time min_bytes response_max_bytes [topics]
  replica_id => INT32
  max_wait_time => INT32
  min_bytes => INT32
  response_max_bytes => INT32
  topics => topic [partitions]
    topic => STRING
    partitions => partition fetch_offset max_bytes
      partition => INT32
      fetch_offset => INT64
      max_bytes => INT32
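As an illustration of the field order in the v.3 schema, here is a minimal sketch that writes the request body into a ByteBuffer. It assumes the usual Kafka wire conventions (big-endian integers, an INT32 count before each array, and STRING as an INT16 length followed by UTF-8 bytes) and leaves out the common request header. The class and method names are hypothetical and are not Kafka's actual serialization code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical helper for illustration only; not part of Kafka.
final class FetchRequestV3Sketch {
    // partition => INT32, fetch_offset => INT64, max_bytes => INT32
    static final class PartitionFetch {
        final int partition; final long fetchOffset; final int maxBytes;
        PartitionFetch(int partition, long fetchOffset, int maxBytes) {
            this.partition = partition; this.fetchOffset = fetchOffset; this.maxBytes = maxBytes;
        }
    }

    // topic => STRING, followed by its partition array
    static final class TopicFetch {
        final String topic; final List<PartitionFetch> partitions;
        TopicFetch(String topic, List<PartitionFetch> partitions) {
            this.topic = topic; this.partitions = partitions;
        }
    }

    // Encodes the request body only; the request header is omitted.
    static ByteBuffer encodeBody(int replicaId, int maxWaitTime, int minBytes,
                                 int responseMaxBytes, List<TopicFetch> topics) {
        ByteBuffer buf = ByteBuffer.allocate(1024); // fixed size keeps the sketch short
        buf.putInt(replicaId);
        buf.putInt(maxWaitTime);
        buf.putInt(minBytes);
        buf.putInt(responseMaxBytes);           // the field added in v.3
        buf.putInt(topics.size());              // array count prefix
        for (TopicFetch t : topics) {
            byte[] name = t.topic.getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) name.length);  // STRING length prefix
            buf.put(name);
            buf.putInt(t.partitions.size());    // array count prefix
            for (PartitionFetch p : t.partitions) {
                buf.putInt(p.partition);
                buf.putLong(p.fetchOffset);
                buf.putInt(p.maxBytes);
            }
        }
        buf.flip();                             // switch the buffer to read mode
        return buf;
    }
}
```

The only difference from a v.2 body in this sketch is the extra INT32 written right after min_bytes.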

 

Fetch Response v.3 will remain the same as v.2.

The new fetch request processes partitions in the order they appear in the request.

If the response_max_bytes parameter is Int.MAX_INT ("no limit"), the request is processed exactly as before.

Otherwise, for each partition except the first one, the server fetches up to the corresponding partition limit partition_max_bytes, but no more than the remaining response limit.

...

This algorithm provides following guarantees:

  • A FetchRequest with response_max_bytes != Int.MAX_INT always makes progress: if the server has message(s), then at least one message is returned irrespective of the response_max_bytes parameter
  • FetchRequest response size will not exceed max(response_max_bytes, message.max.bytes)
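The processing rules and guarantees above can be illustrated with a small simulation. This is only a sketch under stated assumptions, not the broker implementation: partitions are modelled as lists of whole message sizes, Integer.MAX_VALUE stands in for Int.MAX_INT, and the progress guarantee is modelled by returning one oversized message when nothing has been returned yet (one possible reading of the guarantee). All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation for illustration only; not broker code.
final class ResponseLimitSketch {
    static final int NO_LIMIT = Integer.MAX_VALUE; // stands in for Int.MAX_INT

    // Returns the number of bytes taken from each partition, in request order.
    static List<Integer> bytesPerPartition(int responseMaxBytes,
                                           List<Integer> partitionMaxBytes,
                                           List<List<Integer>> pendingMessageSizes) {
        List<Integer> result = new ArrayList<>();
        long remaining = responseMaxBytes; // response budget still available
        long total = 0;                    // bytes returned so far
        for (int i = 0; i < partitionMaxBytes.size(); i++) {
            long limit = partitionMaxBytes.get(i);
            if (responseMaxBytes != NO_LIMIT) {
                // Cap the partition limit by what is left of the response limit.
                limit = Math.min(limit, Math.max(remaining, 0));
            }
            List<Integer> messages = pendingMessageSizes.get(i);
            long taken = 0;
            for (int size : messages) {
                if (taken + size > limit) break; // next message does not fit
                taken += size;
            }
            // Assumption: to guarantee progress, return one message even if it
            // exceeds the limit, as long as nothing has been returned yet.
            if (responseMaxBytes != NO_LIMIT && total == 0 && taken == 0
                    && !messages.isEmpty()) {
                taken = messages.get(0);
            }
            total += taken;
            remaining -= taken;
            result.add((int) taken);
        }
        return result;
    }
}
```

For example, with a response limit of 100 and two partitions holding messages of sizes 40, 40, 40 and 30, 30, the sketch takes 80 bytes from the first partition and nothing from the second, because the remaining 20 bytes cannot hold a 30-byte message.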

...

Since the new fetch request processes partitions in order and stops fetching data when the response limit is hit, the client should use some kind of partition shuffling to ensure fairness.

...

In this scenario, the client won't get any messages from C and D until it catches up with A and B.

The solution is either to reorder partitions in the fetch request in round-robin fashion, so that fetching continues from the first empty partition received, or to perform a random shuffle of partitions before each request.

Round-robin shuffling seems to be more "fair" and predictable, so we decided to deploy it in ReplicaFetcherThread and in the Consumer Java API.
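A minimal sketch of the round-robin reordering, assuming the client remembers the partition order of its last request and which partitions came back empty in the response. The next request starts from the first such empty partition and keeps the relative order of the rest. The names are hypothetical; this is not the actual ReplicaFetcherThread or consumer code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration only; not Kafka client code.
final class RoundRobinSketch {
    // Rotates the previous request order so that the first partition that
    // returned no data becomes the head of the next request.
    static List<String> nextOrder(List<String> lastOrder, Set<String> emptyPartitions) {
        int start = 0; // keep the order unchanged if no partition was empty
        for (int i = 0; i < lastOrder.size(); i++) {
            if (emptyPartitions.contains(lastOrder.get(i))) {
                start = i;
                break;
            }
        }
        List<String> next = new ArrayList<>(lastOrder.subList(start, lastOrder.size()));
        next.addAll(lastOrder.subList(0, start));
        return next;
    }
}
```

With a last request order of A, B, C, D and empty responses for C and D, the next request would list the partitions as C, D, A, B, so C and D are served before A and B consume the response limit again.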

...

The new fetch request is designed to work properly even if response_max_bytes is less than the message size. If response_max_bytes is Int.MAX_INT, the new request behaves exactly like the old one.

So we can even make this KIP completely transparent for users by setting the default for both fetch.response.max.bytes and replica.fetch.response.max.bytes to Int.MAX_INT.

However, since clients like ReplicaFetcherThread and the Java Consumer are ready for the new fetch request, we decided to enable the following defaults:

...