  1. buffer.memory (Long) The total bytes of memory the consumer can use to buffer fetch responses waiting to be read after being received from the server.

No other API changes.

Proposed Changes

Update Fetcher.java so that, before it initiates new fetches, it checks:

  • The CompletedFetch list, counting the raw size of each record
  • The in-flight requests, counting each of them as max.fetch.bytes
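A minimal sketch of this check, assuming a hypothetical helper class FetchBudget (it is not part of Fetcher.java, and the method names are illustrative). It sums the raw sizes of completed fetches, counts each in-flight request as max.fetch.bytes, and divides the remaining buffer.memory by max.fetch.bytes to get the number of new fetches that fit:

```java
import java.util.List;

/** Hypothetical helper for illustration only; not part of Fetcher.java. */
final class FetchBudget {
    private FetchBudget() {}

    /** Completed fetches are counted at their raw size, in-flight requests at max.fetch.bytes each. */
    static long usedBytes(List<Long> completedFetchSizes, int inFlightRequests, long maxFetchBytes) {
        long completed = 0L;
        for (long size : completedFetchSizes) {
            completed += size;
        }
        return completed + (long) inFlightRequests * maxFetchBytes;
    }

    /** (buffer.memory - usedBytes) / max.fetch.bytes, clamped so it is never negative. */
    static long newFetchesAllowed(long bufferMemory, long usedBytes, long maxFetchBytes) {
        long available = Math.max(0L, bufferMemory - usedBytes);
        return available / maxFetchBytes;
    }
}
```

For example, with completed fetches of 10 and 20 bytes, 2 in-flight requests and max.fetch.bytes set to 50, the estimate is 130 bytes; with buffer.memory set to 300, 3 more fetches can be started.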

This KIP reuses the MemoryPool interface from KIP-72.

1) At startup, the consumer will initialize a MemoryPool with the size the user specified using buffer.memory. This pool makes it possible to track how much memory the consumer is using for received messages.

2) In Selector.pollSelectionKeys(), before reading from sockets, the consumer will check that there is still space available in the MemoryPool.

3) If there is space, pollSelectionKeys will read from sockets and store messages in the MemoryPool; otherwise the read is skipped.

4) Once messages are returned to the user, they are deleted from the MemoryPool so new messages can be stored.

By adding together the size of the CompletedFetch list and the size counted for in-flight requests, we can estimate the currently used memory (usedBytes). To determine how many new fetches can be made, we can then divide the available space (buffer.memory - usedBytes) by max.fetch.bytes.
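The MemoryPool steps 1) to 4) can be illustrated with a small sketch. The class SimplePool below is a simplified, hypothetical stand-in written for this example; it is not the MemoryPool interface from KIP-72 and it does not reproduce the Selector code. It only shows the allocate, skip and release cycle under the assumption that a read is skipped whenever the requested size does not fit:

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

/** Simplified, hypothetical stand-in for a memory pool; not the KIP-72 MemoryPool interface. */
final class SimplePool {
    private final AtomicLong availableBytes;

    /** Step 1: the pool is created once with the size given by buffer.memory. */
    SimplePool(long sizeBytes) {
        this.availableBytes = new AtomicLong(sizeBytes);
    }

    /** Steps 2 and 3: returns a buffer if the request fits, or null so the caller skips the read. */
    ByteBuffer tryAllocate(int requestedBytes) {
        while (true) {
            long available = availableBytes.get();
            if (available < requestedBytes) {
                return null;
            }
            if (availableBytes.compareAndSet(available, available - requestedBytes)) {
                return ByteBuffer.allocate(requestedBytes);
            }
        }
    }

    /** Step 4: once messages are handed to the user, their space is returned to the pool. */
    void release(ByteBuffer buffer) {
        availableBytes.addAndGet(buffer.capacity());
    }

    long availableMemory() {
        return availableBytes.get();
    }
}
```

With a pool of 100 bytes, a first allocation of 60 bytes succeeds, a second one returns null (the read is skipped), and it succeeds again once the first buffer has been released.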

Compatibility, Deprecation, and Migration Plan

This KIP should be transparent to users not interested in setting this new configuration. Users wanting to take advantage of this new feature will just need to add this new setting to their consumer's properties.
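As an illustration, opting in could look like the sketch below. It only builds a java.util.Properties object; the class name, broker address, group id and the 64 MB value are placeholders chosen for this example, and buffer.memory is the setting proposed by this KIP:

```java
import java.util.Properties;

/** Hypothetical example class; the values below are placeholders. */
final class ConsumerConfigExample {
    private ConsumerConfigExample() {}

    static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.setProperty("group.id", "example-group");           // placeholder group id
        // Setting proposed by this KIP: cap buffered fetch responses at 64 MB (illustrative value).
        props.setProperty("buffer.memory", String.valueOf(64L * 1024 * 1024));
        return props;
    }
}
```

Consumers that do not set buffer.memory are unaffected, as described above.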

Rejected Alternatives

  • Count CompletedFetch as in-flight requests: A simpler alternative would be to consider CompletedFetch responses as in-flight requests and count their size as max.fetch.bytes instead of their actual size. That way we wouldn't have to keep counting the memory used by CompletedFetch regularly. The issue is that the actual size fetched can frequently be a lot smaller than max.fetch.bytes, so this would often overestimate the usage of buffer.memory.

  • Limit sending FetchRequests once a specific number of in-flight requests is reached: While this was initially considered, this method will result in a loss of performance. Throttling FetchRequests means that when memory is freed, the consumer first has to send a new FetchRequest and wait for the broker response before it can consume new messages.