
Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-3995

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka enforces a strict message size limit. For compressed messages, the limit applies to the compressed size as well.

Currently, KafkaProducer estimates the compressed message size from the uncompressed message size. The estimate is a weighted average, over a sliding window, of the compression ratios of the most recent batches for each compression type. The formula is the following:

Assume COMPRESSION_RATIO_i stands for the compression ratio of the i-th batch. The estimated compression ratio for the (N+1)-th batch is:
ESTIMATED_COMPRESSION_RATIO = Σ_{i=1..N} COMPRESSION_RATIO_i * DAMPING_FACTOR^(N - i) * (1 - DAMPING_FACTOR) + INITIAL_COMPRESSION_RATIO * DAMPING_FACTOR^N

When the (N+1)-th batch is generated, this estimated compression ratio (multiplied by a factor of 1.05 for contingency) is used to estimate the compressed size from the uncompressed size. When the estimated compressed size reaches the batch.size configuration, the batch is closed and sent to the brokers.
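For illustration, the weighted average above is equivalent to the following exponentially weighted incremental update (a minimal sketch; the class name, constants, and method names are assumptions for clarity, not the actual producer internals):

import java.util.Objects;

public class CompressionRatioEstimator {
    // Assumed values for illustration only.
    private static final float DAMPING_FACTOR = 0.9f;       // weight retained by the previous estimate
    private static final float CONTINGENCY_FACTOR = 1.05f;  // safety margin applied when sizing batches

    private float estimatedRatio;

    public CompressionRatioEstimator(float initialRatio) {
        this.estimatedRatio = initialRatio;
    }

    // Incremental form of the closed-form sum above: each new observation
    // contributes (1 - DAMPING_FACTOR), older observations decay geometrically.
    public void observe(float batchCompressionRatio) {
        estimatedRatio = DAMPING_FACTOR * estimatedRatio
                + (1 - DAMPING_FACTOR) * batchCompressionRatio;
    }

    // Estimated compressed size used to decide when to close a batch.
    public long estimateCompressedSize(long uncompressedBytes) {
        return (long) (uncompressedBytes * estimatedRatio * CONTINGENCY_FACTOR);
    }
}

Unrolling observe() over N batches reproduces the formula above exactly, with INITIAL_COMPRESSION_RATIO carrying the residual weight DAMPING_FACTOR^N.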

The problem with the current behavior is that this estimate can be off and cause a RecordTooLargeException.

For example, suppose the batch size is set to 1MB and the max message size is 1MB. Initially the producer sends 1MB messages to topic_1, whose data compresses to 1/10 of its original size. After a while the estimated compression ratio converges to 1/10, and the producer puts 10 messages into one batch. Now the producer starts sending 1MB messages to topic_2, whose data only compresses to 1/5 of its original size. The producer still uses 1/10 as the estimated compression ratio and puts 10 messages into a batch. That batch is 2MB after compression, which exceeds the maximum message size. In this case, users who care about ordering and message loss have few options other than resending everything or closing the producer.

This is especially an issue for services like MirrorMaker, whose producer is shared by many different topics.

Public Interfaces

We want to introduce a new configuration, enable.compression.ratio.estimation, to allow users to opt out of compression ratio estimation and instead use the uncompressed size for batching directly.
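If adopted, the configuration could be declared with the standard ConfigDef machinery roughly as below (a sketch; the constant name, importance level, and doc string are placeholders, not final API):

import org.apache.kafka.common.config.ConfigDef;

public class NewProducerConfigSketch {
    // Illustrative constant name; not part of the current ProducerConfig API.
    public static final String ENABLE_COMPRESSION_RATIO_ESTIMATION_CONFIG =
            "enable.compression.ratio.estimation";

    static final ConfigDef CONFIG = new ConfigDef()
            .define(ENABLE_COMPRESSION_RATIO_ESTIMATION_CONFIG,
                    ConfigDef.Type.BOOLEAN,
                    true,                          // default keeps today's behavior
                    ConfigDef.Importance.MEDIUM,
                    "If false, the producer uses the uncompressed message size "
                        + "for batching instead of estimating the compressed size.");
}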

Proposed Changes

As described in Public Interfaces, we will introduce a new configuration, enable.compression.ratio.estimation, which allows users to opt out of compression ratio estimation and use the uncompressed size for batching directly.

The default value of this configuration will be true.

When enable.compression.ratio.estimation is set to false, the producer will stop estimating the compressed batch size and simply use the uncompressed message size for batching. In practice, if the batch size is set close to the max message size, the compression ratio may not be an issue. If a single message is larger than batch.size, the behavior is the same as today, i.e. it will be put into a new batch.
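The batching decision would then look roughly like the following sketch, reusing the estimator sketch from the Motivation section (all names here are illustrative, not the actual producer internals):

// Illustrative sketch of the batching check under the new config.
public class BatchSizer {
    private final boolean enableRatioEstimation; // enable.compression.ratio.estimation
    private final long batchSize;                // batch.size
    private final CompressionRatioEstimator estimator;

    public BatchSizer(boolean enableRatioEstimation, long batchSize,
                      CompressionRatioEstimator estimator) {
        this.enableRatioEstimation = enableRatioEstimation;
        this.batchSize = batchSize;
        this.estimator = estimator;
    }

    // Decide whether the batch should be closed after appending a record.
    public boolean shouldCloseBatch(long uncompressedBytesInBatch) {
        long effectiveSize = enableRatioEstimation
                ? estimator.estimateCompressedSize(uncompressedBytesInBatch)
                : uncompressedBytesInBatch; // no estimation: bound by raw bytes
        return effectiveSize >= batchSize;
    }
}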

This approach guarantees that the compressed message size will be less than the max message size. And as long as batch.size is set to a reasonable value, the compression ratio is unlikely to be hurt.
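Assuming the configuration is adopted under the proposed name, opting out would be a one-line producer setting:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class NoEstimationExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "1000000");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Proposed in this KIP; not yet a ProducerConfig constant.
        props.put("enable.compression.ratio.estimation", "false");

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        producer.close();
    }
}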

Compatibility, Deprecation, and Migration Plan

This KIP only introduces a new configuration. The change is completely backwards compatible.

Rejected Alternatives

Decompress the batch that encounters a RecordTooLargeException, split it into two and send them again

This approach has some caveats (see the sketch after this list):

  1. More overhead on the producer side. The producer has to decompress the batch, regroup the messages, and resend them. If the producer keeps the original uncompressed messages around to avoid decompression, it incurs a large memory overhead.
  2. The split batches are not guaranteed to be smaller than the max size limit, so there may be multiple retries until the messages get through or fail.
  3. In a scenario such as MirrorMaker, compression ratios differ across topics; topics whose ratio differs greatly from the average will trigger many splits and resends, introducing a lot of overhead.
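To make caveat 2 concrete, the split-and-resend loop would look roughly like this hypothetical sketch, where compress() and send() are placeholders rather than real producer APIs. Because each half is recompressed independently, there is no fixed bound on how many times a batch may be split:

import java.util.List;
import org.apache.kafka.common.errors.RecordTooLargeException;

class SplitAndResendSketch {
    void sendWithSplit(List<byte[]> messages, long maxMessageSize) {
        byte[] compressed = compress(messages);
        if (compressed.length <= maxMessageSize) {
            send(compressed);
            return;
        }
        if (messages.size() == 1)
            throw new RecordTooLargeException(); // a single oversized message cannot be split
        // Each half is recompressed independently, so a half is not
        // guaranteed to fit either; the recursion may go several levels.
        int mid = messages.size() / 2;
        sendWithSplit(messages.subList(0, mid), maxMessageSize);
        sendWithSplit(messages.subList(mid, messages.size()), maxMessageSize);
    }

    private byte[] compress(List<byte[]> messages) { /* placeholder */ return new byte[0]; }
    private void send(byte[] compressed) { /* placeholder */ }
}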

Keep per topic compression ratio

This approach may solve the problem caused by compression ratio differences across topics. But the downside is that it does not handle compression ratio variation within a topic.

 

 
