Status
Current state: Accepted
Discussion thread: here
JIRA:
Released: 2.0.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
We will introduce a message chunking approach to reduce overall memory consumption during down-conversion. The idea is to lazily and incrementally down-convert message "chunks" in a batched fashion, and to send each chunk out to the consumer as soon as it is ready. The broker continuously down-converts chunks of messages and sends them out, until it has exhausted all messages contained in the FetchResponse, or has reached some predetermined threshold. This way, we use a small, deterministic amount of memory per FetchResponse.
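The chunking loop described above can be sketched as follows. This is a simplified illustration, not the broker's actual implementation: the class and method names (`ChunkedDownConversion`, `downConvertInChunks`) are hypothetical, and integer batch sizes stand in for real record batches.

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkedDownConversion {
    // Groups batches into chunks of at most chunkSize bytes (a single batch
    // larger than chunkSize becomes its own chunk), so only one chunk's worth
    // of down-converted data is held in memory at a time.
    static List<List<Integer>> downConvertInChunks(List<Integer> batchSizes, int chunkSize) {
        List<List<Integer>> chunks = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int currentBytes = 0;
        for (int size : batchSizes) {
            if (currentBytes > 0 && currentBytes + size > chunkSize) {
                chunks.add(current);          // "send" the finished chunk downstream
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(size);                // down-convert this batch into the current chunk
            currentBytes += size;
        }
        if (!current.isEmpty())
            chunks.add(current);
        return chunks;
    }

    public static void main(String[] args) {
        // Six 50kB batches with a 128kB chunk threshold -> three chunks of two batches each.
        System.out.println(downConvertInChunks(List.of(50, 50, 50, 50, 50, 50), 128));
    }
}
```

Peak memory per response is bounded by the chunk threshold rather than by the full fetch size, which is the property the tests below validate.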
...
There should be no visible impact after enabling the message chunking behavior described above. One thing clients need to be careful about is the case where the total response size is greater than the size of the actual payload (described in the Spre > Spost scenario of the chunking approach); clients must ignore any message whose size exceeds the size of the total response.
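A minimal sketch of that client-side guard, with hypothetical names (`ResponseGuard`, `shouldIgnore`) standing in for the consumer's actual response-parsing internals: a record at the end of a chunked response can advertise a size larger than the bytes that actually remain, and such a record must be skipped rather than parsed.

```java
public class ResponseGuard {
    // A record whose declared size exceeds the bytes remaining in the response
    // cannot be complete; the client must ignore it.
    static boolean shouldIgnore(int declaredRecordSize, int bytesRemainingInResponse) {
        return declaredRecordSize > bytesRemainingInResponse;
    }

    public static void main(String[] args) {
        System.out.println(shouldIgnore(512, 100)); // truncated record -> ignore
        System.out.println(shouldIgnore(64, 100));  // fully contained -> parse
    }
}
```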
Testing Strategy
There are two main things we want to validate with the changes brought in by this KIP.
- Efficiency of the chunking approach
- Did we improve overall memory usage during down-conversion?
- Estimate of how much memory is consumed for each down-conversion the broker handles.
- How many concurrent down-conversions can the broker support?
- Effect on consumer throughput for various chunk sizes
The following describes the test setup for each of the points above and the corresponding findings.
Efficiency of Chunking Approach
The aim of this test was to prove that we have a finite bound on the amount of memory we consume during down-conversion, regardless of the fetch size. The test setup is described in detail below.
Test Setup:
- Java heap size = 200MB
- 1M messages, 1kB each ==> 1GB of total messages
- Split into 250 partitions ==> approximately 4MB per partition
- Single consumer with `fetch.max.bytes` = 250MB and `max.partition.fetch.bytes` = 1MB
- Each fetch consumes min(1MB*250, 250MB) = 250MB
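The fetch-size arithmetic above follows from the two consumer settings; for illustration, here it is worked through in code. Plain `java.util.Properties` is used so the snippet stands alone without the kafka-clients dependency; the real test would set these keys on a KafkaConsumer.

```java
import java.util.Properties;

public class FetchConfig {
    // Upper bound on bytes returned per fetch: each partition contributes at
    // most perPartitionBytes, and the whole response is capped at fetchMaxBytes.
    static long bytesPerFetch(long perPartitionBytes, int partitions, long fetchMaxBytes) {
        return Math.min(perPartitionBytes * partitions, fetchMaxBytes);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("fetch.max.bytes", String.valueOf(250L * 1024 * 1024));    // 250MB
        props.setProperty("max.partition.fetch.bytes", String.valueOf(1024 * 1024)); // 1MB

        long perFetch = bytesPerFetch(
                Long.parseLong(props.getProperty("max.partition.fetch.bytes")),
                250,
                Long.parseLong(props.getProperty("fetch.max.bytes")));
        System.out.println(perFetch / (1024 * 1024) + "MB"); // min(1MB * 250, 250MB) = 250MB
    }
}
```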
Success criteria:
- Must always run out of memory if not using lazy down-conversion
- Must never run out of memory if using lazy down-conversion
Findings:
- Without the chunking approach, we down-convert all messages for all partitions, resulting in us running out of heap space.
- With the chunking approach, down-conversion consumes at most the chunk size at a time (which was set to 128kB). This keeps memory usage both deterministic and finite, regardless of the fetch size.
Effect on consumer throughput
The aim of this test was to study the effect on throughput as we vary the chunk size, and to find the optimal chunk size.
Test Setup:
- 1 topic
- 12 partitions
- 10M messages, 1kB each
- 1 consumer
- Consume messages from start to end 10 times
The following table outlines the findings:
Chunk Size | Average Throughput (MBPS) |
---|---|
16kB | 136.95 |
32kB | 167.04 |
64kB | 181.92 |
128kB | 197.72 |
256kB | 181.25 |
512kB | 180.41 |
1MB | 176.64 |
The average throughput without the chunking approach (i.e. without this KIP) was found to be 178.9MBPS. Given this, the default chunk size will be configured to 128kB.
Rejected Alternatives
We considered several alternative design strategies, but they were deemed either unable to address the root cause on their own or too complex to implement.
...