Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka typically uses zero-copy optimization for transferring data from the file system cache to the network socket when sending messages from brokers to consumers. This optimization works only when the consumer is able understands the on-disk message format.

...

  1. Provide appropriate configuration parameters to manage maximum memory usage during down-conversion on the broker.
  2. Reduce overall footprint of the down-conversion process, both in terms of memory usage and the time for which the required memory is held.
  3. As we cap the memory usage, we’d inadvertently affect the throughput and latency characteristics. Lesser the amount of memory available for down-conversion, fewer messages can batched together in a single reply to the consumer. As fewer messages can be batched together, the consumer would require more round-trips to consume the same number of messages. We need a design that minimizes any impact on throughput and latency characteristics.

Public Interfaces

Broker Configuration

A new broker configuration will be provided to block older clients from being able to fetch messages from the broker. This configuration provides an added measure to completely disable any down-conversions on the broker.

...

The configuration accepts the broker version rather than the message format version to keep it consistent with other existing configurations like like message.format.version. Even though the configuration accepts all possible broker versions, the configuration takes effect only on certain versions where the message format actually changed. For example, setting minimum.consumer.fetch.version to "0.10.0" (which introduced message format 1) will block all clients requesting message format 0.

Proposed Changes

Message Chunking

We will introduce a message chunking approach to reduce overall memory consumption during down-conversion. The idea is to down-convert messages incrementally in "chunks", and send the result out to the consumer as soon as the chunk is ready. The broker continuously down-converts chunks of messages and sends them out, until it has exhausted all messages contained in the FetchResponse, or has reached some predetermined threshold. This way, we use a small, fixed amount of memory per FetchResponse.

...

  • More computation being done in network threads.
  • Complexity - expect this to be a big patch, but should be able to keep it abstracted into its own module so that it does not cause too much churn in existing code.
  • Could be viewed as being somewhat "hacky" because of the need the need to pad "fake" messages at the end of the response.

Ability to Block Older Clients

Even though the chunking approach tries to minimize the impact on memory consumption when down-conversions need to be performed, the reality is that we cannot completely eliminate its impact on CPU and memory consumption. Some users might want an ability to completely block older clients, such that the broker is not burdened with having to perform down-conversions. We will add a broker-side configuration parameter to help specify the minimum compatible consumer that can fetch messages from a particular topic (see the "Public Interfaces" section for details).

Compatibility, Deprecation, and Migration Plan

There should be no visible impact after enabling the message chunking behavior described above. One thing that clients need to be careful about is the case where the total response size is greater than the size of the actual payload (described in Spre > Spost scenario of the chunking approach); client must ignore any message who size exceeds the size of the total response.

Rejected Alternatives

We considered several alternative design strategies but they were deemed to be either not being able to address the root cause by themselves, or being too complex to implement.

Alternate Chunked Approach

The "hacky"ness of the chunked approach discussed previously can be eliminated if we allowed the broker to break down a single logical FetchResponse into multiple FetchResponse messages. This means if we end up under-estimating the size of down-converted messages, we could send out two (or more) FetchResponse to the consumer. But we would require additional metadata so that the consumer knows that the multiple FetchResponse actually constitute a single logically continuous response. Because tagging the additional metadata requires changes to FetchResponse protocol, this will however not work for existing consumers.

We could consider this option as the eventual end-to-end solution for the down-conversion issue for future clients that require down-conversion.

Native Heap Memory

Kafka brokers are typically deployed with a much smaller JVM heap size compared to the amount of memory available, as large portions of memory need to be dedicated for the file system cache.

...

  • Need to devise effective way to reuse allocated native heap space (memory pooling).
  • Tricky to design for cases where required memory is not immediately available.
    • Could lead to consumer starvation if we wait too long for memory to become available, or if we never get the required amount of memory.
  • Same amount of memory still consumed in aggregate, just not from the JVM heap. Memory will also not be freed till GC calls finalize.
  • We need some estimate of how much memory to allocate before actually performing the down-conversion.

Configuration for Maximum Down-Conversion Memory Usage

Expose a broker-side configuration message.max.bytes.downconvert, which is the maximum amount of memory the broker can allocate to hold down-converted messages. This means that a message fetch that requires down-conversion would be able to fetch maximum of message.max.bytes.downconvert in a single fetch. If required memory is not available because it is already being consumed by other down-converted messages, we block the down-conversion process till sufficient memory becomes available.

...

  • Determining what message.max.bytes.downconvert should be set to could be a challenge, as setting it too low could affect latency, while setting it too high could cause memory pressure for other modules that share the JVM heap.
  • Another knob for administrators to worry about!
  • Need to devise effective way to reuse allocated native heap space (memory pooling).
  • Tricky to design for cases where required memory is not immediately available.

Compression

To reduce the overall amount of memory consumed by down-converted messages, we could consider compressing them (if they are not already compressed to begin with).

...