Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-6927

Released: 2.0.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

A new broker configuration will be provided to completely disable down-conversion on the broker when satisfying a FetchRequest from a client. This configuration provides an added measure on top of the optimizations discussed in this KIP.

  • Topic-level configuration: message.downconversion.enable
  • Broker-level configuration: log.message.downconversion.enable
  • Type: Boolean
  • Possible Values: True / False
  • Explanation: Controls whether down-conversion of messages is enabled to satisfy client FetchRequests. If true, the broker will down-convert messages when required. If set to false, the broker will not perform any down-conversion and will send UnsupportedVersionException in response to any client FetchRequest that requires down-conversion.
  • Default Value: True
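For illustration, the topic-level setting could be changed at runtime with the Java AdminClient as sketched below. The broker address and topic name are placeholders, and note that the non-incremental alterConfigs call shown here replaces the topic's existing config overrides, so treat this as a sketch rather than an operational recipe.

Code Block
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class DisableDownConversionExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker

        try (AdminClient admin = AdminClient.create(props)) {
            // Disable down-conversion for a single topic ("my-topic" is a placeholder name).
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            Config config = new Config(Collections.singletonList(
                    new ConfigEntry("message.downconversion.enable", "false")));
            // Note: the legacy alterConfigs call overwrites the full set of overrides for this topic.
            admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
        }
    }
}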

Proposed Changes

Message Chunking

We will introduce a message chunking approach to reduce overall memory consumption during down-conversion. The idea is to lazily and incrementally down-convert message "chunks" in a batched fashion, and send the result out to the consumer as soon as the chunk is ready. The broker continuously down-converts chunks of messages and sends them out, until it has exhausted all messages contained in the FetchResponse, or has reached some predetermined threshold. This way, we use a small, deterministic amount of memory per FetchResponse.

The diagram below shows what a FetchResponse looks like (excluding some metadata that is not relevant for this discussion). The red boxes correspond to the messages that need to be sent out for each partition. When down-conversion is not required, we simply hold an instance of FileRecords for each of these which contains pointers to the section of file we want to send out. When sending out the FetchResponse, we use zero-copy to transfer the actual data from the file to the underlying socket which means we never actually have to copy the data in userspace.

[Diagram: structure of a FetchResponse, with the messages to be sent out for each partition highlighted in red]

When down-conversion is required, each of the red boxes corresponds to a MemoryRecords instance. Messages for each partition are read into the JVM heap, converted to the appropriate format, and we hold on to this memory containing the converted messages until the entire FetchResponse is created and subsequently sent out. There are a couple of inefficiencies with this approach:

...

  1. Read a set of message batches into memory.
  2. Down-convert all message batches that were read.
  3. Write the resulting buffer to the underlying socket.
  4. Repeat until we have sent out all the messages, or have reached a predetermined threshold (see the sketch below).
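The following sketch illustrates this loop. The types and helpers (MessageBatch, BatchReader, DownConverter, readNextBatches) are hypothetical stand-ins used only to convey the control flow; they are not the actual broker classes.

Code Block
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.List;

// Hypothetical sketch of the chunked down-conversion send loop described above.
public class ChunkedDownConversionSketch {

    // Stand-in for a batch of messages read from the log.
    interface MessageBatch {
        int sizeInBytes();
    }

    // Reads the next group of full batches, up to roughly maxChunkBytes.
    interface BatchReader {
        List<MessageBatch> readNextBatches(int maxChunkBytes);
    }

    // Down-converts one chunk of batches to the format the client understands.
    interface DownConverter {
        ByteBuffer downConvert(List<MessageBatch> batches);
    }

    static final int CHUNK_SIZE_BYTES = 128 * 1024;

    static long sendDownConverted(BatchReader reader, DownConverter converter,
                                  WritableByteChannel socket, long maxBytesToSend) throws IOException {
        long sent = 0;
        while (sent < maxBytesToSend) {
            // 1. Read a set of full message batches into memory (one "chunk").
            List<MessageBatch> chunk = reader.readNextBatches(CHUNK_SIZE_BYTES);
            if (chunk.isEmpty())
                break; // all messages in the response have been sent

            // 2. Down-convert only the batches in this chunk.
            ByteBuffer converted = converter.downConvert(chunk);

            // 3. Write the converted chunk to the socket before reading the next chunk,
            //    so only one chunk's worth of converted messages is held on the heap.
            while (converted.hasRemaining())
                sent += socket.write(converted);

            // 4. Repeat until all messages are sent or the threshold is reached.
        }
        return sent;
    }
}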

...

To resolve this, we will use an estimate Spre (defined below), as opposed to the actual post-downconversion size (Spost), for each topic-partition. We also commit to sending out exactly Spre bytes for that particular partition. In terms of the above equation, the size will be computed as:

Code Block
Size = HeaderSize + MetadataSize + ∑(Spre)
 
where Spre = max(size_pre_downconversion, size_first_batch_after_downconversion)
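As a small worked example with made-up byte counts, for a single partition:

Code Block
// Made-up sizes for one partition of the response.
public class SizeEstimateExample {
    public static void main(String[] args) {
        long sizePreDownconversion = 1_000_000;           // bytes on disk before down-conversion
        long sizeFirstBatchAfterDownconversion = 18_000;  // first batch after down-conversion

        // Spre: the number of bytes the broker commits to send for this partition.
        long sPre = Math.max(sizePreDownconversion, sizeFirstBatchAfterDownconversion); // = 1_000_000

        // Size written into the response header; header and metadata sizes are placeholders,
        // and the sum runs over all partitions in the response (only one is shown here).
        long headerSize = 64;
        long metadataSize = 256;
        long size = headerSize + metadataSize + sPre;

        System.out.println("Committed response size = " + size);
    }
}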

Given this, we have three possible scenarios:

...

To ensure that the consumer keeps making progress, Kafka makes sure that every response consists of at least one message batch for the first partition, even if it exceeds fetch.max.bytes and max.partition.fetch.bytes (KIP-74). When Spre < Spost (second case above), we cannot send out all the messages and need to trim message batch(es) towards the end. It is possible that we end up with a single partial batch in this case.

To prevent this from happening, we will not delay down-conversion of the first partition in the response. We will down-convert all messages of the first partition in the I/O thread (like we do today), and only delay down-conversion for subsequent partitions. This ensures that we know the exact size of the down-converted messages in the first partition beforehand, and can fully accommodate them in the response.

This can be done by accepting an additional parameter doLazyConversion in KafkaApis#handleFetchRequest#convertedPartitionData which returns FetchResponse.PartitionData containing either LazyDownConvertedRecords or MemoryRecords depending on whether we want to delay down-conversion or not.
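A rough pseudo-Java sketch of the shape of this change (the actual method lives in KafkaApis, which is Scala, and the types below are simplified stand-ins rather than the real classes):

Code Block
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Simplified sketch of choosing eager vs. lazy down-conversion per partition.
public class ConvertedPartitionDataSketch {

    // Stand-in for anything that can be written into a FetchResponse.
    interface Records { }

    // Fully down-converted messages held in memory (the eager path used today).
    static class MemoryRecords implements Records {
        static MemoryRecords downConvert(Records source, byte toMagic) { return new MemoryRecords(); }
    }

    // Wrapper that defers down-conversion until the response is written to the socket.
    static class LazyDownConvertedRecords implements Records {
        LazyDownConvertedRecords(Records source, byte toMagic) { }
    }

    static class PartitionData {
        final Records records;
        PartitionData(Records records) { this.records = records; }
    }

    // Returns lazily or eagerly converted records depending on doLazyConversion.
    static PartitionData convertedPartitionData(Records fetched, byte toMagic, boolean doLazyConversion) {
        if (doLazyConversion)
            return new PartitionData(new LazyDownConvertedRecords(fetched, toMagic));
        return new PartitionData(MemoryRecords.downConvert(fetched, toMagic));
    }

    // The first partition is converted eagerly so its exact size is known up front;
    // all subsequent partitions are converted lazily, chunk by chunk, while sending.
    static List<PartitionData> buildResponse(List<Records> partitions, byte toMagic) {
        return IntStream.range(0, partitions.size())
                .mapToObj(i -> convertedPartitionData(partitions.get(i), toMagic, i > 0))
                .collect(Collectors.toList());
    }
}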

Because Spre is at least as big as the size of the first batch after down-conversion, we are guaranteed to send out at least that one batch.

Determining number of batches to down-convert

A limit will be placed on the size of messages down-converted at a given point in time (what forms a "chunk"). A chunk is formed by reading a maximum of 16kB of messages. We only add full message batches to the chunk. Note that we might have to exceed the 16kB limit if the first batch we are trying to read is larger than that.
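A possible shape for this batch-selection logic, in the spirit of the readNextBatches helper in the earlier sketch; MessageBatch is again a hypothetical stand-in for the broker's record batch type:

Code Block
import java.util.ArrayList;
import java.util.List;

public class ChunkSelectionSketch {

    interface MessageBatch {
        int sizeInBytes();
    }

    static final int MAX_CHUNK_BYTES = 16 * 1024; // read a maximum of 16kB of messages per chunk

    // Collects whole batches until the 16kB limit is reached. The first batch is always
    // included, even if it alone exceeds the limit, so that progress is always possible.
    static List<MessageBatch> nextChunk(List<MessageBatch> remaining) {
        List<MessageBatch> chunk = new ArrayList<>();
        int bytes = 0;
        for (MessageBatch batch : remaining) {
            boolean fits = bytes + batch.sizeInBytes() <= MAX_CHUNK_BYTES;
            if (chunk.isEmpty() || fits) {
                chunk.add(batch);          // only full message batches are added to a chunk
                bytes += batch.sizeInBytes();
            } else {
                break;                     // next batch would overflow the chunk; stop here
            }
        }
        return chunk;
    }
}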

 

Pros

  • Fixed and deterministic memory usage for each down-conversion response.
  • Significantly reduced duration for which down-converted messages are kept referenced in JVM heap.
  • No change required to existing clients.

Cons

  • More computation being done in network threads.
  • Additional file I/O being done in network threads.
  • Complexity - expect this to be a big patch, but should be able to keep it abstracted into its own module so that it does not cause too much churn in existing code.
  • Could be viewed as being somewhat "hacky" because of the need to pad "fake" messages at the end of the response.

...

Even though the chunking approach tries to minimize the impact on memory consumption when down-conversions need to be performed, the reality is that we cannot completely eliminate its impact on CPU and memory consumption. Some users might want the ability to completely block older clients, such that the broker is not burdened with having to perform down-conversions. We will add topic-level and broker-level configurations to completely disable down-conversion on the broker (see the "Public Interfaces" section for details).

Compatibility, Deprecation, and Migration Plan

There should be no visible impact after enabling the message chunking behavior described above. One thing that clients need to be careful about is the case where the total response size is greater than the size of the actual payload (described in the Spre > Spost scenario of the chunking approach); the client must ignore any message whose size exceeds the size of the total response.

Testing Strategy

There are two main things we want to validate with the changes brought in by this KIP.

  1. Efficiency of the chunking approach
    1. Did we improve overall memory usage during down-conversion?
    2. Estimate of how much memory is consumed for each down-conversion the broker handles.
    3. How many concurrent down-conversions can the broker support?
  2. Effect on consumer throughput for various chunk sizes

The following describes the test setup for each of the points above and the corresponding findings.

Efficiency of Chunking Approach

The aim of the test was to prove that we have a finite bound on the amount of memory we consume during down-conversion, regardless of the fetch size. The test setup is described in detail below.

Test Setup:
- Java heap size = 200MB
- 1M messages, 1kB each ==> 1GB of total messages
- Split into 250 partitions ==> approximately 3.5MB per partition
- Single consumer with `fetch.max.bytes` = 250MB and `max.partition.fetch.bytes` = 1MB
- Each fetch consumes min(1MB*250, 250MB) = 250MB
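For reference, a consumer configured with the fetch settings from this setup might look roughly like the following; the broker address, group id, and topic name are placeholders, and the details of which client/message-format combination was used to force down-conversion are not shown.

Code Block
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DownConversionTestConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "downconversion-test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Fetch sizes from the test setup: 250MB per fetch overall, 1MB per partition.
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 250 * 1024 * 1024);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("downconversion-test")); // placeholder topic
            consumer.poll(java.time.Duration.ofSeconds(10));                      // drain one fetch
        }
    }
}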

Success criteria:
- Must always run out of memory if not using lazy down-conversion
- Must never run out of memory if using lazy down-conversion

Findings:
- Without the chunking approach, we down-convert all messages for all partitions, resulting in us running out of heap space.
- With the chunking approach, down-conversion consumes at most the chunk size at a time (which was set to 128kB). This keeps the memory usage both deterministic and finite, regardless of the fetch size.

Effect on consumer throughput

The aim of this test was to study the effect on throughput as we vary the chunk size, and to find the optimal chunking size.

Test Setup:
- 1 topic
- 12 partitions
- 10M messages, 1kB each
- 1 consumer
- Consume messages from start to end 10 times

The following table outlines the findings:

Chunk Size | Average Throughput (MBPS)
16kB       | 136.95
32kB       | 167.04
64kB       | 181.92
128kB      | 197.72
256kB      | 181.25
512kB      | 180.41
1MB        | 176.64

The average throughput without the chunking approach (i.e. without this KIP) was found to be 178.9MBPS. Given this, the default chunk size will be configured to 128kB.

Rejected Alternatives

We considered several alternative design strategies, but they were deemed either unable to address the root cause by themselves or too complex to implement.

...