Status
Current state: Accepted
Discussion thread: HERE
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
None of these settings take into account that the consumer sends requests to multiple brokers in parallel, so in practice the memory usage is, as stated in KIP-74: min(num brokers * fetch.max.bytes, max.partition.fetch.bytes * num_partitions)
To give users simpler control, it makes sense to add a new setting that properly limits the memory used by Fetch responses in the consumer, similar to what we already have on the producer.
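As a sketch of how this would look in practice, the snippet below builds consumer properties that combine the proposed buffer.memory setting with the existing per-request and per-partition limits. The specific values (32 MB, mirroring the producer's buffer.memory default) are illustrative choices, not values mandated by this KIP.

```java
import java.util.Properties;

public class ConsumerMemoryConfig {
    // Builds consumer properties including the proposed buffer.memory setting.
    // All values here are illustrative, not defaults mandated by the KIP.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        // Proposed setting: caps the memory used to hold Fetch responses.
        props.put("buffer.memory", "33554432");
        // Existing per-request and per-partition limits remain in effect.
        props.put("fetch.max.bytes", "52428800");
        props.put("max.partition.fetch.bytes", "1048576");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```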
...
Proposed Changes
This KIP reuses the MemoryPool interface implementation from KIP-72 (SimpleMemoryPool).
We propose to change the constructors of Node and KafkaChannel:
...
The new 'priority' argument will be used to mark connections to the coordinator. Currently those connections are identified only by their artificially large node id; marking them explicitly will make their detection simpler.
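The contrast between the two detection approaches can be sketched as follows. This is an illustrative stand-in, not the actual Node/KafkaChannel code: the class, enum, and method names are hypothetical, and the large-id heuristic is only an approximation of how coordinator node ids are derived today.

```java
public class CoordinatorDetection {
    // Current heuristic (illustrative): coordinator connections are created
    // with an artificially large node id, so detection relies on id magnitude.
    public static boolean looksLikeCoordinatorId(int nodeId) {
        return nodeId > Integer.MAX_VALUE / 2; // fragile, id-based detection
    }

    // Proposed: an explicit priority flag carried by Node/KafkaChannel.
    public enum Priority { NORMAL, HIGH }

    public static boolean isCoordinator(Priority priority) {
        return priority == Priority.HIGH; // explicit and unambiguous
    }
}
```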
1) At startup, the consumer will initialize a MemoryPool with the size specified by buffer.memory. This pool allows the consumer to track how much memory it is using for received messages. The memory is not pre-allocated but only used as needed.
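The pool's behavior can be sketched with the simplified stand-in below. This is not Kafka's SimpleMemoryPool from KIP-72, only a minimal illustration of the same idea: a fixed budget is tracked, buffers are allocated on demand, and allocation fails (rather than blocks) when the budget is exhausted.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-in for a KIP-72-style memory pool: tracks how much of a
// fixed budget is outstanding without pre-allocating anything.
public class TrackingMemoryPool {
    private final long sizeBytes;
    private final AtomicLong available;

    public TrackingMemoryPool(long sizeBytes) {
        this.sizeBytes = sizeBytes;
        this.available = new AtomicLong(sizeBytes);
    }

    // Returns a buffer if the budget allows it, or null when the pool is
    // depleted (the caller then skips reading from that connection).
    public ByteBuffer tryAllocate(int bytes) {
        while (true) {
            long free = available.get();
            if (free < bytes) return null; // pool depleted
            if (available.compareAndSet(free, free - bytes))
                return ByteBuffer.allocate(bytes); // allocated on demand
        }
    }

    // Returns the buffer's capacity to the budget once the response is consumed.
    public void release(ByteBuffer buffer) {
        available.addAndGet(buffer.capacity());
    }

    public long availableMemory() { return available.get(); }
    public long size() { return sizeBytes; }
}
```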
...
The consumer (Fetcher) delays decompression until the records are returned to the user, but because of max.poll.records, it may end up holding onto the decompressed data from a single partition for a few iterations. Therefore buffer.memory is not a hard bound on the consumer's memory usage, as mentioned in the setting's description.
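To see why the bound is soft, a rough worst-case estimate can be written down: the pool itself, plus the decompressed data from one partition that the Fetcher may retain across poll() calls. The formula and all numbers below are illustrative assumptions, not figures from this KIP.

```java
public class WorstCaseEstimate {
    // Rough upper bound on consumer memory under the proposed setting:
    // the pool budget, plus decompressed data held outside the pool because
    // of max.poll.records. Assumed model, not a KIP-specified formula.
    public static long worstCaseBytes(long bufferMemory,
                                      long maxPartitionFetchBytes,
                                      double decompressionRatio) {
        return bufferMemory + (long) (maxPartitionFetchBytes * decompressionRatio);
    }

    public static void main(String[] args) {
        // 32 MB pool, 1 MB of compressed partition data, assumed 4x expansion
        System.out.println(worstCaseBytes(32L << 20, 1L << 20, 4.0));
    }
}
```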
Similarly to KIP-72, metrics about the MemoryPool (usage, free space, etc) will be exposed by the Consumer:
memory-pool-free: The amount of free memory in the MemoryPool
memory-pool-used: The amount of used memory in the MemoryPool
memory-pool-avg-depleted-percent: The percentage of time the MemoryPool is full
memory-pool-depleted-time-total: The total time the MemoryPool has been full
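How the two depletion metrics relate can be sketched with the self-contained tracker below. It is an illustrative analog of the metric computation, not the consumer's actual metrics code: class and method names are hypothetical, and time is passed in explicitly to keep the example deterministic.

```java
public class PoolDepletionStats {
    private long depletedTotalMs = 0;   // analog of memory-pool-depleted-time-total
    private final long windowStartMs;
    private long depletedSinceMs = -1;  // -1 means the pool is currently not full

    public PoolDepletionStats(long nowMs) {
        this.windowStartMs = nowMs;
    }

    // Called when an allocation fails because the pool is full.
    public void poolDepleted(long nowMs) {
        if (depletedSinceMs < 0) depletedSinceMs = nowMs;
    }

    // Called when memory is released and the pool is no longer full.
    public void poolFreed(long nowMs) {
        if (depletedSinceMs >= 0) {
            depletedTotalMs += nowMs - depletedSinceMs;
            depletedSinceMs = -1;
        }
    }

    public long depletedTimeTotalMs() {
        return depletedTotalMs;
    }

    // Analog of memory-pool-avg-depleted-percent over the observed window.
    public double avgDepletedPercent(long nowMs) {
        long window = nowMs - windowStartMs;
        return window == 0 ? 0.0 : 100.0 * depletedTotalMs / window;
    }
}
```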
Compatibility, Deprecation, and Migration Plan
This KIP should be transparent to users not interested in this new configuration. Users wanting to take advantage of the new feature just need to add the new setting to their consumer's properties.
Rejected Alternatives
Limit sending FetchRequests once a specific number of in-flight requests is reached:
While this was initially considered, this method would result in a loss of performance. Throttling FetchRequests means that when memory is freed, the consumer first has to send a new FetchRequest and wait for the broker's response before it can consume new messages.
Explicit disposal of memory by the user:
It was suggested to have an explicit call to a dispose() method to free up memory in the MemoryPool. In addition to breaking the API, this seems confusing for Java
...