Status

Current state"Under Discussion"

Discussion thread: here 

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In Kafka MirrorMaker v1 (the version we use in house), the mirror process starts with N consumers and one producer.  Each consumer reads messages from a given topic on the source cluster and sends the data through the producer to write to the destination cluster.  There are multiple byte-copying steps along the way:

When a message is compressed, the consumer needs to decompress it and the producer needs to recompress it, even though the mirror maker itself never uses the decompressed message.  The decompression/recompression step is very costly in the data transfer pipeline, adds extra delay and CPU cost (as high as 40% CPU overhead), and becomes a bottleneck in scaling the mirror maker.  Decompression also uses extensive memory on the fly; we observed 2x-10x memory amplification during decompression for some GZIP-compressed topic streams.  If the partition fetch bytes parameter is not configured properly, the mirror process will trigger an OOM when traffic spikes.

Kafka 0.7 had a shallow iterator feature to skip full message decompression; however, this feature was disabled in Kafka 0.8 (https://issues.apache.org/jira/browse/KAFKA-732).

This KIP proposes a shallow mirror feature which brings the shallow iterator concept back to the mirror process and skips the unnecessary message decompression and recompression steps.  We argue that in many cases users just want a simple replication pipeline that replicates messages as-is from the source cluster to the destination cluster.  In many cases the messages in the source cluster are already compressed and properly batched; users just need an identical copy of the message bytes through the mirror, without any transformation or repartitioning.

We have a prototype implementation in house with MirrorMaker v1 and observed CPU usage drop from 50% to 15% for some mirror pipelines.

The proposed change uses MirrorMaker v1 as the reference implementation, since that is the product we use in house.  The code changes mostly occur in the consumer and producer code paths, with some glue code changes in the mirror maker; they can be adapted to MirrorMaker v2 by the community as well.

About the name

We name this feature shallow mirroring since it has some resemblance to the old Kafka 0.7 feature of the same name, although the implementations are not quite the same.  ‘Shallow’ means: 1. we shallowly iterate over the RecordBatches inside the MemoryRecords structure instead of deep-iterating over the records inside each RecordBatch; 2. we shallowly copy (share) pointers inside a ByteBuffer instead of deep-copying and deserializing the bytes into objects.  Initially we used another project name, identity mirroring, to emphasize the fact that the messages mirrored from the source to the destination cluster are identical (same compression, same batch composition).  However, Cloudera has already used the term "identity mirror" when introducing MirrorMaker v2 (https://blog.cloudera.com/a-look-inside-kafka-mirrormaker-2/).

Public Interfaces

ConsumerConfig

A new boolean ConsumerConfig option, fetch.raw.bytes, is added to skip message deserialization and decompression.  When this option is enabled, the consumer returns the underlying ByteBuffer directly as the value field of the ConsumerRecord to the caller.  The ByteBuffer represents the batch of messages (a RecordBatch) fetched from the source broker.

ConsumerRecord

When fetch.raw.bytes is enabled through ConsumerConfig, the value field of ConsumerRecord is the ByteBuffer representing the batch of messages fetched from the source broker.  Most of the other fields in ConsumerRecord are null when they do not make sense for a whole batch (e.g. the key and header fields).  The offset field is set to the offset of the last message in the batch.

The caller will see consumer.poll() return ConsumerRecord<Void,ByteBuffer> records.
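
As a rough sketch of the intended usage (assuming the proposed fetch.raw.bytes option, and assuming the existing VoidDeserializer/ByteBufferDeserializer can be used to satisfy the generic types; all other names here are illustrative):

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RawBytesConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "source-cluster:9092");
        props.put("group.id", "shallow-mirror");
        // Proposed option from this KIP: return raw RecordBatch bytes instead of
        // decompressed, deserialized records.
        props.put("fetch.raw.bytes", "true");
        // Assumption: the existing Void/ByteBuffer deserializers satisfy the generic types;
        // in raw-bytes mode they would effectively be bypassed.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.VoidDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteBufferDeserializer");

        try (KafkaConsumer<Void, ByteBuffer> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("mirror-topic"));
            for (ConsumerRecord<Void, ByteBuffer> batch : consumer.poll(Duration.ofMillis(500))) {
                // value() holds one whole RecordBatch; offset() is the last offset in that batch.
                System.out.println("batch ending at offset " + batch.offset()
                        + ", " + batch.value().remaining() + " bytes");
            }
        }
    }
}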

ProducerConfig

A new boolean ProducerConfig option, send.raw.bytes, is added to skip message re-serialization and re-compression.  When this option is enabled, the caller passes the ByteBuffer directly as the value of the ProducerRecord to the producer.  The ByteBuffer represents the batch of messages.

The fetch.raw.bytes and send.raw.bytes options need to be turned on together.

ProducerRecord

When send.raw.bytes is enabled through ProducerConfig, the value field of ProducerRecord is the ByteBuffer representing the batch of messages to be forwarded together to the target broker.  Most of the other fields in ProducerRecord are set to null when they do not make sense for a whole batch (e.g. the key and header fields).

The caller will call producer.send() with a ProducerRecord<Void,ByteBuffer>.
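
A corresponding producer-side sketch (assuming the proposed send.raw.bytes option and the existing Void/ByteBuffer serializers for the generic types; other names are illustrative):

import java.nio.ByteBuffer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RawBytesProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "destination-cluster:9092");
        // Proposed option from this KIP: the record value is already a finished RecordBatch.
        props.put("send.raw.bytes", "true");
        // Assumption: the existing Void/ByteBuffer serializers satisfy the generic types;
        // in raw-bytes mode no real serialization would happen.
        props.put("key.serializer", "org.apache.kafka.common.serialization.VoidSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer");

        // Placeholder; in the mirror this buffer comes from a raw-bytes consumer.
        ByteBuffer batchBytes = ByteBuffer.allocate(0);

        try (KafkaProducer<Void, ByteBuffer> producer = new KafkaProducer<>(props)) {
            // The key is null; the value carries the entire batch unchanged.
            producer.send(new ProducerRecord<>("mirror-topic", null, batchBytes));
            producer.flush();
        }
    }
}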

MirrorMaker (v1) command line option

A new command line option, use.raw.bytes, is added.  When this option is enabled, the mirror maker sets the fetch.raw.bytes option on the consumer and the send.raw.bytes option on the producer.

MirrorMakerMessageHandler (v1)

The original MirrorMaker message handler plugin, MirrorMakerMessageHandler, is hard coded to handle messages of type [Array[Byte],Array[Byte]].  We added a new MirrorMakerRawBytesMessageHandler to handle messages of type [Void,ByteBuffer].

ProduceRequest

Up to ProduceRequest V2, a ProduceRequest could contain multiple batches of messages in the record_set field, but this was disabled in V3.  We propose to bring the multiple-batch feature back to improve the network throughput of the mirror maker producer when the original batch size from the source broker is too small.

The following ProduceRequest/Response V8 schemas are essentially the same as the current ProduceRequest/Response V7 schemas, except that the record_set field can contain multiple batches.

ProduceRequest (Version: 8) => transactional_id acks timeout topic_data
    transactional_id => STRING
    acks => INT16
    timeout => INT32
    topic_data => [topic data]
        topic => STRING
        data => [partition_id record_set]
            partition_id => INT32
            record_set => RECORDS (MemoryRecords)

ProduceResponse (Version: 8) => responses throttle_time_ms
    responses => [topic partition_responses]
        topic => STRING
        responses => [partition_id error_code base_offset log_append_time]
            partition_id => INT32
            error_code => INT16
            base_offset => INT64
            log_append_time => INT64
    throttle_time_ms => INT32


Proposed Changes

In our mirror pipelines, the source and destination clusters have the same set of topics/partitions and the same configurations.  Most of the messages on the source cluster are already compressed.  We want the mirror maker to work as an efficient pass-through pipe that carries the bytes as they are from the source cluster to the destination cluster without any re-processing, so that the ProduceRequest message going to the destination broker is almost the same as the FetchResponse message read from the source broker.

Raw Bytes Mode

In order to make this a streamlined pipe, we want the mirror to transfer raw bytes rather than de-serialized/de-compressed consumer records.  In a FetchResponse, each topic-partition contains a MemoryRecords ByteBuffer which is composed of a series of RecordBatches, and each RecordBatch contains a batch of records.  Decompression is triggered when RecordBatch.streamingIterator() is called.  So instead of iterating through and decompressing (and later de-serializing) each record, we just return the ByteBuffer representing the RecordBatch itself as a ‘record’ back to the fetcher and later back to the consumer.  When Consumer.poll() is called, it gets back ConsumerRecord<Void,ByteBuffer> objects where each ByteBuffer represents the bytes of one RecordBatch.  Note that the record key is null in this case, since we are dealing with the batch as a unit.

Later the mirror maker passes this ByteBuffer into a ProducerRecord<Void,ByteBuffer> and calls Producer.send().  Inside the producer code path: Producer.send() → RecordAccumulator.append() → ProducerBatch.tryAppend() → MemoryRecordsBuilder.append().  In MemoryRecordsBuilder.append(), we append the bytes from the passed-in ByteBuffer directly into the buffer allocated from the RecordAccumulator’s BufferPool.  The remaining code path stays intact: MemoryRecordsBuilder.build() is called to return a MemoryRecords object which contains the byte buffer and is passed on to create the producer's ProduceRequest message.
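
Putting the two halves together, a minimal pass-through loop might look like the sketch below (generic types and option behavior as proposed above; error handling, offset management, and rebalancing are omitted, and the class and variable names are illustrative):

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ShallowMirrorLoop {
    private final KafkaConsumer<Void, ByteBuffer> consumer;  // configured with fetch.raw.bytes=true
    private final KafkaProducer<Void, ByteBuffer> producer;  // configured with send.raw.bytes=true

    public ShallowMirrorLoop(KafkaConsumer<Void, ByteBuffer> consumer,
                             KafkaProducer<Void, ByteBuffer> producer) {
        this.consumer = consumer;
        this.producer = producer;
    }

    public void run(String topic) {
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            for (ConsumerRecord<Void, ByteBuffer> batch : consumer.poll(Duration.ofMillis(500))) {
                // Forward the untouched RecordBatch bytes to the same topic/partition on the
                // destination cluster (source and destination have identical layouts).
                producer.send(new ProducerRecord<>(batch.topic(), batch.partition(), null, batch.value()));
            }
            // Commit on batch boundaries after the batches have been handed to the producer.
            consumer.commitSync();
        }
    }
}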

Changes in consumer config and ConsumerRecord

A new ConsumerConfig option: fetch.raw.bytes is added to skip message de-serialization and decompression.  

When fetch.raw.bytes is enabled through ConsumerConfig, the value field of ConsumerRecord is the ByteBuffer representing the batch of messages fetched from the source broker.  Most of the other fields in ConsumerRecord are null when they do not make sense for a whole batch (e.g. the key and header fields).  The offset field is set to the offset of the last message in the batch.

The caller will see consumer.poll() return ConsumerRecord<Void,ByteBuffer> records.

Changes in consumer code path

When the client calls Consumer.poll(), the following code path is invoked: KafkaConsumer.poll() → KafkaConsumer.pollForFetches() → Fetcher.fetchRecords() → Fetcher.nextFetchedRecord() → RecordBatch.streamingIterator()

Rather than exploding the RecordBatch and decompressing/iterating through all of its records, we just return a RawBytesBatchRecord (a new class implementing the Record interface that contains the byte buffer corresponding to this batch).

This special Record class triggers the return code path to skip deserialization and return a ConsumerRecord<Void,ByteBuffer>.
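
As an illustration only (the real class would implement the internal org.apache.kafka.common.record.Record interface, which has many more methods than shown; this simplified shape is an assumption), RawBytesBatchRecord essentially wraps the batch's ByteBuffer plus the metadata the consumer needs:

import java.nio.ByteBuffer;

// Illustrative sketch: carries one RecordBatch's raw bytes through the fetcher.
public class RawBytesBatchRecord {
    private final ByteBuffer batchBytes;  // the unmodified bytes of one RecordBatch
    private final long lastOffset;        // offset of the last record in the batch

    public RawBytesBatchRecord(ByteBuffer batchBytes, long lastOffset) {
        this.batchBytes = batchBytes;
        this.lastOffset = lastOffset;
    }

    public ByteBuffer value() { return batchBytes.duplicate(); }

    public long offset() { return lastOffset; }

    public int sizeInBytes() { return batchBytes.remaining(); }
}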

Dealing with the case when consumer fetches from the middle of the batch

Each RecordBatch inside the FetchResponse’s record_set field corresponds 1:1 to the original batch as it was stored on the source broker’s disk.  When the consumer first fetches in raw-bytes mode, the first offset it fetches might be in the middle of a batch.  In this case, we need to filter out the records whose offsets are smaller than the consumer’s starting offset: we create a new byte buffer which only contains records whose offsets are in the fetch range and return this smaller byte buffer instead.  This only happens for the first fetch of the mirror maker (afterwards the mirror maker commits offsets on batch boundaries), so the performance impact is minimal.

Changes in producer config and ProducerRecord

A new ProducerConfig option: send.raw.bytes is added to skip message re-serialization and re-compression. 

When send.raw.bytes is enabled through ProducerConfig, the value field of ProducerRecord is the ByteBuffer representing the batch of messages to be forwarded together to the target broker.  The caller will call producer.send() with a ProducerRecord<Void,ByteBuffer>.

Changes in producer code path

The client will call Producer.send(ProducerRecord<Void,ByteBuffer>) which invokes the following code path: KafkaProducer.send() → KafkaProducer.doSend() → RecordAccumulator.append() → ProducerBatch.tryAppend() → MemoryRecordsBuilder.append()

The existing code at this point appends each new incoming record to the builder, and later MemoryRecordsBuilder.build() is called to calculate and set the batch header fields and generate the MemoryRecords byte buffer.  In the new code, when the incoming data is already a ByteBuffer, we skip the append and build phases and directly copy the bytes from the incoming ByteBuffer into the builder's buffer.
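
Conceptually, the raw-bytes branch reduces to a plain buffer copy; the following is a simplified sketch of the idea only, not the actual MemoryRecordsBuilder code:

import java.nio.ByteBuffer;

final class RawBatchAppend {
    // Simplified illustration of the raw-bytes append path: the incoming value is already a
    // finished RecordBatch, so its bytes are copied verbatim into the accumulator's buffer
    // instead of going through the per-record append and build steps.
    static void appendRawBatch(ByteBuffer accumulatorBuffer, ByteBuffer incomingBatch) {
        accumulatorBuffer.put(incomingBatch.duplicate());  // duplicate() keeps the caller's position intact
    }
}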

Changes in mirror maker v1

A new command line option: use.raw.bytes is added.  When this option is enabled, the mirror maker will set the fetch.raw.bytes option for the consumer and send.raw.bytes option on the producer.

When use.raw.bytes mode is on, the mirror reads and transfers one batch at a time and commits offsets on batch boundaries.  In this batch mode, the ConsumerRecord’s offset field represents the offset of the last record in the batch.
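
Since the ConsumerRecord’s offset is the last offset in the batch, committing on a batch boundary amounts to committing that offset plus one.  A sketch using the standard consumer commit API (the helper name is illustrative):

import java.nio.ByteBuffer;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class BatchBoundaryCommit {
    // Commit the position just past the batch represented by this raw-bytes ConsumerRecord.
    static void commitBatch(KafkaConsumer<Void, ByteBuffer> consumer,
                            ConsumerRecord<Void, ByteBuffer> batch) {
        TopicPartition tp = new TopicPartition(batch.topic(), batch.partition());
        // offset() is the last record offset in the batch; the committed offset is the next
        // offset to consume, hence the +1.
        consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(batch.offset() + 1)));
    }
}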

The original MirrorMaker message handler plugin, MirrorMakerMessageHandler, is hard coded to handle messages of type [Array[Byte],Array[Byte]].  We added a new MirrorMakerRawBytesMessageHandler to handle messages of type [Void,ByteBuffer].

Multiple Batches in ProduceRequest

In the first iteration of this project, we put just one RecordBatch into the RecordAccumulator’s buffer, so the resulting ProduceRequest message contained only one batch.  We noticed the resulting network performance was not great, especially when the original batches were small.  The FetchResponse messages from the source broker often carry byte buffers on the order of MBs, since they can contain multiple batches, but the outgoing ProduceRequest messages to the destination broker were often only KBs in size.  The smaller outbound messages caused many more round trips between the mirror maker node and the destination broker node, and the sequential processing of those ProduceRequest messages on the broker side dragged down the performance of the mirror pipe.

There is actually no reason why a ProduceRequest message can contain only one batch.  ProduceRequest messages up to V2 could contain multiple batches.  In ProduceRequest message format V3, multiple batching was disabled, which creates a disparity between the ProduceRequest message and the FetchResponse message: the latter still supports multiple batches.  And if we look at the key data structure in the producer and consumer code paths, MemoryRecords, a MemoryRecords object already contains a series of batches.

We decided to give multiple batches a try, and it does not seem too hard to support.  On the client side, in the producer’s code path (MemoryRecordsBuilder.append()), we allow continuously appending the byte buffers from subsequent RecordBatches, so the resulting MemoryRecords contains multiple batches.  On the broker side, we removed the check which disallows multiple batches in a ProduceRequest.

Handling multiple batches and skip message conversion in the broker

Most of the code changes in this KIP occur on the client side, but we also make a few small changes on the broker side to support multiple batches in ProduceRequest and to skip message conversions:

  • In ProduceRequest.validateRecords(), remove the check that rejects multiple batches for V3+ messages
  • In LogValidator.validateMessagesAndAssignOffsetsCompressed(), loop through the batches in the MemoryRecords (instead of assuming only one batch)

Compatibility, Deprecation, and Migration Plan

  • The new mirror behavior is only invoked when the fetch.raw.bytes and send.raw.bytes options are turned on.  Existing user code will keep working as-is.
  • If users have their own implementation of MirrorMaker v1’s MirrorMakerMessageHandler, they will need to provide an implementation of the new MirrorMakerRawBytesMessageHandler; if they use the default MirrorMaker message handler, no code change is needed.
  • The ProduceRequest message now allows multiple batches in the record_set field.

Rejected Alternatives


  • One alternative is the current mirror implementation, where the message is de-serialized, de-compressed and then re-serialized and re-compressed.  Although it looks inefficient on the surface, the current implementation has its own merits.  It helps when the original messages on the source broker are not compressed or have very few messages per batch: the mirroring gives a second chance to compress the messages or create bigger batches.  The current implementation also works better if users have their own message handler to further process the messages, since they deal with a fully deserialized ConsumerRecord object (instead of a raw ByteBuffer).  The current implementation is more appropriate in those cases, and users can choose the old mirror behavior by not setting the use.raw.bytes config (which is off by default).
  • Another alternative is to skip the mirror maker entirely and write a new low-level component which reads FetchResponse messages directly from the source broker, splits the messages per topic-partition, and regroups them into ProduceRequests per the target broker’s topic-partition grouping.  This would be more efficient and less intrusive to the existing code paths.  However, we would have to add code to handle workload balancing and rebalancing, which is currently handled by the high-level consumer group.

Future Work

The prototype was implemented and tested with MirrorMaker v1, since we are still on Kafka 2.3, which does not include MirrorMaker v2, and we use MirrorMaker v1 for our production mirroring pipelines.  Porting this solution to MirrorMaker v2 should be straightforward since most of the code changes are in the Consumer, Producer and Broker code paths.

Performance Chart

Here are some performance charts showing the CPU and memory usage before and after we deployed the shallow mirroring feature.  The switch to shallow mirroring occurred shortly before 1/15.

  • CPU difference before and after the feature implementation


  • Memory Consumption Related to incoming messages before and after


  • GC collection time (young generation) before and after