...

Public Interfaces

This KIP introduces a new server configuration parameter, queued.max.bytes, that would specify a limit on the volume of requests that can be held in memory. This configuration parameter will co-exist with the existing queued.max.requests (the code will respect both bounds and will not pick up new requests when either is hit).
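As a rough illustration of "respect both bounds", the following sketch admits a request only while neither limit has been hit. The class and method names (DualBoundGate, tryAdmit, release) are hypothetical and are not part of the broker code; the sketch also assumes that a byte bound <= 0 means the byte limit is disabled.

```java
/** Hypothetical illustration only; not the actual broker implementation. */
final class DualBoundGate {
    private final int maxRequests; // stands in for queued.max.requests
    private final long maxBytes;   // stands in for queued.max.bytes; <= 0 is assumed to mean "disabled"
    private int queuedRequests = 0;
    private long queuedBytes = 0;

    DualBoundGate(int maxRequests, long maxBytes) {
        this.maxRequests = maxRequests;
        this.maxBytes = maxBytes;
    }

    /** True only while neither bound has been hit. */
    synchronized boolean canAccept() {
        boolean countOk = queuedRequests < maxRequests;
        boolean bytesOk = maxBytes <= 0 || queuedBytes < maxBytes;
        return countOk && bytesOk;
    }

    /** Admits the request if both bounds allow it; returns false otherwise. */
    synchronized boolean tryAdmit(long requestBytes) {
        if (!canAccept()) {
            return false;
        }
        queuedRequests++;
        queuedBytes += requestBytes;
        return true;
    }

    /** Called once a request has been fully processed. */
    synchronized void release(long requestBytes) {
        queuedRequests--;
        queuedBytes -= requestBytes;
    }
}
```

Note that in this sketch a request is admitted as long as some capacity remains, so the byte total can briefly exceed the bound by up to one request.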

...

Beyond the proposed new configuration key, this KIP makes no changes to client or server public APIs.

...

  1. SSL transports cannot be muted mid-handshake (I believe this is more for implementation simplicity in Kafka than a fundamental limitation, but not one this KIP should address)
  2. muting a channel that has already been allocated memory to read into might result in a deadlock.
  3. muting channels is also Kafka's way of ensuring only a single request is processed at a time from the same channel. this guarantee must be preserved.
  4. SSLTransportLayer has intermediate buffers where data may get "stuck" (due to no memory) and yet the underlying socket may be done and so will not show up on further select() calls.

so, the following changes are proposed in support of channel muting under memory pressure:

  1. TransportLayer.isInMutableState() would be introduced to account for transports that cannot currently be muted (currently only SSL during handshake)
  2. TransportLayer.hasBytesBuffered() would be introduced to account for transports that have unread data in any intermediate buffers (currently only possible for the SSL transport)
  3. KafkaChannel.isInMutableState() would be introduced to account for channels with already allocated memory or channels whose underlying transport is not in a mutable state.
  4. Selector.poll() would mute all channels that are in a mutable state if the server has no memory to accept any further requests - this would prevent their keys from being returned by the underlying select() call and thus prevent a tight loop in SocketServer.Processor.run()
  5. when memory becomes available again (in some subsequent Selector.poll() call) any channels previously muted, except those muted for single-request-processing reasons (see #3 above), will be unmuted. this would require maintaining a set of explicitly muted channels in Selector (so they would not be unmuted when memory becomes available)
  6. channels whose underlying transports have yet-unread data buffered must be accounted for to prevent the case of a stale tail of data getting stuck in said buffers.
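The mute/unmute bookkeeping in items 4 and 5 can be sketched roughly as follows. This is a simplified model under stated assumptions, not the real Selector: Channel is a hypothetical stand-in for KafkaChannel, and beforeSelect, explicitMute and explicitUnmute are illustrative names. Buffered-data handling (item 6) is left out.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch; names are illustrative, not Kafka's actual Selector code. */
final class MemoryPressureMuter {

    /** Minimal stand-in for a channel; the real KafkaChannel is far richer. */
    static final class Channel {
        final String id;
        boolean muted = false;
        boolean mutable = true; // false e.g. mid SSL handshake, or with memory already allocated
        Channel(String id) { this.id = id; }
        boolean isInMutableState() { return mutable; }
    }

    // Channels muted to preserve one-request-at-a-time processing.
    private final Set<Channel> explicitlyMuted = new HashSet<>();
    // Channels muted only because the server ran out of request memory.
    private final Set<Channel> mutedForMemory = new HashSet<>();

    void explicitMute(Channel c) {
        c.muted = true;
        explicitlyMuted.add(c);
    }

    void explicitUnmute(Channel c) {
        explicitlyMuted.remove(c);
        // Stay muted if memory pressure is still the reason for muting.
        if (!mutedForMemory.contains(c)) {
            c.muted = false;
        }
    }

    /** Assumed to run at the start of every poll, before the underlying select(). */
    void beforeSelect(boolean memoryAvailable, List<Channel> channels) {
        if (!memoryAvailable) {
            for (Channel c : channels) {
                if (!c.muted && c.isInMutableState()) {
                    c.muted = true;
                    mutedForMemory.add(c);
                }
            }
        } else {
            for (Channel c : mutedForMemory) {
                // Explicitly muted channels must not be woken up by memory becoming available.
                if (!explicitlyMuted.contains(c)) {
                    c.muted = false;
                }
            }
            mutedForMemory.clear();
        }
    }
}
```

Keeping two separate sets is one way to make sure that memory becoming available never unmutes a channel that is still processing a request.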

Caveats

  1. concerns have been raised about the possibility of starvation in Selector.pollSelectionKeys() - in case the order of keys in Set<SelectionKey> selectionKeys is deterministic and memory is tight, sockets consistently at the beginning of the set get better treatment than those at the end of the iteration order. to overcome this, code has been put in place to shuffle the selection keys and handle them in random order ONLY IF MEMORY IS TIGHT (that is, if the previous allocation call failed). this avoids the overhead of the shuffle when memory is not an issue.
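A minimal sketch of the conditional shuffle described above, assuming a flag that records whether the previous allocation attempt failed. KeyOrdering and iterationOrder are hypothetical names, and the element type is generic so the example does not depend on real SelectionKey objects.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;

/** Hypothetical helper; not the actual Selector.pollSelectionKeys() code. */
final class KeyOrdering {
    private KeyOrdering() { }

    /**
     * Returns the keys in the order they should be handled: the original set
     * (no copy, no shuffle) in the common case, or a shuffled copy when the
     * previous allocation failed, so that no socket is consistently favored.
     */
    static <T> Collection<T> iterationOrder(Set<T> selectedKeys,
                                            boolean previousAllocationFailed,
                                            Random random) {
        if (!previousAllocationFailed) {
            return selectedKeys; // avoid the shuffle overhead when memory is not an issue
        }
        List<T> shuffled = new ArrayList<>(selectedKeys);
        Collections.shuffle(shuffled, random);
        return shuffled;
    }
}
```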

...

queued.max.bytes must be larger than socket.request.max.bytes (in other words, the memory pool must be large enough to accommodate the largest single request possible), or <= 0 (if disabled). The default would be -1.
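The constraint above could be checked at startup along these lines. This is only a sketch: QueuedMaxBytesValidator and validate are hypothetical names, and the real broker config validation may be structured differently.

```java
/** Hypothetical validation sketch; not the broker's actual config code. */
final class QueuedMaxBytesValidator {
    private QueuedMaxBytesValidator() { }

    /**
     * Accepts a value <= 0 (memory bound disabled) or a value strictly larger
     * than socket.request.max.bytes; rejects anything else.
     */
    static void validate(long queuedMaxBytes, int socketRequestMaxBytes) {
        if (queuedMaxBytes <= 0) {
            return; // disabled; -1 is the proposed default
        }
        if (queuedMaxBytes <= socketRequestMaxBytes) {
            throw new IllegalArgumentException(
                "queued.max.bytes (" + queuedMaxBytes + ") must be larger than "
                    + "socket.request.max.bytes (" + socketRequestMaxBytes + ")");
        }
    }
}
```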

Test Plan

  • A unit test was written to validate the behavior of the memory pool
  • A unit test that validates correct behavior of RequestChannel under capacity bounds would need to be written.
  • A micro-benchmark for determining the performance of the pool would need to be written
  • Stress testing a broker (heavy producer load of varying request sizes) to verify that the memory limit is honored.
  • Benchmarking producer and consumer throughput before/after the change to prove that ingress/egress performance remains acceptable.

...