
Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-2063

Released: 0.10.1.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

This KIP proposes a new version of the fetch request with a new top-level parameter, response_max_bytes, which limits the size of the fetch response and solves the problem described above.

In particular, if a consumer issues N parallel fetch requests, memory consumption will not exceed N * response_max_bytes.

More precisely, it will be min(N * response_max_bytes, max.partition.fetch.bytes * num_partitions), since the per-partition limit is still respected.
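The bound above can be checked with a few lines of arithmetic. This is only an illustrative sketch: the function name and the sample values (3 parallel requests, a 1 MB per-partition limit, 100 or 1000 partitions) are assumptions made for the example, not values taken from this KIP.

```python
MB = 1024 * 1024

def max_fetch_memory(parallel_requests, response_max_bytes,
                     max_partition_fetch_bytes, num_partitions):
    """Upper bound on fetch memory, following the formula stated above."""
    by_response_limit = parallel_requests * response_max_bytes
    by_partition_limit = max_partition_fetch_bytes * num_partitions
    return min(by_response_limit, by_partition_limit)

# Few partitions: the per-partition limit is the tighter bound.
few = max_fetch_memory(3, 50 * MB, 1 * MB, 100)
# Many partitions: the response limit is the tighter bound.
many = max_fetch_memory(3, 50 * MB, 1 * MB, 1000)
print(few // MB, many // MB)
```

With many partitions the response limit dominates, which is the case this KIP is meant to address.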

...

  • New fetch request (v.3) with response size limit
  • New client-side config parameter fetch.response.max.bytes - client's fetch response size limit
  • New replication config parameter replica.fetch.response.max.bytes - limit used by replication thread
  • New inter-broker protocol version "0.10.1-IV0" - starting from this version brokers will use fetch request v.3 for replication 

...

The proposed changes are straightforward. We introduce FetchRequest v.3 with a new top-level parameter, response_max_bytes:

Fetch Request (Version: 3) => replica_id max_wait_time min_bytes response_max_bytes [topics]
  replica_id => INT32
  max_wait_time => INT32
  min_bytes => INT32
  response_max_bytes => INT32
  topics => topic [partitions]
    topic => STRING
    partitions => partition fetch_offset max_bytes
      partition => INT32
      fetch_offset => INT64
      max_bytes => INT32
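To make the field layout concrete, here is a minimal sketch that serializes the request body in the order given by the schema. It assumes the usual conventions of the Kafka wire protocol for this generation of requests (big-endian integers, STRING as an INT16 length followed by UTF-8 bytes, arrays as an INT32 count followed by the elements) and omits the common request header. The function name `encode_fetch_v3_body` is hypothetical.

```python
import struct

def encode_fetch_v3_body(replica_id, max_wait_time, min_bytes,
                         response_max_bytes, topics):
    """Encode the body of a Fetch Request v3 (request header not included).

    topics: list of (topic_name, [(partition, fetch_offset, max_bytes), ...])
    """
    # Four top-level INT32 fields, in schema order.
    out = [struct.pack(">iiii", replica_id, max_wait_time,
                       min_bytes, response_max_bytes)]
    # Assumed array encoding: INT32 element count, then the elements.
    out.append(struct.pack(">i", len(topics)))
    for name, partitions in topics:
        raw = name.encode("utf-8")
        # Assumed STRING encoding: INT16 length, then the bytes.
        out.append(struct.pack(">h", len(raw)) + raw)
        out.append(struct.pack(">i", len(partitions)))
        for partition, fetch_offset, max_bytes in partitions:
            # partition INT32, fetch_offset INT64, max_bytes INT32
            out.append(struct.pack(">iqi", partition, fetch_offset, max_bytes))
    return b"".join(out)

body = encode_fetch_v3_body(-1, 500, 1, 50 * 1024 * 1024,
                            [("t", [(0, 0, 1024 * 1024)])])
```

The new response_max_bytes field occupies bytes 12 to 15 of the body, directly after min_bytes.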

 

...

The server processes partitions in the order they appear in the request.

If the response_max_bytes parameter is Int.MAX_INT ("no limit"), the request is processed exactly as before.

Otherwise, for each partition except the first one, the server fetches up to the corresponding partition limit max_bytes, but no more than the remaining response limit.

...

This algorithm provides the following guarantees:

  • A FetchRequest with response_max_bytes != Int.MAX_INT always makes progress: if the server has message(s), then at least one message is returned irrespective of response_max_bytes
  • The FetchRequest response size will not be bigger than max(response_max_bytes, size of the first message in the first partition)
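The processing order and the two guarantees can be illustrated with a small simulation. This is a sketch under stated assumptions, not the broker implementation: the handling of the first partition is inferred from the guarantees above (its first message is returned even when it exceeds the limits), only whole messages are counted, and the name `plan_fetch` is hypothetical.

```python
def plan_fetch(partitions, response_max_bytes):
    """Return the number of bytes fetched per partition, in request order.

    partitions: list of (max_bytes, [available message sizes]) tuples.
    """
    remaining = response_max_bytes
    fetched = []
    for index, (max_bytes, sizes) in enumerate(partitions):
        # Per-partition limit, capped by what is left of the response limit.
        limit = min(max_bytes, max(remaining, 0))
        taken = 0
        for position, size in enumerate(sizes):
            if taken + size > limit:
                # Assumption: only the first message of the first partition
                # may exceed the limits, so the request always makes progress.
                if index == 0 and position == 0:
                    taken = size
                break
            taken += size
        fetched.append(taken)
        remaining -= taken
    return fetched

# An oversized first message is still returned; later partitions get nothing.
oversized = plan_fetch([(100, [300]), (100, [40, 40, 40])], 150)
# Normal case: the second partition is capped by the remaining 70 bytes.
normal = plan_fetch([(100, [40, 40, 40]), (100, [40, 40, 40])], 150)
print(oversized, normal)
```

In both runs the total stays within max(response_max_bytes, size of the first message in the first partition).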

...

Compatibility, Deprecation, and Migration Plan

The new fetch request is designed to work properly even if the top-level limit response_max_bytes is less than the message size. If response_max_bytes is Int.MAX_INT, the new request behaves exactly like the old one.

So we could even make this KIP completely transparent for users by setting the default for both fetch.response.max.bytes and replica.fetch.response.max.bytes to Int.MAX_INT.

However, since clients such as ReplicaFetcherThread and the Java consumer are ready for the new fetch request, we decided to establish the following defaults:

fetch.response.max.bytes = 50MB

replica.fetch.response.max.bytes = 10MB

...

  1. Deprecate the per-partition limit together with the addition of the global response limit. Rejected since the per-partition limit can be useful for Kafka Streams (see the mailing list discussion).
  2. Do random partition shuffling on the server side. Pros: ensures fairness without client-side modifications. Cons: non-deterministic behaviour on the server side; round-robin can easily be implemented on the client side.
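As an illustration of the last point, client-side round-robin can be as simple as rotating the partition list between requests, so that a different partition comes first each time. This is a minimal sketch; the class name `RoundRobinPartitionOrder` is hypothetical and it is not the ordering logic of any actual Kafka client.

```python
from collections import deque

class RoundRobinPartitionOrder:
    """Rotate the partition order so each partition takes a turn at the front."""

    def __init__(self, partitions):
        self._order = deque(partitions)

    def next_order(self):
        """Return the order for the next fetch request, then rotate by one."""
        order = list(self._order)
        self._order.rotate(-1)  # move the current first partition to the end
        return order

rotation = RoundRobinPartitionOrder(["t-0", "t-1", "t-2"])
first = rotation.next_order()
second = rotation.next_order()
third = rotation.next_order()
fourth = rotation.next_order()
```

Because the server processes partitions in request order, rotating the list gives every partition a turn at the front, where the progress guarantee applies.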