Status

Current state: Under Discussion

Discussion thread: HERE

JIRA: KAFKA-4133

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

With KIP-74, we now have a good way to limit the size of Fetch responses, but it may still be difficult for users to control overall memory usage, since the consumer sends fetches in parallel to all the brokers that own partitions it is subscribed to. To give users finer control, it makes sense to add a new setting that limits the memory used by Fetch responses in the consumer, in a similar fashion to what we already have on the producer.

Public Interfaces

The following option will be added for consumers to configure (in ConsumerConfig.java):

  1. buffer.memory (Long) The total bytes of memory the consumer can use to buffer fetch responses waiting to be read after being received from the server.

No other API changes.
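As an illustration, a consumer could opt in to the proposed setting through its properties. This is a minimal sketch that uses only java.util.Properties: the broker address, group name and the 64 MB value are placeholders, and buffer.memory is the setting proposed here, which is still under discussion.

```java
import java.util.Properties;

public class ConsumerBufferMemoryExample {

    // Builds consumer properties that include the proposed buffer.memory setting.
    static Properties consumerProps(long bufferMemoryBytes) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.setProperty("group.id", "example-group");           // hypothetical group name
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Proposed by this KIP: total bytes the consumer may use to buffer fetch responses.
        props.setProperty("buffer.memory", Long.toString(bufferMemoryBytes));
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps(64L * 1024 * 1024); // 64 MB, an arbitrary example value
        System.out.println(props.getProperty("buffer.memory"));
    }
}
```

These properties would then be passed to the consumer's constructor as usual. Consumers that do not set buffer.memory are unaffected.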

Proposed Changes

This KIP reuses the MemoryPool interface from KIP-72.

1) At startup, the consumer will initialize a MemoryPool with the size the user specified using buffer.memory. This pool makes it possible to track how much memory the consumer is using for received messages.

2) In Selector.pollSelectionKeys(), before reading from sockets, the consumer will check that there is still space available in the MemoryPool.

3) If there is space, pollSelectionKeys will read from sockets and store messages in the MemoryPool; otherwise the read is skipped.

4) Once messages are returned to the user, messages are deleted from the MemoryPool so new messages can be stored.
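The steps above can be sketched with a small bounded pool. This is a simplified illustration, not the MemoryPool from KIP-72 and not the consumer's actual code: the class BoundedPoolSketch and its methods are hypothetical, and the strict "allocate only if the full size fits" policy is an assumption made to keep the example short.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a memory pool; names and policy are assumptions.
public class BoundedPoolSketch {
    private final long capacity;
    private final AtomicLong available;

    public BoundedPoolSketch(long capacityBytes) {
        this.capacity = capacityBytes;                  // step 1: sized from buffer.memory
        this.available = new AtomicLong(capacityBytes);
    }

    // Steps 2 and 3: return a buffer if space remains, or null so the caller skips the read.
    public ByteBuffer tryAllocate(int sizeBytes) {
        while (true) {
            long current = available.get();
            if (current < sizeBytes) {
                return null;
            }
            if (available.compareAndSet(current, current - sizeBytes)) {
                return ByteBuffer.allocate(sizeBytes);
            }
        }
    }

    // Step 4: once messages are handed to the user, their space is returned to the pool.
    public void release(ByteBuffer buffer) {
        available.addAndGet(buffer.capacity());
    }

    public long size() {
        return capacity;
    }

    public long availableMemory() {
        return available.get();
    }

    public static void main(String[] args) {
        BoundedPoolSketch pool = new BoundedPoolSketch(100);
        ByteBuffer first = pool.tryAllocate(60);   // fits
        ByteBuffer second = pool.tryAllocate(60);  // does not fit, so the read would be skipped
        System.out.println(second == null);
        pool.release(first);                       // messages returned to the user
        System.out.println(pool.tryAllocate(60) != null);
    }
}
```

Skipping the read, rather than dropping data, leaves the bytes in the socket buffer until space is released, which is what lets the consumer resume without issuing a new request.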

Compatibility, Deprecation, and Migration Plan

This KIP should be transparent to users not interested in setting this new configuration. Users wanting to take advantage of this new feature will just need to add this new setting to their consumer's properties.

Rejected Alternatives

  • Limit sending FetchRequests once a specific number of in-flight requests is reached:

    While this was initially considered, this method would result in a loss of performance. Throttling FetchRequests means that when memory is freed, the consumer first has to send a new FetchRequest and wait for the broker response before it can consume new messages.

