...

  • Topic-level configuration: message.downconversion.enable
  • Broker-level configuration: log.message.downconversion.enable
  • Type: Boolean
  • Possible Values: True / False
  • Explanation: Controls whether down-conversion of messages is enabled in order to satisfy client FetchRequests. If true, the broker will down-convert messages when required. If false, the broker will not perform down-conversion and will respond with UnsupportedVersionException to any client FetchRequest that requires it.
  • Default Value: True
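For example, down-conversion could be disabled broker-wide in server.properties (a sketch; whether to disable it at all depends on whether any old-format clients remain):

```properties
# server.properties: broker-wide default for all topics
log.message.downconversion.enable=false
```

A per-topic override via the topic-level configuration `message.downconversion.enable` takes precedence over the broker-level default.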

...

To resolve this, we will use an estimate Spre (defined below) as opposed to the actual post-downconversion size (Spost) for each topic-partition. We also commit to sending out exactly Spre bytes for that particular partition. In terms of the above equation, the size will be computed as:

Code Block
Size = HeaderSize + MetadataSize + ∑(Spre)
 
where Spre = max(size_pre_downconversion, size_first_batch_after_downconversion)
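A minimal sketch of this size estimate (the helper names and the header/metadata sizes below are illustrative, not Kafka's actual internals):

```java
// Sketch of the response size estimate described above. All sizes in bytes.
public class FetchSizeEstimate {
    // S_pre for one partition: the pre-down-conversion size, bumped up to at
    // least the size of the first batch after down-conversion, so that the
    // first batch is always guaranteed to fit.
    static long sPre(long preDownConversionSize, long firstBatchAfterDownConversionSize) {
        return Math.max(preDownConversionSize, firstBatchAfterDownConversionSize);
    }

    // Size = HeaderSize + MetadataSize + sum of S_pre over all partitions.
    static long estimateResponseSize(long headerSize, long metadataSize, long[][] partitionSizes) {
        long total = headerSize + metadataSize;
        for (long[] p : partitionSizes)
            total += sPre(p[0], p[1]);
        return total;
    }

    public static void main(String[] args) {
        // Two partitions: {pre-down-conversion size, first-batch-after size}.
        long[][] partitions = { {1000, 400}, {800, 900} };
        // 8 + 32 + max(1000, 400) + max(800, 900) = 1940
        System.out.println(estimateResponseSize(8, 32, partitions)); // prints 1940
    }
}
```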

Given this, we have three possible scenarios:

...

To ensure that the consumer keeps making progress, Kafka makes sure that every response contains at least one message batch for the first partition, even if that batch exceeds fetch.max.bytes and max.partition.fetch.bytes (KIP-74). When Spre < Spost (the second case above), we cannot send out all the messages and need to trim message batch(es) towards the end. In this case it is possible to end up with a single partial batch.
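The trimming problem can be illustrated with a small sketch (the batch sizes and budget are made up; Kafka's actual record classes are not used here). We committed to exactly Spre bytes, but if the down-converted batches total more than that, the response is cut off mid-batch:

```java
public class TrimExample {
    // Count how many complete down-converted batches fit within the committed
    // byte budget (= S_pre). A return value of 0 means only a partial first
    // batch would fit on the wire, which the consumer cannot make progress on.
    static int completeBatchesWithin(long[] batchSizes, long budget) {
        long used = 0;
        int complete = 0;
        for (long size : batchSizes) {
            if (used + size > budget)
                break; // this batch would be cut off mid-batch
            used += size;
            complete++;
        }
        return complete;
    }

    public static void main(String[] args) {
        // The first batch (600 bytes) alone exceeds the 400-byte budget:
        // only a partial first batch would fit.
        System.out.println(completeBatchesWithin(new long[]{600, 500}, 400)); // prints 0
    }
}
```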

To prevent this from happening, we will not delay down-conversion of the first partition in the response. We will down-convert all messages of the first partition in the I/O thread (as we do today), and only delay down-conversion for subsequent partitions. This ensures that we know the exact size of the down-converted messages in the first partition beforehand, and can fully accommodate them in the response.

This can be done by accepting an additional parameter doLazyConversion in KafkaApis#handleFetchRequest#convertedPartitionData which returns FetchResponse.PartitionData containing either LazyDownConvertedRecords or MemoryRecords depending on whether we want to delay down-conversion or not.
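A rough sketch of the shape of this change (the types below are simplified stand-ins for Kafka's FetchResponse.PartitionData, MemoryRecords, and LazyDownConvertedRecords, and the method signature is illustrative):

```java
// Simplified stand-ins for the Kafka types discussed above.
interface Records {}

class MemoryRecords implements Records {
    // Eagerly down-converted records: conversion happened in the I/O thread.
}

class LazyDownConvertedRecords implements Records {
    // Holds original records; down-conversion is deferred until the response
    // is written out on the network thread.
}

class PartitionData {
    final Records records;
    PartitionData(Records records) { this.records = records; }
}

public class FetchRequestHandler {
    // Sketch of convertedPartitionData with the extra doLazyConversion flag:
    // the first partition in the response passes doLazyConversion = false
    // (eager conversion), all subsequent partitions pass true.
    PartitionData convertedPartitionData(Records original, boolean doLazyConversion) {
        if (doLazyConversion)
            return new PartitionData(new LazyDownConvertedRecords());
        return new PartitionData(downConvertNow(original));
    }

    private MemoryRecords downConvertNow(Records original) {
        return new MemoryRecords(); // stand-in for actual down-conversion
    }

    public static void main(String[] args) {
        FetchRequestHandler handler = new FetchRequestHandler();
        PartitionData lazy = handler.convertedPartitionData(null, true);
        System.out.println(lazy.records.getClass().getSimpleName()); // prints LazyDownConvertedRecords
    }
}
```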

Note: To ensure consumer progress, we want to make sure that we send out at least one complete message batch. The approach above is conservative in that it down-converts all messages in the first topic-partition. Ideally, we would down-convert only the first message batch of the first partition and delay down-conversion of the remaining messages. Given that this could be a bit tricky to implement, and that the maximum fetch size for one topic-partition is limited to 1MB by default, we can revisit this optimization in the future. Because Spre is at least as big as the size of the first batch after down-conversion, we are guaranteed to send out at least that one batch.

Determining number of batches to down-convert

...