
Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-2063

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, the only way for a client to limit the fetch response size is the per-partition limit taken from the config setting max.partition.fetch.bytes.

So the maximum amount of memory the client can consume is max.partition.fetch.bytes * num_partitions.

This leads to the following problems:

  1. Since num_partitions can be quite large (several thousand), the memory required for a fetch response can reach several GB.
  2. max.partition.fetch.bytes cannot be set arbitrarily low, since it must be greater than the maximum message size for the fetch request to work.
  3. Memory usage is not easily predictable: it depends on consumer lag.
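To make the first problem concrete, here is a back-of-the-envelope calculation with illustrative numbers (a 1 MiB per-partition limit and a 3000-partition subscription are assumptions for the example, not values from this KIP):

```python
# Hypothetical figures: 1 MiB is the usual default for
# max.partition.fetch.bytes; 3000 partitions is an assumed subscription size.
max_partition_fetch_bytes = 1 * 1024 * 1024
num_partitions = 3000

# Worst case: every partition has at least a full fetch's worth of lag.
worst_case = max_partition_fetch_bytes * num_partitions
print(worst_case / 2**30)  # worst_case in GiB: about 2.93
```

Even with modest settings, the worst-case response easily runs to gigabytes, which is exactly what the global limit proposed below is meant to bound.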

This KIP proposes to introduce a new version of the fetch request with a new parameter, "limit_bytes", which limits the size of the fetch response and solves the problems above. The per-partition limit is removed from the fetch request.

Public Interfaces

This KIP introduces:

  • New fetch request (v.3) with a global response limit and without per-partition limit
  • New client-side config parameter fetch.limit.bytes - global fetch size limit
  • New replication config parameter replica.fetch.limit.bytes - limit used by replication thread
  • New server-side config parameter fetch.partition.max.bytes - maximum per-partition server-side limit when serving the new fetch request
  • New inter-broker protocol version "0.11.0-IV0" - starting from this version brokers will use fetch request v.3 for replication 
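As a rough sketch, the new settings might appear in the client and broker configuration as follows. The property names come from this KIP; the values are purely illustrative, not proposed defaults:

```properties
# consumer.properties (client side)
fetch.limit.bytes=52428800            # illustrative: 50 MB global cap on one fetch response

# server.properties (broker side)
replica.fetch.limit.bytes=10485760    # illustrative: global cap for replication fetcher threads
fetch.partition.max.bytes=1048576     # illustrative: per-partition cap when serving fetch v.3
inter.broker.protocol.version=0.11.0-IV0
```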

Proposed Changes

The proposed changes are quite straightforward. The new fetch request processes partitions in the order they appear in the request.

For each partition, the server fetches up to fetch.partition.max.bytes, but no more than the remaining response limit submitted by the client.

Also, if the remaining response limit is strictly greater than zero, the fetch size for a partition is at least message.max.bytes. This ensures that at least one message is present in every non-empty message set.

For all remaining partitions, the server sends empty message sets.

This way we can ensure that the response size is less than (limit_bytes + message.max.bytes).
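The allocation rule above can be sketched as follows. This is a simplified model of the server-side logic, not the actual broker implementation; the function name `plan_fetch`, the `available` map, and the default values are assumptions for illustration:

```python
def plan_fetch(partitions, available, limit_bytes,
               partition_max=1_048_576,   # hypothetical fetch.partition.max.bytes
               message_max=1_000_000):    # hypothetical message.max.bytes
    """Return bytes fetched per partition, in request order.

    `available` maps each partition to the bytes of data ready on the broker.
    """
    remaining = limit_bytes
    plan = {}
    for p in partitions:
        if remaining > 0:
            # Fetch up to partition_max, capped by the remaining global limit,
            # but never offer less than message_max while the limit is still
            # positive: this guarantees at least one message fits in every
            # non-empty message set.
            quota = max(min(partition_max, remaining), message_max)
            fetched = min(quota, available.get(p, 0))
            plan[p] = fetched
            remaining -= fetched
        else:
            plan[p] = 0  # global limit exhausted: empty message set
    return plan
```

Note how the overshoot is bounded: the only step that can push the total past limit_bytes is one where the quota was bumped up to message_max, so the response stays below limit_bytes + message.max.bytes, matching the guarantee stated above.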

Caveats

Since the new fetch request processes partitions in order and stops fetching data once the response limit is hit, the client should use some kind of partition shuffling to ensure fairness.

Consider the following example. Suppose a client wants to fetch from 4 partitions: A, B, C, D, and that partitions A and B are growing much faster than C and D.

If the client always fetches partitions in the order A, B, C, D, then it is possible that the response limit is hit before any messages are fetched from C and D.

In this scenario the client won't get any messages from C and D until it catches up with A and B.

The solution is to start fetching from the first empty partition in round-robin fashion, or to perform a random shuffle of the partitions.
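Both fairness strategies can be sketched in a few lines of client-side logic. The function name `reorder` and its signature are hypothetical; `prev_response` maps each partition to the bytes it returned in the previous fetch:

```python
import random

def reorder(partitions, prev_response=None, strategy="round_robin"):
    """Reorder partitions for the next fetch request to ensure fairness."""
    if strategy == "shuffle":
        # Random shuffle: every partition gets a fair chance at the
        # front of the request over time.
        out = list(partitions)
        random.shuffle(out)
        return out
    # Round robin: rotate so the first partition that came back empty
    # last time (i.e. was starved by the global limit) goes first now.
    if prev_response:
        for i, p in enumerate(partitions):
            if prev_response.get(p, 0) == 0:
                return list(partitions[i:]) + list(partitions[:i])
    return list(partitions)
```

In the A, B, C, D example above, if a response returned data only for A and B, the next request would start at C, so the slow partitions cannot be starved indefinitely.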

Compatibility, Deprecation, and Migration Plan

The client setting max.partition.fetch.bytes will be deprecated in favour of fetch.limit.bytes. The former setting will be used only for old requests (if the server side doesn't support the new one).

The replication setting replica.fetch.max.bytes will be deprecated in favour of replica.fetch.limit.bytes. The former setting will be used for inter-broker protocol versions older than "0.11.0-IV0".

The new fetch request is designed to work properly even if fetch.limit.bytes is less than the message size. This way we can ignore custom per-partition maximums, since they mostly exist to accommodate large custom message sizes.

Old fetch requests should be processed on the server exactly as before this KIP.

Discussed/Rejected Alternatives

Some discussed/rejected alternatives:

  1. Do not deprecate per-partition limits; just add a global limit. If the global limit is zero, process the request exactly as before. Pros: a less intrusive change; clients can enable the global limit only when they are ready for it (can do round-robin, etc.). Cons: the request becomes too confusing, and it is unclear what should be used as the partition-level limit.
  2. Do random partition shuffling on the server side. Pros: ensures fairness without client-side modifications. Cons: non-deterministic behaviour on the server side; round-robin can be easily implemented on the client side.