Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-3995

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka enforces a strict limit on message size. The limit applies to compressed messages as well.

Currently, KafkaProducer estimates the compressed message size from the uncompressed message size. The estimate is based on a weighted average, over a sliding window, of the compression ratios of the most recent batches for each compression type. The formula is as follows:

Assume COMPRESSION_RATIO_N stands for the compression ratio of the Nth batch. The estimated compression ratio for the (N+1)th batch is:

Σ(COMPRESSION_RATIO_N * DAMPING_FACTOR^(N - 1) * (1 - DAMPING_FACTOR)) + INITIAL_COMPRESSION_RATIO * DAMPING_FACTOR^N

When the (N+1)th batch is generated, this estimated compression ratio will be used (multiplied by a factor of 1.05 for contingency) to estimate the compressed size from the uncompressed size. When the estimated compressed size reaches the batch.size configuration, the batch will be closed and sent to the brokers.
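
As a rough illustration of the estimation described above, the following Python sketch reads the formula as an exponentially weighted average in which each new batch's observed ratio is blended into the running estimate. The function names, the damping factor of 0.9 and the initial ratio of 1.0 in the usage lines are assumptions made for illustration; they are not taken from this page.

```python
def update_estimate(previous_estimate, observed_ratio, damping_factor):
    """Blend one newly observed compression ratio into the running estimate."""
    return previous_estimate * damping_factor + observed_ratio * (1 - damping_factor)


def estimate_after(observed_ratios, initial_ratio, damping_factor):
    """Fold a sequence of observed ratios (oldest first) into one estimate."""
    estimate = initial_ratio
    for ratio in observed_ratios:
        estimate = update_estimate(estimate, ratio, damping_factor)
    return estimate


def estimated_compressed_size(uncompressed_bytes, estimated_ratio, contingency=1.05):
    """Estimated compressed size, padded by the 1.05 contingency factor."""
    return uncompressed_bytes * estimated_ratio * contingency


# Usage with assumed values: damping factor 0.9, initial ratio 1.0.
estimate = estimate_after([0.5, 0.2], initial_ratio=1.0, damping_factor=0.9)
print(estimate, estimated_compressed_size(1000, estimate))
```

Under this reading, a long run of batches with the same ratio pulls the estimate towards that ratio, which is what makes the estimate slow to react when the traffic changes.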

The problem with the current behavior is that this estimate can be off and cause a RecordTooLargeException.

For example, suppose the batch size is set to 1 MB and the max message size is also 1 MB. Initially the producer is sending messages (each message is 1 MB) to topic_1, whose data can be compressed to 1/10 of the original size. After a while the estimated compression ratio in the compressor will be trained to 1/10 and the producer will put 10 messages into one batch. Now the producer starts to send messages (each message is also 1 MB) to topic_2, whose messages can only be compressed to 1/5 of the original size. The producer will still use 1/10 as the estimated compression ratio and put 10 messages into a batch. That batch will be 2 MB after compression, which exceeds the maximum message size. In this case the user does not have many options other than resending everything, or closing the producer if they care about ordering and message loss.
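
The arithmetic of this example can be checked with a short Python sketch. It ignores the 1.05 contingency factor, as the example does, and uses exact fractions to avoid floating-point rounding. The variable names are illustrative only.

```python
from fractions import Fraction

MB = 1024 * 1024

batch_size = 1 * MB          # batch.size in the example
max_message_size = 1 * MB    # maximum message size in the example
message_size = 1 * MB        # uncompressed size of each message

learned_ratio = Fraction(1, 10)  # ratio the estimator learned on topic_1
actual_ratio = Fraction(1, 5)    # real ratio of topic_2 data

# The producer fills the batch until the *estimated* compressed size
# reaches batch_size, so it packs this many messages:
messages_per_batch = batch_size // (message_size * learned_ratio)

# What the batch really weighs once compressed with topic_2's ratio:
actual_compressed_size = messages_per_batch * message_size * actual_ratio

print(messages_per_batch)                          # 10
print(actual_compressed_size / MB)                 # 2
print(actual_compressed_size > max_message_size)   # True
```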

This is especially an issue for services like MirrorMaker whose producer is shared by many different topics.

This KIP proposes to solve this issue by doing the following:

  1. Change the way to estimate the compression ratio
  2. Split the oversized batch and resend the split batches. 

Public Interfaces

This KIP introduces a new producer metric, batch-split-rate. The metric records the rate at which batch splits occur.

Although there is no other public API change, the behavior change means users may want to reconsider their batch size setting to improve performance.

Proposed Changes

Decompress the batch which encounters a RecordTooLargeException, split it into two and send it again

There are a few things to consider for this approach:

  1. More overhead is introduced on the producer side. The producer has to decompress the batch, regroup the messages and resend them. If the producer kept the original uncompressed messages to avoid the decompression, it would incur a huge memory overhead.
  2. The split batches are not guaranteed to be smaller than the max size limit. Potentially there will be multiple retries until the messages get through, or fail.
  3. In scenarios such as MirrorMaker, different topics have different compression ratios, and some topics may have a compression ratio very different from the average. This can cause many splits and resends, which introduces a lot of overhead.
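
The split-and-resend idea, including the repeated retries from point 2, can be sketched in Python as follows. This is only a sketch under stated assumptions: a batch is modelled as a list of byte strings, zlib stands in for the producer's compression codec, and the size check is done locally, whereas the real producer learns about an oversized batch from the broker's RecordTooLargeException. The names compressed_size and split_until_fits are hypothetical.

```python
import os
import zlib


def compressed_size(records):
    """Compressed size of a batch; zlib stands in for the real codec."""
    return len(zlib.compress(b"".join(records)))


def split_until_fits(records, max_bytes):
    """Split a batch in two, repeatedly, until every piece fits max_bytes.

    Record order is preserved. A single record that is still too large
    cannot be split further, so it is reported as an error.
    """
    if compressed_size(records) <= max_bytes:
        return [records]
    if len(records) == 1:
        raise ValueError("a single record exceeds the size limit")
    mid = len(records) // 2
    return (split_until_fits(records[:mid], max_bytes)
            + split_until_fits(records[mid:], max_bytes))


# Usage: eight incompressible 1000-byte records against a 2500-byte limit.
records = [os.urandom(1000) for _ in range(8)]
batches = split_until_fits(records, max_bytes=2500)
print([len(batch) for batch in batches])
```

Each level of splitting costs another compression pass over the same data, which is the overhead described in point 1.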

To address the above caveats, we propose to also change the way to estimate the compression ratio:

  1. Estimate the compression ratio for each topic independently.
  2. Given that COMPRESSION_RATIO = COMPRESSED_SIZE/UNCOMPRESSED_SIZE, change the compression ratio estimation from a weighted average on a sliding window to the following:
    1. Initially set ESTIMATED_RATIO = 1.0
    2. If OBSERVED_RATIO < ESTIMATED_RATIO, decrease the ESTIMATED_RATIO by COMPRESSION_RATIO_IMPROVING_STEP (0.005)
    3. If OBSERVED_RATIO > ESTIMATED_RATIO, increase the ESTIMATED_RATIO by COMPRESSION_RATIO_DETERIORATE_STEP (0.05)
    4. When estimating the batch size, the producer will use (ESTIMATED_RATIO * UNCOMPRESSED_SIZE * 1.05) where 1.05 is the COMPRESSION_RATIO_ESTIMATION_FACTOR for contingency. 
    5. If a batch split occurs, reset the ESTIMATED_RATIO to 1.0
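
The proposed rules can be sketched in Python as below. This is a minimal sketch, not the producer implementation: the class and method names are hypothetical, and the rules are applied literally as listed, so the estimate is not clamped to the observed ratio (the page does not say whether it should be).

```python
COMPRESSION_RATIO_IMPROVING_STEP = 0.005
COMPRESSION_RATIO_DETERIORATE_STEP = 0.05
COMPRESSION_RATIO_ESTIMATION_FACTOR = 1.05


class TopicCompressionRatioEstimator:
    """Keeps one estimated compression ratio per topic (hypothetical helper)."""

    def __init__(self):
        self._estimates = {}

    def estimate(self, topic):
        # Rule 1: every topic starts at 1.0.
        return self._estimates.get(topic, 1.0)

    def observe(self, topic, observed_ratio):
        """Apply rules 2 and 3 after a batch has been compressed."""
        current = self.estimate(topic)
        if observed_ratio < current:
            current -= COMPRESSION_RATIO_IMPROVING_STEP
        elif observed_ratio > current:
            current += COMPRESSION_RATIO_DETERIORATE_STEP
        self._estimates[topic] = current

    def estimated_compressed_size(self, topic, uncompressed_size):
        # Rule 4: pad the estimate by the contingency factor.
        return (self.estimate(topic) * uncompressed_size
                * COMPRESSION_RATIO_ESTIMATION_FACTOR)

    def on_batch_split(self, topic):
        # Rule 5: fall back to the most conservative estimate.
        self._estimates[topic] = 1.0


estimator = TopicCompressionRatioEstimator()
estimator.observe("topic_1", 0.1)    # compresses well: estimate improves slowly
print(estimator.estimate("topic_1"))
print(estimator.estimate("topic_2"))  # other topics are unaffected
```

Because the improving step is ten times smaller than the deteriorating step, the estimate moves slowly towards optimistic values and quickly back towards conservative ones.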

Based on the test in this patch, the chance of splitting a batch is much less than 10%, even when the compression ratios of the messages in the same topic differ widely.

NOTE: COMPRESSION_RATIO_IMPROVING_STEP, COMPRESSION_RATIO_DETERIORATE_STEP and COMPRESSION_RATIO_ESTIMATION_FACTOR are somewhat magic numbers, but they were chosen based on the following considerations:

  1. The time for the estimated compression ratio to approach the observed compression ratio:
    • The larger the COMPRESSION_RATIO_IMPROVING_STEP is, the faster the estimated compression ratio will approach the observed compression ratio range.
  2. How stable the estimated compression ratio is:
    • The larger the COMPRESSION_RATIO_DETERIORATE_STEP and COMPRESSION_RATIO_IMPROVING_STEP are, the more churn there will be.
  3. The likelihood of a batch split:
    • The larger the ratio COMPRESSION_RATIO_DETERIORATE_STEP:COMPRESSION_RATIO_IMPROVING_STEP is, the fewer batches will be split.
    • The smaller the COMPRESSION_RATIO_IMPROVING_STEP is, the fewer batches will be split.
    • The larger the COMPRESSION_RATIO_ESTIMATION_FACTOR is, the less likely a batch will be split.
  4. Efficiency:
    • The larger the COMPRESSION_RATIO_ESTIMATION_FACTOR is, the less efficient the compression will be.

The effects of these three values are difficult to quantify on a per-message basis and vary with the traffic pattern. However, here is how they were chosen:

  1. Choosing COMPRESSION_RATIO_IMPROVING_STEP to be 0.005 means that it takes about 20 batches to decrease the estimated compression ratio by 0.1. So in a normal case, after a batch is split, it takes quite a few batches before the estimate again becomes aggressive enough to cause another batch split (assuming no batch in between deteriorates the compression ratio estimation).
  2. Use 10:1 as COMPRESSION_RATIO_DETERIORATE_STEP:COMPRESSION_RATIO_IMPROVING_STEP. As an intuitive approximation, statistically speaking, this allows the ratio between batches that improve the estimate and batches that deteriorate it to be less than 10:1 without a split. For example, when 9 batches improve the compression ratio, the 10th batch may deteriorate it.
  3. For compression efficiency, 1.05 is chosen as the COMPRESSION_RATIO_ESTIMATION_FACTOR. This is the value in use today, and it gives a reasonable contingency for batch size estimation.
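
The numbers behind these choices can be reproduced with a few lines of Python. The step values come from the proposal above; the helper net_drift is a hypothetical name used only for this illustration, and exact fractions are used so the results are not blurred by floating-point rounding.

```python
from fractions import Fraction

IMPROVING_STEP = Fraction("0.005")
DETERIORATE_STEP = Fraction("0.05")

# Batches needed for the estimate to improve (decrease) by 0.1.
batches_for_tenth = Fraction("0.1") / IMPROVING_STEP

# Ratio between the two step sizes.
step_ratio = DETERIORATE_STEP / IMPROVING_STEP


def net_drift(improving_batches, deteriorating_batches):
    """Net change of the estimate; positive means more conservative."""
    return (deteriorating_batches * DETERIORATE_STEP
            - improving_batches * IMPROVING_STEP)


print(batches_for_tenth)   # 20
print(step_ratio)          # 10
print(net_drift(10, 1))    # 0: ten improving batches cancel one deteriorating batch
print(net_drift(9, 1))     # 1/200: with nine, the estimate still ends up higher
```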

Compatibility, Deprecation, and Migration Plan

The KIP is backwards compatible.

Rejected Alternatives

Batching the messages based on uncompressed bytes

Introduce a new configuration enable.compression.ratio.estimation to allow users to opt out of compression ratio estimation and use the uncompressed size for batching directly.

The downsides of this approach are:

  1. It adds a new configuration to the producer which exposes some nuances.
  2. For highly compressible messages, users may still need to guess the compression ratio to ensure the compressed batch is not too small.

 

 
