...

With KIP-74, we now have a good way to limit the size of Fetch responses, but it may still be difficult for users to control overall memory since the consumer will send fetches in parallel to all the brokers which own partitions that it is subscribed to. Currently we have:

- max.fetch.bytes: This controls how much data will be returned by the broker for one fetch

...

The following option will be added for consumers to configure (in ConsumerConfig.java):

  1. buffer.memory: Type: Long, Priority: High, Default: 100MB

    The total bytes of memory the consumer can use to buffer records received from the server and waiting to be processed (decompressed and deserialized).

    This setting differs from the total memory the consumer will use because some additional memory will be used for decompression (if compression is enabled), deserialization as well as for maintaining in-flight requests.

    Note that this setting must be at least as big as max.fetch.bytes.

Alongside this change, we will lower the priority of max.partition.fetch.bytes to Low.
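
As an illustration, the proposed setting could be supplied like any other consumer property. The sketch below uses only java.util.Properties and does not depend on the Kafka client. buffer.memory is the option proposed in this KIP, the values are arbitrary, and the isValid helper is hypothetical: it only mirrors the constraint stated above that buffer.memory must be at least as big as max.fetch.bytes.

```java
import java.util.Properties;

public class BufferMemoryConfigSketch {

    // Hypothetical helper, not part of any Kafka API: checks the constraint
    // from the setting's description, buffer.memory >= max.fetch.bytes.
    static boolean isValid(Properties props) {
        long bufferMemory = Long.parseLong(props.getProperty("buffer.memory"));
        long maxFetchBytes = Long.parseLong(props.getProperty("max.fetch.bytes"));
        return bufferMemory >= maxFetchBytes;
    }

    static Properties exampleConfig() {
        Properties props = new Properties();
        // Default proposed in this KIP: 100MB.
        props.setProperty("buffer.memory", String.valueOf(100L * 1024 * 1024));
        // Illustrative value only; the property name is the one used in this document.
        props.setProperty("max.fetch.bytes", String.valueOf(50L * 1024 * 1024));
        return props;
    }

    public static void main(String[] args) {
        Properties props = exampleConfig();
        System.out.println("valid = " + isValid(props));
    }
}
```

A consumer configured with a buffer.memory smaller than max.fetch.bytes would fail this check, since a single fetch response could then never fit in the buffer.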

...

2) In Selector.pollSelectionKeys(), before reading from sockets, the consumer code will check that there is still space available in the MemoryPool.

3) If there is space, pollSelectionKeys will read from sockets and store messages in the MemoryPool; otherwise the read is skipped.

4) Once messages are decompressed (if compression is enabled), deserialized and queued up to be returned to the user (in Fetcher.parseFetchedData), the memory used in the MemoryPool is released so new messages can be stored.

The ConsumerRecords are not stored in the MemoryPool but in the Java heap. The MemoryPool is only used to store bytes received from the network.
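
The allocate, skip and release cycle described in steps 2 to 4 can be sketched as follows. This is a minimal illustration under stated assumptions, not the actual consumer code: SimplePool and maybeRead are hypothetical names, the check for space and the allocation are folded into a single call, and the byte array stands in for data read from a socket.

```java
import java.nio.ByteBuffer;

public class MemoryPoolSketch {

    /** Hypothetical bounded pool standing in for the MemoryPool described above. */
    static final class SimplePool {
        private final long capacity;
        private long available;

        SimplePool(long capacity) {
            this.capacity = capacity;
            this.available = capacity;
        }

        /** Returns a buffer of the requested size, or null if the pool has no room. */
        synchronized ByteBuffer tryAllocate(int size) {
            if (size > available) {
                return null;
            }
            available -= size;
            return ByteBuffer.allocate(size);
        }

        /** Gives the buffer's capacity back to the pool. */
        synchronized void release(ByteBuffer buffer) {
            available = Math.min(capacity, available + buffer.capacity());
        }

        synchronized long availableMemory() {
            return available;
        }
    }

    /** Steps 2 and 3: read only if the pool has room, otherwise skip the read. */
    static ByteBuffer maybeRead(SimplePool pool, byte[] incoming) {
        ByteBuffer buffer = pool.tryAllocate(incoming.length);
        if (buffer == null) {
            return null; // no space left: skip this read and retry on a later poll
        }
        buffer.put(incoming);
        buffer.flip();
        return buffer;
    }

    public static void main(String[] args) {
        SimplePool pool = new SimplePool(10);
        ByteBuffer first = maybeRead(pool, new byte[8]);
        ByteBuffer second = maybeRead(pool, new byte[8]); // pool has only 2 bytes left
        System.out.println("second read skipped: " + (second == null));

        // Step 4: once the records have been parsed, the raw bytes are released.
        pool.release(first);
        System.out.println("available after release: " + pool.availableMemory());
    }
}
```

Only the raw network bytes are counted against the pool in this sketch, which matches the note above that the resulting ConsumerRecords live on the Java heap.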

The consumer (Fetcher) delays decompression until the records are returned to the user, but because of max.poll.records, it may end up holding onto the decompressed data from a single partition for a few iterations. Therefore buffer.memory is not a hard bound on the consumer's memory usage, as mentioned in the setting's description.

Caveats:

There is a risk when using the MemoryPool that, after we fill up the memory with fetch data, we starve the coordinator's connection.

For example, if we send a bunch of pre-fetches right before returning to the user, these fetches might return before the next call to poll(), in which case we might not have enough memory to receive heartbeat responses, which would block us from sending additional heartbeats until the next call to poll(). Because heartbeats are tiny, this is unlikely to be a significant issue. In any case, KAFKA-4137 (separate network client) suggests a possible way to alleviate this issue, but it is not a mandatory prerequisite for this KIP.

Compatibility, Deprecation, and Migration Plan

...

  • Limit sending FetchRequests once a specific number of in-flight requests is reached:

    While this was initially considered, this method would result in a loss of performance. Throttling FetchRequests means that when memory is freed, the consumer first has to send a new FetchRequest and wait for the broker response before it can consume new messages.

  • Explicit disposal of memory by the user:

    It was suggested to have an explicit call to a dispose() method to free up memory in the MemoryPool. In addition to breaking the API, this seems confusing for Java.