
Status

Current State: Draft

Discussion Thread: link

JIRA: KAFKA-4011

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka currently supports setting an upper bound on the number of requests allowed into the (incoming) request queue. This is an indirect way of controlling memory consumption but has a few drawbacks:

  1. An administrator needs to estimate the average request size in order to provide a meaningful size limit.
  2. This size limit may need to be periodically updated as the workload changes.
  3. The server is still susceptible to a simultaneous batch of large requests exhausting the JVM heap (causing an OutOfMemoryError).

The third scenario has actually occurred a few times at LinkedIn: a sudden spike of very large request batches (thousands of requests each) from a Hadoop job caused OutOfMemoryErrors on a production cluster.

This KIP proposes allowing an administrator to specify a request queue size limit in bytes, which resolves the above problems.

Public Interfaces

This KIP introduces a new server configuration parameter, queued.max.bytes, which specifies an upper bound on the total size (in bytes) of requests that can be held in the request queue. This configuration parameter can either replace queued.max.requests completely, or co-exist with it (either as an either-or choice, or by respecting both bounds and blocking the network threads when either limit is hit). A sketch of how the key might be declared is shown below.
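
For illustration only, the new key could be declared next to the existing one roughly as follows, using Kafka's ConfigDef. The class name, defaults, importance levels, and documentation strings here are assumptions of this sketch, not something the KIP specifies:

```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

// Hypothetical holder class; the real definitions live in the broker's config code.
public class RequestQueueConfigSketch {

    public static ConfigDef define(ConfigDef def) {
        return def
            // Existing bound on the number of queued requests.
            .define("queued.max.requests", Type.INT, 500, Importance.HIGH,
                    "The number of queued requests allowed before blocking the network threads")
            // Proposed bound on the total bytes of queued requests; -1 (assumed default here,
            // see option 3 in the migration section) would disable the byte bound.
            .define("queued.max.bytes", Type.LONG, -1L, Importance.MEDIUM,
                    "The number of queued bytes allowed before blocking the network threads; -1 disables the byte bound");
    }
}
```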

Beyond the proposed new configuration key, this KIP makes no changes to client or server public APIs.

Proposed Changes

RequestChannel.requestQueue (currently an ArrayBlockingQueue) could be replaced with an instance of ByteBoundedBlockingQueue (which already exists in the codebase). If backwards compatibility with queued.max.requests is required, RequestChannel can choose between the current ArrayBlockingQueue and ByteBoundedBlockingQueue at construction time, depending on which configuration parameters are set. A simplified sketch of the byte-bounded semantics follows.
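
To make the intended semantics concrete, here is a minimal, simplified sketch of a byte-bounded blocking queue in Java. It is not the actual kafka.utils.ByteBoundedBlockingQueue (the class and method names are illustrative only), but it captures the admission rule that matters for the caveat below:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.ToLongFunction;

/** Illustrative sketch only; not the actual kafka.utils.ByteBoundedBlockingQueue. */
public class ByteBoundedQueueSketch<E> {

    private final LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<>();
    private final long maxBytes;
    private final ToLongFunction<E> sizeOf;
    private long queuedBytes = 0;

    public ByteBoundedQueueSketch(long maxBytes, ToLongFunction<E> sizeOf) {
        this.maxBytes = maxBytes;
        this.sizeOf = sizeOf;
    }

    /** Blocks while the byte bound is already reached. Once any capacity is free,
     *  an element of any size is admitted (hence the caveat described below). */
    public void put(E element) throws InterruptedException {
        synchronized (this) {
            while (queuedBytes >= maxBytes) {
                wait();
            }
            queuedBytes += sizeOf.applyAsLong(element);
        }
        queue.put(element); // backing queue is unbounded, so this never blocks
    }

    /** Removes the next element, releases its bytes, and wakes any blocked producers. */
    public E take() throws InterruptedException {
        E element = queue.take();
        synchronized (this) {
            queuedBytes -= sizeOf.applyAsLong(element);
            notifyAll();
        }
        return element;
    }

    public synchronized long byteSize() {
        return queuedBytes;
    }
}
```

Note that admission only requires some capacity to be left rather than enough capacity for the incoming element, which is exactly what produces the overshoot discussed in the Caveat section.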

Caveat

Because ByteBoundedBlockingQueue admits a request of any size as long as any capacity remains, the actual memory bound is queued.max.bytes + socket.request.max.bytes. For example, with queued.max.bytes = 500 MB and socket.request.max.bytes = 100 MB, up to roughly 600 MB of request data could be held in the queue. The upside is that large requests never get starved out.

Compatibility, Deprecation, and Migration Plan

There are a few possible approaches with respect to migration. The current preference is to go with the third option.

  1. queued.max.requests is deprecated/removed in favor of queued.max.bytes. In this case, existing configurations could be converted using queued.max.bytes = queued.max.requests * socket.request.max.bytes (which is conservative, but "safe").
  2. queued.max.requests is supported as an alternative to queued.max.bytes (either-or), in which case no migration is required. A default value of 0 could be used to disable the feature (by default) and runtime code would pick a queue implementation depending on which configuration parameter is provided.
  3. queued.max.requests is supported in addition to queued.max.bytes (both respected at the same time). In this case, a default value of queued.max.bytes = -1 would maintain backwards-compatible behavior (see the sketch after this list).
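
As a rough illustration of option 3, the admission logic might respect both bounds along the following lines. The class and method names are hypothetical; the sketch only shows the idea of blocking the network threads until both limits have spare capacity, with queued.max.bytes = -1 disabling the byte bound:

```java
/** Illustrative sketch of option 3: block until BOTH the count bound and the
 *  byte bound have spare capacity (queuedMaxBytes <= 0 disables the byte check). */
public class DualBoundAdmissionSketch {

    private final int queuedMaxRequests;
    private final long queuedMaxBytes; // -1 means "no byte bound" (backwards-compatible default)
    private int queuedRequests = 0;
    private long queuedBytes = 0;

    public DualBoundAdmissionSketch(int queuedMaxRequests, long queuedMaxBytes) {
        this.queuedMaxRequests = queuedMaxRequests;
        this.queuedMaxBytes = queuedMaxBytes;
    }

    /** Called by a network thread before enqueueing a request of the given size. */
    public synchronized void acquire(long requestSizeBytes) throws InterruptedException {
        while (queuedRequests >= queuedMaxRequests
                || (queuedMaxBytes > 0 && queuedBytes >= queuedMaxBytes)) {
            wait(); // block the network thread, as the KIP describes
        }
        queuedRequests++;
        queuedBytes += requestSizeBytes;
    }

    /** Called when a request handler thread dequeues a request. */
    public synchronized void release(long requestSizeBytes) {
        queuedRequests--;
        queuedBytes -= requestSizeBytes;
        notifyAll();
    }
}
```

With this shape, leaving queued.max.bytes at its -1 default makes the byte check a no-op, so existing deployments keep today's count-only behavior.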

The current naming scheme of queued.max.requests (and the proposed queued.max.bytes) may be a bit opaque. Perhaps using requestQueue.max.requests and requestQueue.max.bytes would convey the meaning more clearly to users (indicating that these settings deal with the request queue specifically, and not some other queue). The current queued.max.requests configuration can be retained for a few more releases for backwards compatibility.

Test Plan

  • Existing unit tests for ByteBoundedBlockingQueue need to be extended.
  • A unit test that validates correct behavior of RequestChannel under capacity bounds would need to be written (a rough sketch of the kind of check involved is given after this list).
  • A micro-benchmark for determining the performance of the various queue implementations - https://github.com/radai-rosenblatt/kafka-benchmarks
  • Stress testing a broker (heavy producer load of varying request sizes) to verify that the memory limit is honored.
  • Benchmarking producer and consumer throughput before/after the change to prove that ingress/egress performance remains acceptable.
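
As a concrete illustration of the second bullet, the capacity-bound check might look roughly like the following. This is not real Kafka test code; it exercises the hypothetical DualBoundAdmissionSketch from the migration section above:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ByteBoundSketchTest {
    public static void main(String[] args) throws Exception {
        // Byte bound of 100 bytes, generous count bound; -1 would disable the byte bound.
        DualBoundAdmissionSketch bound = new DualBoundAdmissionSketch(1000, 100);

        bound.acquire(90); // admitted: capacity remains
        bound.acquire(50); // also admitted: any remaining capacity accepts a request (see Caveat)

        // A third request must now block until capacity is released.
        CountDownLatch admitted = new CountDownLatch(1);
        Thread producer = new Thread(() -> {
            try {
                bound.acquire(10);
                admitted.countDown();
            } catch (InterruptedException ignored) { }
        });
        producer.start();

        if (admitted.await(200, TimeUnit.MILLISECONDS))
            throw new AssertionError("request admitted although the byte bound was exhausted");

        bound.release(90); // free capacity; the blocked producer should now proceed
        if (!admitted.await(2, TimeUnit.SECONDS))
            throw new AssertionError("request not admitted after capacity was released");

        producer.join();
        System.out.println("byte-bound admission behaves as expected");
    }
}
```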

Rejected Alternatives

Here are some alternatives that we have discussed (at LinkedIn):

  1. Reducing producer max batch size: this is harmful to throughput (and is also more complicated to maintain from an administrator's standpoint than simply sizing the broker itself). This is more of a workaround than a fix.
  2. Reducing producer max request size: same issues as above.
  3. Limiting the number of connected clients: same issues as above.
  4. Reducing queued.max.requests in the broker: although this would size the queue conservatively, it can be detrimental to throughput in the average case.