Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

We will introduce a message chunking approach to reduce overall memory consumption during down-conversion. The idea is to lazily and incrementally down-convert messages incrementally in "chunks", and send the result out to the consumer as soon as the chunk is ready. The broker continuously down-converts chunks of messages and sends them out, until it has exhausted all messages contained in the FetchResponse, or has reached some predetermined threshold. This way, we use a small, fixed amount of memory per FetchResponse.

...

Although we have increased some amount of computation and I/O for reading log segments in the network thread, this is not expected to be very significant to cause any performance degradation. We will continue performing non-blocking socket writes in the network thread, to avoid having to stall the network thread waiting for socket I/O to complete.in the network thread, to avoid having to stall the network thread waiting for socket I/O to complete.

Messages that require lazy down-conversion are encapsulated in a new class called LazyDownConvertedRecords. LazyDownConvertedRecords#writeTo will provide implementation for down-converting messages in chunks, and writing them to the underlying socket.

Code Block
public class LazyDownConvertRecords implements Records {
    /**
     * Reference to records that will be "lazily" down-converted. Typically FileRecords.
     */
    private final Records records;

    /**
     * Magic value to which above records need to be converted to.
     */
    private final byte toMagic;

    /**
     * The starting offset of records.
     */
    private final long firstOffset;

    /**
     * A "batch" of converted records.
     */
    private ConvertedRecords<? extends Records> convertedRecords = null;

    // Implementation for all methods in Records interface
}

Determining Total Response Size

...

This can be done by accepting an additional parameter doLazyConversion in KafkaApis#handleFetchRequest#convertedPartitionData which returns FetchResponse.PartitionData containing either LazyDownConvertedRecords or MemoryRecords depending on whether we want to delay down-conversion or not.

 

Sizing Intermediate and Result Buffers

...

  • Fixed and deterministic memory usage for each down-conversion response.
  • Significantly reduced duration for which down-converted messages are kept referenced in JVM heap.
  • No change required to existing clients.

Cons

  • More computation being done in network threads.
  • Additional file I/O being done in network threads.
  • Complexity - expect this to be a big patch, but should be able to keep it abstracted into its own module so that it does not cause too much churn in existing code.
  • Could be viewed as being somewhat "hacky" because of the need the need to pad "fake" messages at the end of the response.

...