...

  1. The pool is non-blocking, so network threads would not be blocked waiting for memory and could make progress elsewhere.
  2. SocketServer would instantiate and hold a memory pool, which Processor threads would try to allocate memory from when reading requests out of sockets (by passing the pool to the NetworkReceive instances they create). A minimal sketch of such a pool interface appears after this list.
  3. NetworkReceive.readFromReadableChannel() would be modified to try allocating memory (it is already written so that a single read may involve multiple repeated calls to readFromReadableChannel(), so this is not a big change in behavior).
  4. Memory would be released at the end of request processing (in KafkaRequestHandler.run()).
  5. To facilitate faster implementation (as a safety net), the pool will be implemented in such a way that memory that was not release()d but was garbage collected would be detected and "reclaimed". This is to prevent "leaks" on code paths that fail to release() properly.
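
To make the flow above concrete, here is a minimal sketch of what such a non-blocking pool could look like. The interface and method names (tryAllocate(), release(), availableMemory()) are illustrative assumptions and may differ from the linked branch:

    import java.nio.ByteBuffer;

    // Minimal sketch of a non-blocking memory pool (names are illustrative, not the branch's exact API).
    public interface MemoryPool {
        /**
         * Try to allocate a buffer of the given size. Returns null immediately if the pool is out of
         * capacity, so network threads never block waiting for memory and can make progress elsewhere.
         */
        ByteBuffer tryAllocate(int sizeBytes);

        /** Return a previously allocated buffer to the pool (called at the end of request processing). */
        void release(ByteBuffer previouslyAllocated);

        /** Memory currently available for allocation, in bytes. */
        long availableMemory();
    }

Under this sketch, a Processor would pass the pool to the NetworkReceive instances it creates; readFromReadableChannel() would call tryAllocate() and, if null is returned, simply postpone the read to a later poll() instead of blocking.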

An initial implementation of these proposed changes is available on GitHub - https://github.com/radai-rosenblatt/kafka/commits/broker-memory-pool

Caveats

  1. Because the pool will serve a request of any size as long as it has any capacity available, the actual memory bound is queued.max.bytes + socket.request.max.bytes. The up-side is that large requests cannot be starved out.
  2. Concerns have been raised about the possibility of starvation in Selector.pollSelectionKeys(): if the order of keys in Set<SelectionKey> selectionKeys is deterministic and memory is tight, sockets consistently at the beginning of the set get better treatment than those at the end of the iteration order. To overcome this, code has been put in place to shuffle the selection keys and handle them in random order ONLY IF MEMORY IS TIGHT (i.e. if the previous allocation call failed). This avoids the overhead of the shuffle when memory is not an issue (see the sketch after this list).
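
A minimal sketch of that conditional shuffle (the method name and the outOfMemory flag are illustrative assumptions, not necessarily what the branch uses):

    import java.nio.channels.SelectionKey;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.List;
    import java.util.Set;

    // Sketch: randomize the handling order of selection keys only when the previous
    // allocation attempt failed, so the shuffle costs nothing when memory is plentiful.
    final class SelectionKeyOrder {
        static Collection<SelectionKey> determineHandlingOrder(Set<SelectionKey> selectionKeys,
                                                               boolean outOfMemory) {
            if (!outOfMemory)
                return selectionKeys; // keep the cheap, as-returned order when memory is not tight
            List<SelectionKey> shuffled = new ArrayList<>(selectionKeys);
            Collections.shuffle(shuffled);
            return shuffled;
        }
    }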

...

The current naming scheme of queued.max.requests (and the proposed queued.max.bytes) may be a bit opaque. Perhaps using requestQueue.max.requests and requestQueue.max.bytes would more clearly convey the meaning to users (indicating that these settings deal with the request queue specifically, and not some other queue). The current queued.max.requests configuration can be retained for a few more releases for backwards compatibility.

Configuration Validation

queued.max.bytes must be larger than socket.request.max.bytes (in other words, the memory pool must be large enough to accommodate the largest possible single request), or negative (to disable the bound).
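
A hedged sketch of what this validation could look like (the class, method and parameter names are assumptions, not the actual broker configuration code):

    // Hypothetical validation sketch for the rule above.
    final class QueuedMaxBytesValidator {
        static void validate(long queuedMaxBytes, long socketRequestMaxBytes) {
            boolean disabled = queuedMaxBytes < 0; // a negative value disables the memory bound
            if (!disabled && queuedMaxBytes <= socketRequestMaxBytes)
                throw new IllegalArgumentException("queued.max.bytes (" + queuedMaxBytes
                    + ") must be larger than socket.request.max.bytes (" + socketRequestMaxBytes
                    + "), or negative to disable the bound");
        }
    }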

Test Plan

  • A unit test was written to validate the behavior of the memory pool.
  • A unit test that validates correct behavior of RequestChannel under capacity bounds would need to be written.
  • A micro-benchmark for determining the performance of the pool would need to be written (a rough sketch appears after this list).
  • Stress testing a broker (heavy producer load of varying request sizes) to verify that the memory limit is honored.
  • Benchmarking producer and consumer throughput before/after the change to prove that ingress/egress performance remains acceptable.
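
A rough, single-threaded sketch of such a micro-benchmark, reusing the hypothetical MemoryPool interface sketched earlier with a trivial stand-in implementation (not the pool in the branch); a real benchmark would likely use JMH and multiple threads:

    import java.nio.ByteBuffer;
    import java.util.concurrent.atomic.AtomicLong;

    public final class PoolBenchmark {
        // Trivial stand-in pool: a single atomic counter of available bytes.
        static final class SimplePool implements MemoryPool {
            private final AtomicLong available;
            SimplePool(long capacityBytes) { this.available = new AtomicLong(capacityBytes); }
            public ByteBuffer tryAllocate(int sizeBytes) {
                long remaining = available.addAndGet(-sizeBytes);
                if (remaining < 0) { available.addAndGet(sizeBytes); return null; } // undo failed allocation
                return ByteBuffer.allocate(sizeBytes);
            }
            public void release(ByteBuffer buffer) { available.addAndGet(buffer.capacity()); }
            public long availableMemory() { return available.get(); }
        }

        public static void main(String[] args) {
            MemoryPool pool = new SimplePool(64 * 1024 * 1024);
            int iterations = 5_000_000;
            long start = System.nanoTime();
            for (int i = 0; i < iterations; i++) {
                ByteBuffer buffer = pool.tryAllocate(1024);
                if (buffer != null)
                    pool.release(buffer);
            }
            double elapsedMs = (System.nanoTime() - start) / 1e6;
            System.out.printf("%d allocate/release pairs in %.2f ms%n", iterations, elapsedMs);
        }
    }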

...

  1. Reducing producer max batch size: this is harmful to throughput (and is also more complicated to maintain from an administrator's standpoint than simply sizing the broker itself). This is more of a workaround than a fix.
  2. Reducing producer max request size: same issues as above.
  3. Limiting the number of connected clients: same issues as above.
  4. Reducing queued.max.requests in the broker: although this would conservatively size the queue, it can be detrimental to throughput in the average case.
  5. Controlling the volume of requests enqueued in RequestChannel.requestQueue: this would not suffice, as it places no bound on memory already read from sockets.

Implementation Concerns

  1. The order of selection keys returned from a selector.poll call is undefined. If the actual implementation uses a fixed order (say, by increasing handle id?) and memory pressure is prolonged (so there is never enough memory to service all requests), this may lead to starvation of sockets that are always at the end of the iteration order. To overcome this, the code shuffles the selection keys if memory is low (see the caveat above).
  2. If there is no memory available at all in Selector.poll(), the select() call will return immediately with read-ready channels that we cannot service (because no memory is available). This may cause the SocketServer.Processor.run() call (which calls into poll()) to go into a tight loop where no progress can be made. It is my (Radai's) opinion that in such a scenario there will very likely be progress to be made elsewhere in run() (for example processCompletedSends() - presumably memory is tight because a lot of requests are executing, and some of them are very likely done). To avoid the tight loop, code has been written to mute all channels when no memory is available (so the select() call will block for a while waiting for other things) and unmute them if/when memory becomes available in a future call to poll(). This code is available in a separate branch; a sketch of the idea appears after this list. Issues with this implementation:
    1. When memory is unavailable, channels will be muted until the next call to poll(). If no channel activity other than reads is present, this means a wait of at least 300ms (the time a poll() call is currently set to wait).
    2. Every such mute/unmute cycle (in response to memory becoming unavailable/available, respectively) is O(#channels), which seems excessive.
    Perhaps a better alternative would be to have reads (as opposed to connects, disconnects and writes) done by dedicated threads. Such threads could actually block waiting for memory (something that SocketServer.Processor currently cannot do, as it has responsibilities beyond reads).
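
To illustrate the muting approach (and the O(#channels) cost noted in 2.2), here is a rough sketch at the raw java.nio level. The MemoryPool type is the hypothetical interface sketched earlier, and the actual branch works through Kafka's own channel abstractions rather than raw selection keys, so this is only an approximation of the idea:

    import java.io.IOException;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;

    // Rough sketch: stop polling for reads while the pool is exhausted, resume once memory frees up.
    final class MemoryAwarePoller {
        private final Selector nioSelector;
        private final MemoryPool memoryPool; // hypothetical interface from the earlier sketch
        private boolean readsMuted = false;

        MemoryAwarePoller(Selector nioSelector, MemoryPool memoryPool) {
            this.nioSelector = nioSelector;
            this.memoryPool = memoryPool;
        }

        void poll(long timeoutMs) throws IOException {
            if (memoryPool.availableMemory() == 0 && !readsMuted) {
                setInterestInReads(false); // nothing readable can be serviced anyway; avoid a tight loop
                readsMuted = true;
            } else if (memoryPool.availableMemory() > 0 && readsMuted) {
                setInterestInReads(true);  // memory was released since the last poll; resume reading
                readsMuted = false;
            }
            nioSelector.select(timeoutMs); // blocks up to timeoutMs when reads are muted and nothing else is ready
            // ... handle connects, disconnects, completed sends and any readable keys as usual ...
        }

        // This loop is the O(#channels) cost per mute/unmute cycle mentioned above.
        private void setInterestInReads(boolean interested) {
            for (SelectionKey key : nioSelector.keys()) {
                if (!key.isValid())
                    continue;
                int ops = key.interestOps();
                key.interestOps(interested ? ops | SelectionKey.OP_READ : ops & ~SelectionKey.OP_READ);
            }
        }
    }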

State of Implementation

An implementation is available - https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool

A separate branch implements channel muting/unmuting under memory pressure - https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting