...

With KIP-74, we now have a good way to limit the size of Fetch responses, but it may still be difficult for users to control overall memory usage because the consumer sends fetches in parallel to all the brokers that own partitions the client is subscribed to. Currently we have:

...

To give users simpler control, it makes sense to add a new setting that properly limits the memory used by Fetch responses in the consumer, similar to what we already have on the producer.

...

The following option will be added to the Consumer configuration (in ConsumerConfigs.java):

  1. buffer.memory: Type: Long, Priority: High, Default: 100MB

    The total bytes of memory the consumer can use to buffer records received from the server and waiting to be processed (decompressed and deserialized).

    This setting slightly differs from the total memory the consumer will use, because some additional memory is needed for decompression (if compression is enabled), deserialization, and maintaining in-flight requests.

    Note that this setting must be at least as large as fetch.max.bytes.
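
To make the proposal concrete, below is a minimal sketch of how a consumer could be configured once this setting exists. The "buffer.memory" key is the name proposed in this KIP and is not an existing ConsumerConfig constant, while fetch.max.bytes comes from KIP-74; the class name, broker address and group id are placeholders.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BoundedMemoryConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Proposed in this KIP: cap the memory used to buffer Fetch responses (default 100 MB).
        // Must be at least as large as fetch.max.bytes (added by KIP-74, default 50 MB).
        props.put("buffer.memory", String.valueOf(100L * 1024 * 1024));
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, String.valueOf(50 * 1024 * 1024));

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.close();
    }
}
```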

...

1) At startup, the consumer will initialize a MemoryPool with the size the user specified via buffer.memory. This pool tracks how much memory the consumer is using for received messages. The memory is not pre-allocated but only used as needed.
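
As an illustration, a minimal sketch of what such a pool could look like is shown below. The class and method names (SimpleMemoryPool, tryAllocate, release) are hypothetical; the sketch only shows the accounting behaviour described above, not the actual implementation.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical sketch of a memory pool sized by buffer.memory.
 * Memory is only accounted for, not pre-allocated: tryAllocate()
 * returns null when granting the request would exceed the budget.
 */
public class SimpleMemoryPool {
    private final long sizeBytes;       // value of buffer.memory
    private final AtomicLong available; // bytes not currently claimed by received messages

    public SimpleMemoryPool(long sizeBytes) {
        this.sizeBytes = sizeBytes;
        this.available = new AtomicLong(sizeBytes);
    }

    /** Reserve requestBytes from the pool, or return null if the pool cannot cover them. */
    public ByteBuffer tryAllocate(int requestBytes) {
        while (true) {
            long current = available.get();
            if (current < requestBytes)
                return null; // pool exhausted: caller must defer the read
            if (available.compareAndSet(current, current - requestBytes))
                return ByteBuffer.allocate(requestBytes);
        }
    }

    /** Return the buffer's capacity to the pool once its records have been handed to the user. */
    public void release(ByteBuffer buffer) {
        available.addAndGet(buffer.capacity());
    }

    /** Total budget configured via buffer.memory. */
    public long size() {
        return sizeBytes;
    }
}
```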

...

pollSelectionKeys() will only read messages coming from the Coordinator (identified using the priority flag), and those will be allocated outside of the Pool. Such messages should be small enough not to use much extra memory. This prevents Coordinator starvation (for example, if we send a batch of pre-fetches right before returning to the user, those fetches might complete before the next call to poll(), in which case we might not have enough memory to receive heartbeats, which would block us from sending additional heartbeats until the next call to poll()). When the Pool is not full, messages from the Coordinator are handled normally (3.a).
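A hypothetical sketch of that read-side decision is shown below, reusing the SimpleMemoryPool sketch above. isCoordinatorChannel and receiveSize stand in for the selector's priority flag and the pending receive size; they are illustrative names, not actual Kafka APIs.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of how a read is gated on the pool when processing selection keys. */
final class ReadGate {
    private final SimpleMemoryPool pool;

    ReadGate(SimpleMemoryPool pool) {
        this.pool = pool;
    }

    /**
     * Returns the buffer the pending response should be read into, or null if the
     * read must be deferred until memory is released back to the pool.
     */
    ByteBuffer bufferFor(boolean isCoordinatorChannel, int receiveSize) {
        ByteBuffer fromPool = pool.tryAllocate(receiveSize);
        if (fromPool != null)
            return fromPool; // pool has room: normal handling (3.a)
        if (isCoordinatorChannel) {
            // Coordinator traffic (heartbeats, group metadata) is small and must not be
            // starved, so when the pool is exhausted it is allocated outside of it.
            return ByteBuffer.allocate(receiveSize);
        }
        return null; // fetch response deferred until memory is released
    }
}
```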

4) Once messages are decompressed (if compression is enabled), deserialized and queued up to be returned to the user (in Fetcher.parseFetchedData()), the memory used in the MemoryPool is released.
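
A hypothetical sketch of this step is shown below, again reusing the SimpleMemoryPool sketch. The parseRecords function stands in for the decompression and deserialization performed in Fetcher.parseFetchedData(); the handler name is illustrative only.

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch of releasing pooled memory once a raw fetch response has been parsed. */
final class CompletedFetchHandler {
    private final SimpleMemoryPool pool;

    CompletedFetchHandler(SimpleMemoryPool pool) {
        this.pool = pool;
    }

    <T> List<T> handle(ByteBuffer rawResponse, Function<ByteBuffer, List<T>> parseRecords) {
        try {
            // Decompression and deserialization happen here; the resulting records
            // are queued up to be returned to the user on the next poll().
            return parseRecords.apply(rawResponse);
        } finally {
            // The raw response buffer is no longer needed, so its memory is returned
            // to the pool, unblocking further reads from the network.
            pool.release(rawResponse);
        }
    }
}
```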

...