...

1) At startup, the consumer initializes a MemoryPool with the size the user specified in buffer.memory. This pool lets the consumer track how much memory it is using for received messages. The memory is not preallocated; it is only used as needed.

2) In Selector.pollSelectionKeys(), before reading from sockets, the code checks whether there is still space available in the MemoryPool.

3) If there is space, pollSelectionKeys() reads from the sockets and stores the messages in the MemoryPool; otherwise the read is skipped.

4) Once messages have been decompressed (if compression is enabled), deserialized, and queued up to be returned to the user (in Fetcher.parseFetchedData()), the memory they used in the MemoryPool is released.
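The four steps above can be illustrated with a minimal, single-threaded Java sketch. Everything in it is hypothetical: BoundedMemoryPool, BoundedFetchSketch, the pendingOnSockets queue (which stands in for data waiting on real sockets) and the byte sizes are assumptions made for illustration, and the code does not reproduce Kafka's actual MemoryPool, Selector or Fetcher classes. It only shows the idea: track bytes against a budget without preallocating, skip reads when the budget is exhausted, and release the bytes once they have been parsed.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class BoundedFetchSketch {

    // Hypothetical pool (not Kafka's MemoryPool): tracks bytes in use against
    // a fixed budget (buffer.memory). Nothing is preallocated; a buffer is
    // created on demand only when the budget allows it.
    static final class BoundedMemoryPool {
        private final long capacity;
        private long used = 0;

        BoundedMemoryPool(long capacity) { this.capacity = capacity; }

        // Returns a buffer, or null when the request would exceed the budget.
        ByteBuffer tryAllocate(int size) {
            if (used + size > capacity) return null;
            used += size;
            return ByteBuffer.allocate(size);
        }

        void release(ByteBuffer buffer) { used -= buffer.capacity(); }

        long availableMemory() { return capacity - used; }

        boolean isOutOfMemory() { return used >= capacity; }
    }

    // Stand-in for data waiting on sockets: sizes of fetch responses in bytes.
    final Queue<Integer> pendingOnSockets = new ArrayDeque<>();
    // Raw network bytes currently held in the pool.
    final List<ByteBuffer> receivedBuffers = new ArrayList<>();
    final BoundedMemoryPool pool;

    // Step 1: create the pool with the buffer.memory budget.
    BoundedFetchSketch(long bufferMemory) {
        this.pool = new BoundedMemoryPool(bufferMemory);
    }

    // Steps 2 and 3: check for free space, then read only what fits;
    // anything that does not fit stays on the "socket" for a later poll.
    int pollSelectionKeys() {
        if (pool.isOutOfMemory()) return 0; // no space left: skip reading
        int reads = 0;
        while (!pendingOnSockets.isEmpty()) {
            ByteBuffer buf = pool.tryAllocate(pendingOnSockets.peek());
            if (buf == null) break; // not enough space: skip this read
            pendingOnSockets.poll();
            receivedBuffers.add(buf);
            reads++;
        }
        return reads;
    }

    // Step 4: once the bytes have been turned into records (heap objects,
    // elided here), their pool memory is released.
    int parseFetchedData() {
        int parsed = receivedBuffers.size();
        for (ByteBuffer buf : receivedBuffers) {
            pool.release(buf);
        }
        receivedBuffers.clear();
        return parsed;
    }

    public static void main(String[] args) {
        BoundedFetchSketch consumer = new BoundedFetchSketch(1000); // buffer.memory = 1000 bytes
        consumer.pendingOnSockets.addAll(List.of(400, 400, 400));
        System.out.println(consumer.pollSelectionKeys());     // 2 (third read skipped)
        System.out.println(consumer.pool.availableMemory());  // 200
        System.out.println(consumer.parseFetchedData());      // 2
        System.out.println(consumer.pool.availableMemory());  // 1000
        System.out.println(consumer.pollSelectionKeys());     // 1 (skipped read now fits)
    }
}
```

In this sketch a response that does not fit is simply left where it is, so the reader is throttled by the budget rather than failing; that mirrors the "otherwise the read is skipped" behavior described in step 3.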

The ConsumerRecords objects are not stored in the MemoryPool but in the Java heap. The MemoryPool is only used to store bytes received from the network.

The consumer (Fetcher) delays decompression until the records are returned to the user, but because of max.poll.records, it may end up holding onto the decompressed data from a single partition for a few iterations. Therefore, buffer.memory is not a hard bound on the consumer's memory usage, as noted in the setting's description.
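To see why the pool does not bound total memory, consider a deliberately simplified Java model. The class name, the 1000-byte budget, the 500-byte decompressed record size and the record count are all illustrative assumptions, not Kafka defaults or measurements, and the model decompresses a whole batch at once rather than following the Fetcher's exact timing. The point it shows: pool bytes are released after parsing, the decompressed records that exceed max.poll.records stay on the heap, and the pool can then fill up again with new network bytes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class MaxPollRecordsSketch {
    // Illustrative, assumed values (not Kafka defaults).
    static final long BUFFER_MEMORY = 1_000;  // buffer.memory, bytes
    static final int MAX_POLL_RECORDS = 2;    // max.poll.records
    static final int RECORD_SIZE = 500;       // decompressed bytes per record

    long poolUsed = 0;                                     // raw network bytes, bounded by the pool
    final Deque<byte[]> decompressed = new ArrayDeque<>(); // heap memory, NOT bounded by the pool

    // Read a compressed fetch response into the pool only if it fits.
    boolean fetch(int compressedBytes) {
        if (poolUsed + compressedBytes > BUFFER_MEMORY) return false;
        poolUsed += compressedBytes;
        return true;
    }

    // Decompress the buffered batch into records on the heap, then release the pool bytes.
    void parse(int compressedBytes, int recordCount) {
        for (int i = 0; i < recordCount; i++) decompressed.add(new byte[RECORD_SIZE]);
        poolUsed -= compressedBytes;
    }

    // Hand back at most max.poll.records; the rest stays decompressed on the heap.
    List<byte[]> poll() {
        List<byte[]> batch = new ArrayList<>();
        while (batch.size() < MAX_POLL_RECORDS && !decompressed.isEmpty()) {
            batch.add(decompressed.poll());
        }
        return batch;
    }

    long heldOutsidePool() { return (long) decompressed.size() * RECORD_SIZE; }

    public static void main(String[] args) {
        MaxPollRecordsSketch c = new MaxPollRecordsSketch();
        c.fetch(1_000);       // pool is full
        c.parse(1_000, 10);   // 10 records * 500 B = 5000 B on the heap, pool empty again
        c.poll();             // user receives 2 records, 8 remain decompressed
        c.fetch(1_000);       // pool fills again while 8 records are still held
        System.out.println(c.poolUsed + c.heldOutsidePool()); // 5000, five times buffer.memory
    }
}
```

Under these assumed numbers, the pool never exceeds its 1000-byte budget, yet the consumer holds 5000 bytes in total, because the 4000 bytes of retained decompressed records are outside the pool's accounting.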

...