
Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-3995

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka enforces a strict message size limit. This limit applies to compressed messages as well, i.e. to the size after compression.

...

  1. Change the way the compression ratio is estimated.
  2. Split oversized batches and resend the split batches.

Public Interfaces

This KIP introduces a new producer metric, batch-split-rate, which records the rate at which batch splits occur.
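As an illustration, the metric can be read from the producer's metrics map. This is a minimal sketch, not part of the KIP itself; it assumes the standard KafkaProducer.metrics() API (metricValue() is the modern accessor, older clients expose value() instead):

    import java.util.Map;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    public class BatchSplitRateReader {
        // Look up batch-split-rate in a producer's metrics map and print it.
        public static void printBatchSplitRate(KafkaProducer<?, ?> producer) {
            for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
                MetricName name = entry.getKey();
                if ("batch-split-rate".equals(name.name())) {
                    System.out.println(name.group() + "/" + name.name()
                            + " = " + entry.getValue().metricValue());
                }
            }
        }
    }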

Although there is no other public API change, users may want to revisit their batch size setting to improve performance, given the behavior change.

Proposed Changes

Decompress a batch that encounters a RecordTooLargeException, split it in two, and send the halves again.
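A minimal sketch of the split step, assuming a generic record type. The helper name and how the halves are re-wrapped and re-enqueued are illustrative, not the actual producer internals; only the split-in-two behavior comes from this KIP:

    import java.util.ArrayList;
    import java.util.List;

    public class OversizedBatchSplitter {
        // Split the decompressed records of an oversized batch into two halves.
        // Each half is then recompressed as a new batch and resent.
        public static <R> List<List<R>> splitInTwo(List<R> records) {
            int mid = records.size() / 2;
            List<List<R>> halves = new ArrayList<>(2);
            halves.add(new ArrayList<>(records.subList(0, mid)));
            halves.add(new ArrayList<>(records.subList(mid, records.size())));
            return halves;
        }
    }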

...

  1. Choosing COMPRESSION_RATIO_IMPROVING_STEP to be 0.005 means that it takes about 20 batches to decrease the estimated compression ratio by 0.1. So in the normal case, after a batch split, it takes quite a few batches before the estimate drifts back to an aggressive value that may cause another split (assuming no batch in between deteriorates the estimate).
  2. Use 10:1 as COMPRESSION_RATIO_DETERIORATE_STEP : COMPRESSION_RATIO_IMPROVING_STEP. As an intuitive approximation, this keeps the estimate from drifting toward aggressive values, and hence from causing splits, as long as there are fewer than ten batches improving the compression ratio for every batch deteriorating it. For example, when nine batches each improve the estimate by one step, a single tenth batch that deteriorates it offsets them all.
  3. 1.05 is chosen as the COMPRESSION_RATIO_ESTIMATION_FACTOR. This is the value we use today, and it gives a reasonable contingency in the batch size estimation. A sketch combining these three constants follows this list.
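For concreteness, a minimal sketch of the update rule implied by these constants. The class and method names are illustrative, not the actual producer internals; the estimate is the compressed-to-uncompressed size ratio, so lower means better compression:

    public class CompressionRatioEstimatorSketch {
        private static final float COMPRESSION_RATIO_IMPROVING_STEP = 0.005f;
        private static final float COMPRESSION_RATIO_DETERIORATE_STEP = 0.05f;
        private static final float COMPRESSION_RATIO_ESTIMATION_FACTOR = 1.05f;

        // Estimated compressed/uncompressed size ratio; start pessimistic.
        private float estimatedRatio = 1.0f;

        // Called with the observed ratio after each batch is closed.
        public void update(float observedRatio) {
            if (observedRatio > estimatedRatio) {
                // The batch compressed worse than estimated: back off quickly.
                estimatedRatio += COMPRESSION_RATIO_DETERIORATE_STEP;
            } else if (observedRatio < estimatedRatio) {
                // The batch compressed better than estimated: creep down slowly.
                estimatedRatio -= COMPRESSION_RATIO_IMPROVING_STEP;
            }
        }

        // The ratio used to size batches, padded with the safety factor.
        public float ratioForSizing() {
            return estimatedRatio * COMPRESSION_RATIO_ESTIMATION_FACTOR;
        }
    }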

Compatibility, Deprecation, and Migration Plan

The KIP is backwards compatible.

Rejected Alternatives

Batching the messages based on uncompressed bytes

Introduce a new configuration, enable.compression.ratio.estimation, to allow users to opt out of compression ratio estimation and instead batch directly on the uncompressed size.

...

  1. It adds a new configuration to the producer, exposing internal nuances to users.
  2. For highly compressible messages, users may still need to guess the compression ratio to ensure the compressed batches are not too small.

Splitting Batches based on configured batch.size

The concern with this is that batch size is not only associated with the max message size, but also with memory consumption. For example, if a producer is sending to 1000 partitions (Mirror Maker usually has far more) and the max message size is 1 MB, setting the batch size to the max message size requires 1 GB of memory just to hold one batch per partition. This would actually result in unwanted small batches, because batches would have to be sent out prematurely to release memory for new batch creation.

So in practice, we usually set the batch size to a value that gives good batching but does not require too much producer memory, say 500 KB. In this case, we do not want to split a batch unnecessarily even if it is bigger than the configured batch size, because it will likely still be smaller than the max message size.

It is true that if the batch size is set larger than the max message size, performance will drop significantly while users may not see any exception. But in practice this is more of a misconfiguration. The ideal solution would be to let the producer fetch the max message size configuration from the broker and split batches based on that before sending them over the wire; this would also prevent the misconfiguration. For now, since the new metric lets users detect frequent batch splits, this is a good intermediate stage until the ideal solution is in place.
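For illustration only, a producer configured along these lines. The broker address and serializers are placeholders, and the 500 KB figure is the example above, not a recommendation:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class ProducerBatchSizeExample {
        public static KafkaProducer<String, String> create() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
            // Keep batch.size well below the broker's max message size (e.g. 500 KB
            // vs. a 1 MB limit) so splits stay rare even when the estimate is off.
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 500 * 1024);
            return new KafkaProducer<>(props);
        }
    }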