...

This KIP reuses the MemoryPool interface from KIP-72.
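For reference, the MemoryPool interface proposed in KIP-72 looks roughly as follows (method names are taken from that proposal and may differ slightly in the final code):

import java.nio.ByteBuffer;

public interface MemoryPool {
    /** Try to allocate a buffer of the given size; returns null if the pool cannot satisfy the request right now. */
    ByteBuffer tryAllocate(int sizeBytes);

    /** Return a previously allocated buffer to the pool, making its memory available again. */
    void release(ByteBuffer previouslyAllocated);

    /** Total size of the pool, in bytes. */
    long size();

    /** Memory currently available for allocation, in bytes. */
    long availableMemory();

    /** True if no memory is currently available. */
    boolean isOutOfMemory();
}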

We propose to change the constructors of Node and KafkaChannel:

public Node(int id, String host, int port, String rack, boolean priority) {}
public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize, boolean priority) {}

The new 'priority' argument will be used to mark connections to the Coordinator. Currently, those connections are identified only by their use of a large node id; marking them explicitly will make their detection simpler.
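For illustration only (the helper and its call site are assumptions, not the actual patch), marking the coordinator connection with the new constructor would look something like:

// Hypothetical helper: builds the Node used for the coordinator connection with the
// new flag set. The id/host/port come from the group coordinator lookup; 'rack' is
// not relevant for this connection.
private static Node coordinatorNode(int coordinatorId, String host, int port) {
    return new Node(coordinatorId, host, port, null, true);  // priority = true
}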

 

1) At startup, the consumer will initialize a MemoryPool with the size the user specified using buffer.memory. This pool allows the consumer to track how much memory it is using for received messages. The memory is not pre-allocated but is only used as needed.

2) In Selector.pollSelectionKeys(), before reading from sockets, the code will check whether there is still space available in the MemoryPool (see the sketch after step 4).

3) Here we have two options:

3.a If there is space in the Pool:

pollSelectionKeys() will read from the sockets and store messages in the MemoryPool. The fairness mechanism introduced in KIP-74 ensures the consumer sends FetchRequests to all partitions it is subscribed to in a round-robin fashion, so we don't need to introduce a new mechanism for fairness when reading from the sockets.

3.b If there is no space in the Pool:

pollSelectionKeys() will only read messages coming from the Coordinator (identified using the priority flag), and those will be allocated outside of the Pool. Such messages should be small enough not to use too much extra memory. This will prevent Coordinator starvation (for example, if we send a bunch of pre-fetches right before returning to the user, these fetches might return before the next call to poll(), in which case we might not have enough memory to receive heartbeats, which would block us from sending additional heartbeats until the next call to poll()). When the Pool is not full, messages from the Coordinator are handled normally (3.a).

 

4) Once messages are decompressed (if compression is enabled), deserialized, and queued up to be returned to the user (in Fetcher.parseFetchedData()), the memory used in the MemoryPool is released.
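To make steps 2 to 4 concrete, here is a minimal sketch assuming the MemoryPool interface shown above; the class, method names and boolean parameters are illustrative, since the real change is woven into Selector.pollSelectionKeys() and the existing NetworkReceive path rather than a standalone helper:

import java.nio.ByteBuffer;

/**
 * Illustrative only: a stripped-down view of the read path described in steps 2-4.
 * The actual implementation lives inside Selector.pollSelectionKeys() and the
 * Fetcher; this helper just shows where the pool is consulted and released.
 */
class MemoryBoundedReader {
    private final MemoryPool pool;  // sized from buffer.memory at startup (step 1)

    MemoryBoundedReader(MemoryPool pool) {
        this.pool = pool;
    }

    /** Steps 2 and 3: decide whether the next response can be read from this channel. */
    ByteBuffer bufferFor(boolean priorityChannel, int responseSize) {
        if (priorityChannel)
            // 3.b: Coordinator responses bypass the pool entirely, so heartbeats and
            // JoinGroup/SyncGroup responses are never starved when fetch data fills the pool.
            return ByteBuffer.allocate(responseSize);
        if (pool.availableMemory() <= 0)
            return null;                        // 2: pool exhausted, skip this channel for now
        return pool.tryAllocate(responseSize);  // 3.a: may still return null if too little is left
    }

    /** Step 4: called once Fetcher.parseFetchedData() has queued the records for the user. */
    void done(ByteBuffer buffer, boolean fromPool) {
        if (fromPool)
            pool.release(buffer);
    }
}

When bufferFor() returns null the channel is simply skipped for this poll and retried later, once parsed fetch data has been handed to the user and its memory released.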

...

The consumer (Fetcher) delays decompression until the records are returned to the user, but because of max.poll.records it may end up holding onto the decompressed data from a single partition for a few iterations. Therefore buffer.memory is not a hard bound on the consumer's memory usage, as mentioned in the setting's description.

Caveats:

There is a risk, when using the MemoryPool, that after we fill up the memory with fetch data we can starve the coordinator's connection. For example, if we send a bunch of pre-fetches right before returning to the user, these fetches might return before the next call to poll(), in which case we might not have enough memory to receive heartbeats, which would block us from sending additional heartbeats until the next call to poll().

To alleviate this issue, we won't use the MemoryPool for messages received from the coordinator. To do that, we will mark the Node/Channel used by the coordinator with a flag (priority). When reading messages off the network, messages coming from a Node/Channel with the flag set will be allocated directly (as if there were no MemoryPool) and so have no chance of being delayed if the MemoryPool fills up.

Compatibility, Deprecation, and Migration Plan

...