...
We will introduce a message chunking approach to reduce overall memory consumption during down-conversion. The idea is to lazily down-convert messages in "chunks", sending each chunk out to the consumer as soon as it is ready. The broker continuously down-converts chunks of messages and sends them out until it has exhausted all messages contained in the FetchResponse, or has reached some predetermined threshold. This way, we use a small, fixed amount of memory per FetchResponse.
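The chunking loop described above can be sketched as follows. This is an illustrative sketch only; the `Chunker` class, `CHUNK_SIZE` constant, and the use of plain arrays are assumptions for demonstration, not the actual broker code:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not broker code): process a fetched message set in
// fixed-size chunks so only one chunk's worth of converted data is in memory.
public class Chunker {
    static final int CHUNK_SIZE = 3; // tiny for illustration; a broker would use a KB-sized chunk

    // Copies each chunk into a bounded buffer (standing in for down-conversion)
    // and "sends" it by adding it to the emitted list.
    static List<int[]> downConvertInChunks(int[] messages) {
        List<int[]> emitted = new ArrayList<>();
        for (int start = 0; start < messages.length; start += CHUNK_SIZE) {
            int end = Math.min(start + CHUNK_SIZE, messages.length);
            int[] chunk = new int[end - start]; // fixed, bounded allocation per chunk
            System.arraycopy(messages, start, chunk, 0, chunk.length);
            emitted.add(chunk); // in the broker this would be written to the socket
        }
        return emitted;
    }
}
```

The key property is that peak memory is bounded by the chunk size rather than by the total size of the fetched message set.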
...
Although this increases the amount of computation and I/O for reading log segments in the network thread, it is not expected to be significant enough to cause any performance degradation. We will continue performing non-blocking socket writes in the network thread, to avoid stalling the network thread waiting for socket I/O to complete.
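The non-blocking write pattern mentioned above can be sketched as follows, assuming a channel whose `write()` may accept fewer bytes than offered (the contract of a `java.nio` non-blocking `SocketChannel`); the `Channel` interface here is a simplified stand-in:

```java
import java.nio.ByteBuffer;

// Simplified stand-in for a non-blocking socket write loop. Channel mimics the
// java.nio contract where write() may accept fewer bytes than offered.
public class NonBlockingWriter {
    interface Channel { int write(ByteBuffer buf); }

    // Writes as much as the channel will take without blocking. Returns true if
    // the buffer was fully flushed; false means "socket not writable, retry later".
    static boolean tryWrite(Channel ch, ByteBuffer buf) {
        while (buf.hasRemaining()) {
            if (ch.write(buf) == 0) return false; // would block; return instead of stalling
        }
        return true;
    }
}
```

When `tryWrite` returns false, the network thread moves on to other connections and retries this one when the selector reports it writable again.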
Messages that require lazy down-conversion are encapsulated in a new class called LazyDownConvertRecords. LazyDownConvertRecords#writeTo will provide the implementation for down-converting messages in chunks and writing them to the underlying socket.
public class LazyDownConvertRecords implements Records {
/**
* Reference to records that will be "lazily" down-converted. Typically FileRecords.
*/
private final Records records;
/**
* Magic value to which the above records need to be converted.
*/
private final byte toMagic;
/**
* The starting offset of records.
*/
private final long firstOffset;
/**
* A "batch" of converted records.
*/
private ConvertedRecords<? extends Records> convertedRecords = null;
// Implementation for all methods in Records interface
}
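To make the writeTo contract concrete, here is a hypothetical sketch of how it could drive the chunked conversion so that at most one converted chunk is referenced on the heap at a time. The `Sink` interface and the `downConvert` placeholder are illustrative assumptions, not the actual Kafka APIs:

```java
// Hypothetical sketch of LazyDownConvertRecords#writeTo: convert one chunk at a
// time and hand it to the socket, so at most one converted chunk is referenced
// on the heap. Sink and downConvert are illustrative stand-ins, not Kafka APIs.
public class LazyWriteTo {
    interface Sink { void write(byte[] converted); } // stands in for the socket channel

    // Returns the total number of bytes written.
    static int writeTo(byte[][] sourceChunks, byte toMagic, Sink sink) {
        int total = 0;
        for (byte[] chunk : sourceChunks) {
            byte[] converted = downConvert(chunk, toMagic); // only this chunk is materialized
            sink.write(converted);                          // eligible for GC after this call
            total += converted.length;
        }
        return total;
    }

    // Placeholder: real down-conversion rewrites the record batch format.
    static byte[] downConvert(byte[] chunk, byte toMagic) {
        byte[] out = chunk.clone();
        if (out.length > 0) out[0] = toMagic; // pretend the first byte is the magic
        return out;
    }
}
```

Because each converted chunk becomes unreachable as soon as it is written, the duration for which converted messages stay referenced on the JVM heap is limited to a single chunk's write.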
Determining Total Response Size
...
This can be done by accepting an additional parameter doLazyConversion in KafkaApis#handleFetchRequest#convertedPartitionData, which returns FetchResponse.PartitionData containing either LazyDownConvertRecords or MemoryRecords, depending on whether we want to delay down-conversion.
Sizing Intermediate and Result Buffers
...
- Fixed and deterministic memory usage for each down-conversion response.
- Significantly reduced duration for which down-converted messages are kept referenced in JVM heap.
- No change required to existing clients.
Cons
- More computation being done in network threads.
- Additional file I/O being done in network threads.
- Complexity - expect this to be a big patch, but should be able to keep it abstracted into its own module so that it does not cause too much churn in existing code.
- Could be viewed as being somewhat "hacky" because of the need to pad "fake" messages at the end of the response.
...