...

1) At startup, the consumer will initialize a MemoryPool with the size the user specified using buffer.memory. This pool enables the consumer to track how much memory it is using for received messages. The memory is not pre-allocated; it is only used as needed.
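To illustrate what "tracked but not pre-allocated" means, here is a minimal Java sketch. It is not the KIP's implementation: the class `BoundedMemoryPool` and its methods are hypothetical names chosen for illustration. The pool only keeps a byte budget equal to the configured size (standing in for buffer.memory), allocates each buffer on demand, and refuses a request that would push usage past the bound.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of a bounded, lazily allocating pool (illustrative names only). */
final class BoundedMemoryPool {
    private final long capacity;        // the bound, e.g. the value of buffer.memory
    private final AtomicLong available; // bytes not currently handed out

    BoundedMemoryPool(long capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        this.capacity = capacity;
        this.available = new AtomicLong(capacity); // a counter only: nothing is allocated yet
    }

    /** Returns a freshly allocated buffer, or null if it would exceed the bound. */
    ByteBuffer tryAllocate(int sizeBytes) {
        if (sizeBytes <= 0) throw new IllegalArgumentException("size must be positive");
        while (true) {
            long current = available.get();
            if (current < sizeBytes) {
                return null; // not enough budget left; the caller must wait for a release
            }
            if (available.compareAndSet(current, current - sizeBytes)) {
                return ByteBuffer.allocate(sizeBytes); // memory is allocated only on demand
            }
            // Another thread changed the budget concurrently; retry with the new value.
        }
    }

    /** Credits a buffer's capacity back to the budget once its data has been consumed. */
    void release(ByteBuffer buffer) {
        available.addAndGet(buffer.capacity());
    }

    long usedBytes() {
        return capacity - available.get();
    }
}
```

With a 1,024-byte budget, a first 600-byte request succeeds, a second one returns null until the first buffer is released, after which the same request succeeds again.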

...

The consumer (Fetcher) delays decompression until the records are returned to the user, but because of max.poll.records, it may end up holding onto the decompressed data from a single partition for a few iterations. Therefore, buffer.memory is not a hard bound on the consumer's memory usage, as mentioned in the setting's description.
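To make "a few iterations" concrete, the following Java sketch only counts how many records of one decompressed partition batch are still buffered after each poll() when at most max.poll.records records are returned per call. The class, method, and the numbers used (1,200 records, max.poll.records=500) are hypothetical and illustrative; this is not Fetcher code. In that case, 700 and then 200 decompressed records stay in memory across two poll() calls, and it is this retained data that keeps buffer.memory from being a hard bound.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of one partition's decompressed records drained by successive poll() calls. */
final class HeldRecordsSketch {
    /** Returns the number of records still buffered after each simulated poll(). */
    static List<Integer> remainingAfterEachPoll(int decompressedRecords, int maxPollRecords) {
        if (decompressedRecords < 0 || maxPollRecords <= 0) {
            throw new IllegalArgumentException("invalid record counts");
        }
        List<Integer> remaining = new ArrayList<>();
        int left = decompressedRecords;
        while (left > 0) {
            left -= Math.min(left, maxPollRecords); // one poll() hands out at most maxPollRecords
            remaining.add(left);                    // whatever is left stays decompressed in memory
        }
        return remaining;
    }

    public static void main(String[] args) {
        System.out.println(remainingAfterEachPoll(1200, 500)); // prints [700, 200, 0]
    }
}
```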

Caveats:

There is a risk that, once fetch data fills up the MemoryPool, the coordinator's connection can be starved. For example, if we send a batch of pre-fetches right before returning to the user, these fetches might complete before the next call to poll(). In that case, we might not have enough memory left to receive heartbeat responses, which would block us from sending further heartbeats until the next call to poll(). Because heartbeats are tiny, this is unlikely to be a significant issue. In any case, KAFKA-4137 (separate network client) suggests a possible way to alleviate this issue, but it is not a mandatory prerequisite for this KIP.

To alleviate this issue, only messages larger than 1 KB will be allocated from the MemoryPool. Smaller messages will be allocated directly on the heap, as before. This prevents group coordination and heartbeat messages from being delayed when the MemoryPool fills up.
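The size-based routing described above can be sketched in Java as follows. This is a minimal illustration under stated assumptions, not the KIP's code: `ThresholdAllocator` and its methods are hypothetical names, "1 KB" is assumed to mean 1,024 bytes, and a simple byte counter stands in for the MemoryPool. Requests of at most 1 KB always get a plain heap buffer, so even when large fetch responses have exhausted the budget, a small heartbeat response can still be received.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch: large receives are charged to a bounded budget, small ones go to the heap. */
final class ThresholdAllocator {
    static final int POOL_THRESHOLD_BYTES = 1024; // assumed meaning of "1 KB"

    private final long capacity; // stands in for buffer.memory
    private long used;           // bytes currently charged to the pool budget

    ThresholdAllocator(long capacity) {
        this.capacity = capacity;
    }

    /** Returns a buffer, or null if a large receive must wait until pool memory is released. */
    synchronized ByteBuffer allocate(int sizeBytes) {
        if (sizeBytes <= POOL_THRESHOLD_BYTES) {
            return ByteBuffer.allocate(sizeBytes); // small message: plain heap, never blocked by the pool
        }
        if (used + sizeBytes > capacity) {
            return null; // large message: defer until enough memory is released
        }
        used += sizeBytes;
        return ByteBuffer.allocate(sizeBytes);
    }

    /** Only buffers that were charged to the pool are credited back. */
    synchronized void release(ByteBuffer buffer) {
        if (buffer.capacity() > POOL_THRESHOLD_BYTES) {
            used -= buffer.capacity();
        }
    }

    synchronized long usedBytes() {
        return used;
    }
}
```

With a 4,096-byte budget fully taken by one large fetch, a further 2,000-byte fetch is refused, while a 100-byte heartbeat-sized response is still allocated and the pool usage stays at 4,096 bytes.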

Compatibility, Deprecation, and Migration Plan

...

  • Stop sending FetchRequests once a specific number of in-flight requests is reached:

    While this was initially considered, this method would result in a loss of performance. Throttling FetchRequests means that when memory is freed, the consumer first has to send a new FetchRequest and wait for the broker's response before it can consume new messages.

  • Explicit disposal of memory by the user:

    It was suggested to have an explicit call to a dispose() method to free up memory in the MemoryPool. In addition to breaking the API, this seems confusing in Java, where developers expect memory to be reclaimed by the garbage collector.