...

  1. log.message.format.version and message.format.version are removed. They no longer serve any purpose, and the fact that their allowable values are Kafka versions rather than message format versions has been a source of confusion. If we introduce new message format versions, they should use actual message format versions (v0, v1, v2, v3, etc.).
  2. Produce requests v2 or lower and up-conversion from message formats v0 and v1 won't be supported.
  3. Fetch requests v3 or lower and down-conversion to message formats v0 and v1 won't be supported. However, v0 and v1 records may still be returned if they are stored on disk in those message format versions. This is necessary to avoid scanning through all record batches before they are returned. See the "Future Work" section for possible options for avoiding this in the future.
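To make the version boundaries in items 2 and 3 concrete, the following Java sketch maps each Produce and Fetch request version to the newest record format it can carry, following the Kafka protocol: Produce v0-v1 use format v0, Produce v2 uses v1, and Produce v3 and later use v2; Fetch v0-v1 accept v0, Fetch v2-v3 accept v1, and Fetch v4 and later accept v2. The class and method names are hypothetical and do not correspond to broker code.

```java
// Hypothetical helper: which request versions would be rejected once
// message formats v0 and v1 are no longer supported.
public final class LegacyRequestCheck {

    private LegacyRequestCheck() {}

    // Newest record format (magic) a Produce request of this version can carry.
    public static int maxMagicForProduce(int produceVersion) {
        if (produceVersion <= 1) return 0; // Produce v0-v1: message format v0
        if (produceVersion == 2) return 1; // Produce v2: message format v1
        return 2;                          // Produce v3+: message format v2
    }

    // Newest record format (magic) a Fetch request of this version can accept.
    public static int maxMagicForFetch(int fetchVersion) {
        if (fetchVersion <= 1) return 0; // Fetch v0-v1: message format v0
        if (fetchVersion <= 3) return 1; // Fetch v2-v3: message format v1
        return 2;                        // Fetch v4+: message format v2
    }

    // Without up-conversion, only Produce requests that carry v2 are accepted.
    public static boolean isProduceSupported(int produceVersion) {
        return maxMagicForProduce(produceVersion) >= 2;
    }

    // Without down-conversion, only Fetch requests that accept v2 are served.
    public static boolean isFetchSupported(int fetchVersion) {
        return maxMagicForFetch(fetchVersion) >= 2;
    }

    public static void main(String[] args) {
        for (int v = 0; v <= 4; v++) {
            System.out.printf("Produce v%d -> %s, Fetch v%d -> %s%n",
                v, isProduceSupported(v) ? "supported" : "rejected",
                v, isFetchSupported(v) ? "supported" : "rejected");
        }
    }
}
```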

Proposed Changes

Apache Kafka 3.0

Once a broker is upgraded to 3.0 and the inter.broker.protocol.version is updated to 3.0, message.format.version is assumed to be 3.0 and we will always write records with message format v2. This includes the following scenarios:

  1. Persisting produce records on disk
  2. Persisting group data (consumer offsets and group metadata)
  3. Followers writing replicated data to disk
  4. Writing new segments as part of log compaction
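A minimal sketch of the write-path rule, assuming version strings in the usual Kafka form (for example "2.8" or "3.0-IV1"). EffectiveRecordFormat and its methods are hypothetical; the pre-3.0 branch only models the mapping from message.format.version to a record format (v1 was introduced in 0.10.0 and v2 in 0.11.0) and ignores any other broker validation.

```java
// Hypothetical sketch: the record format a broker would write for new data.
public final class EffectiveRecordFormat {

    private EffectiveRecordFormat() {}

    // Parses the numeric prefix of a version string,
    // e.g. "0.10.2-IV0" -> [0, 10, 2] and "3.0-IV1" -> [3, 0].
    static int[] parse(String version) {
        String numeric = version.split("-", 2)[0];
        String[] parts = numeric.split("\\.");
        int[] result = new int[parts.length];
        for (int i = 0; i < parts.length; i++) {
            result[i] = Integer.parseInt(parts[i]);
        }
        return result;
    }

    // Compares two versions component by component; missing components count as 0.
    static int compare(String a, String b) {
        int[] x = parse(a);
        int[] y = parse(b);
        for (int i = 0; i < Math.max(x.length, y.length); i++) {
            int xi = i < x.length ? x[i] : 0;
            int yi = i < y.length ? y[i] : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    public static int recordFormatToWrite(String interBrokerProtocolVersion, String messageFormatVersion) {
        if (compare(interBrokerProtocolVersion, "3.0") >= 0) {
            return 2; // message.format.version is assumed to be 3.0, i.e. format v2
        }
        // Pre-3.0 behaviour: format v1 arrived in 0.10.0 and format v2 in 0.11.0.
        if (compare(messageFormatVersion, "0.11.0") >= 0) {
            return 2;
        }
        if (compare(messageFormatVersion, "0.10.0") >= 0) {
            return 1;
        }
        return 0;
    }

    public static void main(String[] args) {
        System.out.println(recordFormatToWrite("3.0", "0.10.2")); // 2
        System.out.println(recordFormatToWrite("2.8", "0.10.2")); // 1
    }
}
```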

1 and 2 are straightforward since that's the current behavior when the message format is explicitly set to 3.0, but 3 and 4 introduce new behavior. The main goals are to use v2 for all client writes and to opportunistically convert to the new format where there is little downside. We will discuss subtle correctness considerations later in the document.

Note that followers will continue to write the records they receive from leaders without conversion. Since leaders will use v2 for new requests, replicating old data is the only case where v0 or v1 records may be written to disk after the upgrade to Apache Kafka 3.0.

Produce and fetch requests with v0 and v1 message formats would be supported via up-conversion and down-conversion. Up-conversion and (especially) down-conversion have a measurable performance impact due to increased CPU and memory usage, but the vast majority of Kafka clients have supported v2 for some time (even Spark, a notable late adopter, has supported v2 since Spark 2.4, which was released in October 2018).

For Kafka clusters where topics were configured with message format v0 or v1 at some point, we need a mechanism to ensure records are converted to the new version before the upgrade to Apache Kafka 4.0. We propose the introduction of the log.record.version.force.upgrade config for this purpose. The conversion would happen during start-up, using an approach similar to log recovery after an unclean shutdown.

The current heuristics to detect whether a down-conversion is required rely on an additional scan to check if there are messages with a newer message format than the maximum version supported by the fetch request. With the changes proposed in this KIP, we expect messages on disk to have format version 2 in the common case. Given that, the heuristics are no longer useful, and we will always down-convert if the fetch request is v3 or lower.
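The difference between the existing heuristic and the proposed rule can be sketched as follows. This is a simplified model, not the broker's implementation: record batches are reduced to their format (magic) values, and all names are hypothetical.

```java
import java.util.List;

public final class DownConversionDecision {

    private DownConversionDecision() {}

    // Newest record format each Fetch version accepts:
    // v0-v1 -> format v0, v2-v3 -> format v1, v4 and later -> format v2.
    public static int maxMagicForFetch(int fetchVersion) {
        if (fetchVersion <= 1) return 0;
        if (fetchVersion <= 3) return 1;
        return 2;
    }

    // Simplified model of the existing heuristic: scan the batches that would be
    // returned and down-convert only if one uses a newer format than allowed.
    public static boolean needsDownConversionByScan(int fetchVersion, List<Integer> batchMagics) {
        int allowed = maxMagicForFetch(fetchVersion);
        for (int magic : batchMagics) {
            if (magic > allowed) {
                return true;
            }
        }
        return false;
    }

    // Proposed rule: data on disk is expected to be v2 in the common case, so the
    // scan is skipped and every Fetch request of version 3 or lower is down-converted.
    public static boolean needsDownConversion(int fetchVersion) {
        return fetchVersion <= 3;
    }

    public static void main(String[] args) {
        List<Integer> allV2 = List.of(2, 2, 2);
        System.out.println(needsDownConversionByScan(3, allV2)); // true, after a full scan
        System.out.println(needsDownConversion(3));              // true, without a scan
    }
}
```

When nearly every batch is already v2, the scan almost always reaches the same answer as the version check, so it only adds cost.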

There are subtle correctness considerations when up-converting records that had been previously written with an old format. Replication is aligned by batches, but up-conversion will happen independently on each broker for case 3. For compressed records, v0 and v1 message formats support batching, so existing batch boundaries can be preserved directly. Uncompressed records, however, are not batched in v0 and v1. To ensure alignment, we will convert them to single record batches in v2. It's worth noting that message format v2 is slightly less efficient than v0 and v1 when single record batches are used, but it's an acceptable cost for correctness and it only impacts older records. Over time, these records will either be deleted via retention or will be replaced by new versions for compacted topics.
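The alignment argument can be illustrated with a small model, assuming each shallow entry of a v0/v1 log is either a compressed wrapper or a single uncompressed record, and that records are identified only by offset. Because followers store the same shallow entries as the leader, mapping each entry to exactly one v2 batch yields the same batch boundaries on every broker. LegacyEntry and toV2Batches are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public final class AlignedUpConversion {

    private AlignedUpConversion() {}

    // Hypothetical model of one shallow entry in a v0/v1 log: a compressed wrapper
    // holding several inner records, or a single uncompressed record.
    // Records are represented only by their offsets.
    public static final class LegacyEntry {
        final boolean compressed;
        final List<Long> offsets;

        public LegacyEntry(boolean compressed, List<Long> offsets) {
            if (!compressed && offsets.size() != 1) {
                throw new IllegalArgumentException("an uncompressed v0/v1 entry holds exactly one record");
            }
            this.compressed = compressed;
            this.offsets = List.copyOf(offsets);
        }
    }

    // Maps each legacy entry to exactly one v2 batch (modelled as its list of offsets).
    // Compressed wrappers keep their grouping; uncompressed records become
    // single-record batches and are never merged with their neighbours.
    public static List<List<Long>> toV2Batches(List<LegacyEntry> entries) {
        List<List<Long>> batches = new ArrayList<>();
        for (LegacyEntry entry : entries) {
            batches.add(entry.offsets);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<LegacyEntry> log = List.of(
            new LegacyEntry(true, List.of(0L, 1L, 2L)),
            new LegacyEntry(false, List.of(3L)),
            new LegacyEntry(false, List.of(4L)));
        System.out.println(toV2Batches(log)); // [[0, 1, 2], [3], [4]]
    }
}
```

Merging consecutive uncompressed records into one larger v2 batch would be more compact, but the grouping could then depend on how much data a given broker converted at once, which is exactly the misalignment this rule avoids.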

...

Apache Kafka 4.0

We will remove all support for message formats v0 and v1 in Apache Kafka 4.0.

...

On the broker, we can achieve this by providing a mechanism to force up-conversion of all the record batches on disk. For example, we introduce a new config, log.record.version.force.upgrade, with two possible values: null (default) and 2. If the value is set to 2, the broker ensures all segments have records with format v2 after log recovery has completed during start-up. This can be extended to support newer message format versions if and when they are introduced. This config may only be non-null if the inter.broker.protocol.version is 3.0 or higher. In a subsequent major release, we would drop support for reading v0 and v1 records from disk.
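The config rules described above might be validated along these lines. This is a sketch only: the config name comes from this proposal, but ForceUpgradeConfig, its methods, and the simplification of checking only the major component of inter.broker.protocol.version are assumptions.

```java
import java.util.List;

public final class ForceUpgradeConfig {

    // Config name as proposed in this KIP.
    static final String NAME = "log.record.version.force.upgrade";

    private ForceUpgradeConfig() {}

    // Accepts null (the default) or 2; a non-null value additionally requires
    // inter.broker.protocol.version 3.0 or higher. Only the major version is checked here.
    public static void validate(Integer forceUpgrade, String interBrokerProtocolVersion) {
        if (forceUpgrade == null) {
            return;
        }
        if (forceUpgrade.intValue() != 2) {
            throw new IllegalArgumentException(NAME + " must be null or 2, got " + forceUpgrade);
        }
        int ibpMajor = Integer.parseInt(interBrokerProtocolVersion.split("\\.")[0]);
        if (ibpMajor < 3) {
            throw new IllegalArgumentException(NAME + " requires inter.broker.protocol.version 3.0 or higher");
        }
    }

    // After log recovery, a segment would need rewriting if any of its batches
    // uses a record format older than the forced target.
    public static boolean segmentNeedsUpgrade(List<Integer> batchMagics, int targetMagic) {
        for (int magic : batchMagics) {
            if (magic < targetMagic) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        validate(2, "3.0-IV1");
        System.out.println(segmentNeedsUpgrade(List.of(1, 2, 2), 2)); // true
    }
}
```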

On the consumer side, it's more complicated. Brokers assume that consumers can read all message format versions and never up-convert records when serving fetch requests. Furthermore, we cannot change the behavior of older broker versions. The options here are complex and require more thought.