...

This KIP proposes allowing an administrator to specify a memory limit in bytes, which resolves the above problems.

Also, the code and facilities introduced by this KIP might prove useful for similar problems on the client side, such as producer/consumer memory bounds.

Public Interfaces

This KIP introduces two new server configuration parameters:

  1. queued.max.bytes - specifies a limit on the volume of requests that can be held in memory. This parameter will co-exist with the existing queued.max.requests (the code will respect both bounds and will not pick up new requests when either is hit).
  2. memory.pool.class.name - allows a user to specify the exact implementation class used to enforce the memory bound (similar to how authorizers are configurable under KIP-11). Unlike authorizers, this value defaults to SimplePool, so normally users would not need to specify it. Other pool implementations may be desirable at times for triaging memory issues (during development or stability testing) or for performance tuning.
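For illustration only (the values are examples, not recommendations), enabling the bound could look like the following server.properties fragment; memory.pool.class.name is shown commented out since it defaults to SimplePool:

server.properties (example):
# bound the total memory held by in-flight requests (example: 500 MB)
queued.max.bytes=524288000
# the existing request-count bound still applies; both are respected
queued.max.requests=500
# optional - override the pool implementation (defaults to SimplePool)
# memory.pool.class.name=<fully qualified class name>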

Beyond the proposed new configuration keys, this KIP makes no changes to client or server public APIs.

Proposed Changes

Memory Pools

A new MemoryPool interface would be introduced into the Kafka codebase:

MemoryPool.java:
/**
 * A common memory pool interface for non-blocking pools.
 * Every buffer returned from tryAllocate() must always be release()ed. 
 */
public interface MemoryPool {
    /**
     * Tries to acquire a ByteBuffer of the specified size
     * @param sizeBytes size required
     * @return a ByteBuffer (which later needs to be release()ed), or null if no memory available.
     *         the buffer will be of the exact size requested, even if backed by a larger chunk of memory
     */
    ByteBuffer tryAllocate(int sizeBytes);

    /**
     * Returns a previously allocated buffer to the pool.
     * @param previouslyAllocated a buffer previously returned from tryAllocate()
     */
    void release(ByteBuffer previouslyAllocated);

    /**
     * Returns the total size of this pool.
     * @return total size, in bytes
     */
    long getSize();

    /**
     * Returns the amount of memory available for allocation by this pool.
     * NOTE: result may be negative (pools may over-allocate to avoid starvation issues)
     * @return available memory, in bytes (possibly negative)
     */
    long getAvailableMemory();

    /**
     * Returns true if the pool cannot currently allocate any more buffers
     * - meaning total outstanding buffers meets or exceeds pool size and
     * some would need to be released before further allocations are possible.
     *
     * This is equivalent to getAvailableMemory() <= 0.
     * @return true if out of memory
     */
    boolean isOutOfMemory();
}
  1. The pool is non-blocking, so network threads would not be blocked waiting for memory and could make progress elsewhere.
  2. SocketServer would instantiate and hold a memory pool, from which Processor threads would try to allocate memory when reading requests out of sockets (by passing the pool to the NetworkReceive instances they create).
  3. NetworkReceive.readFromReadableChannel() would be modified to try allocating memory (it is already written in a way that allows reading to involve multiple repeated calls to readFromReadableChannel(), so this is not a big change in behavior). A simplified sketch of this read path follows the list.
  4. Memory would be released at the end of request processing (in KafkaRequestHandler.run()).
  5. As a safety net (and to facilitate faster implementation), the pool will be implemented in such a way that memory that was not release()ed, but has been garbage collected, is detected and "reclaimed". This is to prevent "leaks" in case of code paths that fail to release() properly.
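As a simplified illustration of the intended read path (the real logic lives in NetworkReceive.readFromReadableChannel(); the class and method names here are hypothetical, and error/disconnect handling is omitted), reading a size-prefixed request against the MemoryPool interface above might look like:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

/**
 * Simplified sketch of pool-backed receive logic - illustrative only, not
 * the actual NetworkReceive code. The 4-byte size prefix is read first; if
 * the pool cannot supply a payload buffer, we simply return and retry on a
 * later call, which is why a non-blocking pool suffices.
 */
public class PooledReceiveSketch {
    private final MemoryPool pool;
    private final ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
    private ByteBuffer payload; // null until memory has been granted

    public PooledReceiveSketch(MemoryPool pool) {
        this.pool = pool;
    }

    /** @return true when the full request payload has been read */
    public boolean tryRead(ReadableByteChannel channel) throws IOException {
        if (sizeBuffer.hasRemaining()) {
            channel.read(sizeBuffer);
            if (sizeBuffer.hasRemaining())
                return false; // size prefix not complete yet
        }
        if (payload == null) {
            sizeBuffer.rewind();
            int requestSize = sizeBuffer.getInt();
            payload = pool.tryAllocate(requestSize);
            if (payload == null)
                return false; // no memory right now - retry on a later poll() cycle
        }
        channel.read(payload);
        return !payload.hasRemaining();
    }
}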

Caveats

  1. Memory is also released in case of disconnection mid request-building, in KafkaChannel.close() (in addition to the release at the end of request processing, see #4 above).
  2. As the pool will grant a request of any size as long as it has any capacity available, the actual memory bound is queued.max.bytes + socket.request.max.bytes. The upside is that large requests cannot get starved out.
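To make these semantics concrete, here is a minimal sketch of a pool along the lines described (non-blocking, granting any request while capacity remains, per caveat #2). This is illustrative only, not the actual SimplePool implementation, and the class name SketchMemoryPool is hypothetical:

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative sketch of a pool matching the semantics above. */
public class SketchMemoryPool implements MemoryPool {
    private final long sizeBytes;
    private final AtomicLong availableMemory;

    public SketchMemoryPool(long sizeBytes) {
        this.sizeBytes = sizeBytes;
        this.availableMemory = new AtomicLong(sizeBytes);
    }

    @Override
    public ByteBuffer tryAllocate(int sizeBytes) {
        long available = availableMemory.get();
        // grant the request if the pool has any capacity left, even if the
        // request is larger - this is what prevents large-request starvation
        while (available > 0) {
            if (availableMemory.compareAndSet(available, available - sizeBytes))
                return ByteBuffer.allocate(sizeBytes); // exact size requested
            available = availableMemory.get();
        }
        return null; // no capacity - caller must try again later
    }

    @Override
    public void release(ByteBuffer previouslyAllocated) {
        availableMemory.addAndGet(previouslyAllocated.capacity());
    }

    @Override
    public long getSize() {
        return sizeBytes;
    }

    @Override
    public long getAvailableMemory() {
        return availableMemory.get(); // may be negative due to over-allocation
    }

    @Override
    public boolean isOutOfMemory() {
        return availableMemory.get() <= 0;
    }
}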

Request throttling by way of channel muting

When memory is unavailable, the Selector would mute incoming channels so as not to get into a tight loop where read-ready keys are returned by poll() but cannot actually be read from (because no memory is available). Care must be taken when muting/unmuting because:

  1. SSL transports cannot be muted mid-handshake (I believe this is more for implementation simplicity in Kafka than a fundamental limitation, but not one this KIP should address).
  2. Muting a channel that has already been allocated memory to read into might result in a deadlock.
  3. Muting channels is also Kafka's way of ensuring that only a single request is processed at a time from the same channel. This guarantee must be preserved.

So, the following changes are proposed in support of channel muting under memory pressure:

  1. TransportLayer.isInMutableState() would be introduced to account for transports that cannot currently be muted (currently only SSL mid-handshake).
  2. KafkaChannel.isInMutableState() would be introduced to account for channels with already-allocated memory, or channels whose underlying transport is not in a mutable state.
  3. Selector.poll() would mute all channels that are in a mutable state if the server has no memory to accept any further requests. This would prevent their keys from being returned by the underlying select() call and thus prevent a tight loop in SocketServer.Processor.run().
  4. When memory becomes available again (in some subsequent Selector.poll() call), any channels previously muted would be unmuted, except those muted for single-request-processing reasons (see #3 in the list of concerns above). This requires maintaining, in Selector, a set of channels that were explicitly muted (so they are not unmuted when memory becomes available). A simplified sketch of this logic follows the list.
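As an illustration of the proposed mute/unmute flow (a sketch only: the class and method names MutingSketch and muteOrUnmuteForMemory are hypothetical, KafkaChannel.mute()/unmute() exist in Kafka today, and isInMutableState() is the method proposed above):

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/** Illustrative sketch of the proposed mute/unmute logic - not the actual Selector code. */
public class MutingSketch {
    // channels muted to enforce one-request-at-a-time; never unmuted by memory logic
    private final Set<KafkaChannel> explicitlyMuted = new HashSet<>();

    void muteOrUnmuteForMemory(Collection<KafkaChannel> channels, MemoryPool pool) {
        if (pool.isOutOfMemory()) {
            for (KafkaChannel channel : channels) {
                // skip SSL channels mid-handshake and channels that already
                // hold an allocated buffer (muting those could deadlock)
                if (channel.isInMutableState())
                    channel.mute();
            }
        } else {
            for (KafkaChannel channel : channels) {
                // leave explicitly-muted channels alone - they guard the
                // one-request-at-a-time-per-channel invariant
                if (!explicitlyMuted.contains(channel))
                    channel.unmute();
            }
        }
    }
}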

Caveats

  1. Concerns have been raised about the possibility of starvation in Selector.pollSelectionKeys(): if the order of keys in Set<SelectionKey> selectionKeys is deterministic and memory is tight, sockets consistently at the beginning of the set get better treatment than those at the end of the iteration order. To overcome this, code has been put in place to shuffle the selection keys and handle them in random order, but only if memory is tight (i.e., if the previous allocation call failed). This avoids the overhead of the shuffle when memory is not an issue; a sketch follows.
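The shuffle itself is straightforward to express. A sketch of the key-handling loop (a fragment for illustration; memoryWasTightOnLastPoll is a hypothetical flag set when the previous tryAllocate() returned null):

// sketch: randomize handling order only when the previous allocation failed
List<SelectionKey> keys = new ArrayList<>(selectionKeys);
if (memoryWasTightOnLastPoll)
    Collections.shuffle(keys);
for (SelectionKey key : keys) {
    // ... handle the key as before ...
}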

...

queued.max.bytes must be larger than socket.request.max.bytes (in other words, the memory pool must be large enough to accommodate the largest possible single request), or <= 0 (which disables the limit).
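For example, with socket.request.max.bytes at its default of 104857600 (100 MB), queued.max.bytes=524288000 (500 MB) would be a valid setting, bounding in-flight request memory at just under 600 MB (queued.max.bytes + socket.request.max.bytes - 1), while queued.max.bytes=-1 would disable the limit.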

Test Plan

  • A unit test was written to validate the behavior of the memory pool.
  • A unit test that validates correct behavior of RequestChannel under capacity bounds would need to be written.
  • A micro-benchmark for determining the performance of the pool would need to be written.
  • Stress testing a broker (heavy producer load of varying request sizes) to verify that the memory limit is honored.
  • Benchmarking producer and consumer throughput before/after the change to prove that ingress/egress performance remains acceptable.
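As a rough sketch of what the pool unit test might assert (using the hypothetical SketchMemoryPool from above; the actual test code may differ):

import static org.junit.Assert.*;
import java.nio.ByteBuffer;
import org.junit.Test;

public class MemoryPoolSketchTest {
    @Test
    public void testOverAllocationAndRelease() {
        MemoryPool pool = new SketchMemoryPool(1000);
        // a request larger than the pool still succeeds while any capacity remains
        ByteBuffer big = pool.tryAllocate(1500);
        assertNotNull(big);
        assertTrue(pool.getAvailableMemory() < 0); // over-allocated
        assertTrue(pool.isOutOfMemory());
        // further allocations fail until memory is released
        assertNull(pool.tryAllocate(1));
        pool.release(big);
        assertEquals(1000, pool.getAvailableMemory());
        assertNotNull(pool.tryAllocate(1));
    }
}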

...

  1. The order of selection keys returned from a selector.poll() call is undefined. If the actual implementation uses a fixed order (say, by increasing handle id?) and memory pressure is prolonged (so there is never enough memory to service all requests), this may lead to starvation of sockets that are always at the end of the iteration order. To overcome this, the code shuffles the selection keys if memory is low.
  2. A strict pool (one that adheres to its max size completely) will cause starvation of large requests under memory pressure (as they would never be able to allocate if there is a stream of small requests). To avoid this, the pool implementation will allocate the requested amount of memory if it has any memory available (so if the pool has 1 free byte and 1 MB is requested, 1 MB will be returned and the number of available bytes in the pool will go negative). This means the actual bound on the number of bytes outstanding is queued.max.bytes + socket.request.max.bytes - 1 (socket.request.max.bytes representing the largest single request possible).
  3. If there is no memory available at all in Selector.poll(), the select() call will return immediately with read-ready channels that we cannot service (because no memory is available). This may cause SocketServer.Processor.run() (which calls into poll()) to go into a tight loop where no progress can be made. It is my (Radai) opinion that in such a scenario there will very likely be progress to be made elsewhere in run() (for example in processCompletedSends() - presumably memory is tight because a lot of requests are executing, and some of them are very likely done). To avoid the tight loop, code has been written to mute all channels when no memory is available (so the select() call will block for a while waiting for other things) and unmute them if/when memory becomes available in a future call to poll(). This code is available in a separate branch. Issues with this implementation:
     • When memory is unavailable, channels will be muted until the next call to poll(). If there is no other channel activity except reads, this means a wait of at least 300 ms (the time a poll() call is currently set to wait).
     • Every such mute/unmute cycle (in response to memory becoming unavailable/available, respectively) is O(#channels), which seems excessive.
     • Perhaps a better alternative would be to have reads (as opposed to connects, disconnects, and writes) done by dedicated threads. Such threads could actually block waiting for memory - something SocketServer.Processor currently cannot do, as it has responsibilities beyond reads.

State of Implementation

An implementation is available at https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool. A separate branch implements channel muting/unmuting under memory pressure: https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting

...