Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-2511
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Today the broker must decompress each compressed message set, assign an offset to every inner message, and then recompress the set. This adds CPU cost on the broker. This KIP aims to avoid server-side recompression.
This KIP is a distilled/improved version of an earlier discussion that we started.
Public Interfaces
We propose the following change to the message format:
MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64  <------ CHANGE: Base offset for wrapper message of compressed message, relative offset for inner compressed message.
  Message => Crc MagicByte Attributes Timestamp KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
Proposed Changes
Wire protocol change
Change the usage of offset field
- When the producer compresses a message, it writes the relative offset in the offset field of each raw (inner) message and leaves the wrapper message's offset blank.
- When the broker receives a compressed message, it only needs to:
  - Decompress the message to verify the CRC and relative offsets.
    NOTE: If the relative offsets are not contiguous (e.g., if this is a mirrored compacted topic), there are two ways to handle it: (i) reject the ProducerRequest or (ii) reassign the relative offsets. We chose to reassign the offsets rather than reject the request because there is a useful use case in which mirror maker copies directly from the source cluster to the destination cluster without even decompressing the message. In this case, the compressed message can have non-contiguous relative offsets (for compacted topics).
  - Set the outer message's base offset. (Since the broker only needs to update the message-set header, there is no need to recompress message sets.)
- When the log cleaner compacts log segments, it needs to update the inner message's relative offset values. (This will leave "holes" inside the new wrapped message).
- When the consumer receives a message, it converts the relative offset back to the actual offset.
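The offset handling described in the list above can be illustrated with a minimal Python sketch. It is not Kafka code: `InnerMessage`, `WrapperMessage`, `UNSET`, `producer_wrap`, `broker_append` and `consumer_offsets` are hypothetical names, the "blank" wrapper offset is modelled as -1, and the consumer is assumed to compute the actual offset as base offset plus relative offset, as the "base offset" wording suggests. Compression and CRC checks are left out.

```python
from dataclasses import dataclass
from typing import List

UNSET = -1  # assumption: stands in for the "blank" wrapper offset


@dataclass
class InnerMessage:
    relative_offset: int  # position inside the wrapper, written by the producer
    value: bytes


@dataclass
class WrapperMessage:
    base_offset: int  # filled in by the broker
    inner: List[InnerMessage]


def producer_wrap(values: List[bytes]) -> WrapperMessage:
    """Producer: number inner messages 0, 1, 2, ... and leave the wrapper offset blank."""
    return WrapperMessage(UNSET, [InnerMessage(i, v) for i, v in enumerate(values)])


def broker_append(wrapper: WrapperMessage, next_offset: int) -> int:
    """Broker: verify relative offsets, reassign them if they are not contiguous,
    then set the wrapper's base offset. Returns the next free log offset."""
    expected = list(range(len(wrapper.inner)))
    if [m.relative_offset for m in wrapper.inner] != expected:
        # Option (ii) from the note: reassign instead of rejecting the request.
        for i, m in enumerate(wrapper.inner):
            m.relative_offset = i
    wrapper.base_offset = next_offset
    return next_offset + len(wrapper.inner)


def consumer_offsets(wrapper: WrapperMessage) -> List[int]:
    """Consumer: convert relative offsets back to actual offsets."""
    return [wrapper.base_offset + m.relative_offset for m in wrapper.inner]
```

For example, a wrapper holding three messages appended at log offset 100 yields actual offsets 100, 101 and 102 on the consumer side, and the broker's next free offset becomes 103.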
Compatibility, Deprecation, and Migration Plan
NOTE: This part is drafted based on the assumption that KIP-31 and KIP-32 will be implemented in one patch.
The proposed protocol is not backward compatible. The migration plan is as follows:
- Increment magic byte in MessageAndOffset from 0 to 1.
- Upgrade broker to support both V0 and V1 of MessageAndOffset.
- When the broker sees a producer request using V0, it will still decompress the message, assign offsets and recompress.
- When the broker sees a producer request using V1, it will decompress the message for verification, assign the offset to the outer message and NOT recompress.
- Upgrade consumer to support both V0 and V1.
- Upgrade producer to send MessageAndOffset V1.
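As a rough illustration of how an upgraded broker might branch on the magic byte during the migration, here is a small Python sketch. The function `broker_steps` and the step names are hypothetical and only mirror the two cases listed above. They do not correspond to actual broker code.

```python
MAGIC_V0 = 0
MAGIC_V1 = 1


def broker_steps(magic: int) -> list:
    """Return the work an upgraded broker does for a compressed producer request."""
    if magic == MAGIC_V0:
        # Old format: every inner message carries an absolute offset.
        return ["decompress", "assign_inner_offsets", "recompress"]
    if magic == MAGIC_V1:
        # New format: inner offsets are relative, only the wrapper is updated.
        return ["decompress", "verify", "assign_outer_offset"]
    raise ValueError(f"unsupported magic byte: {magic}")
```

The point of the sketch is that the V1 path never contains a recompression step, which is where the CPU saving comes from.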
For producers, there will be no impact.
Old consumers that cannot recognize V1 will throw an exception for the unsupported version if they see the new protocol. (The current code in ZookeeperConsumerConnector does not validate the magic byte. This is a bug and we will fix it in a separate ticket.)
Upgraded consumers can handle both V0 and V1.
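The consumer-side behaviour can be sketched as a simple magic-byte check. This is an illustrative Python sketch under assumptions, not the ZookeeperConsumerConnector code: `UnsupportedVersionError`, `check_magic` and the two supported-version sets are hypothetical names.

```python
class UnsupportedVersionError(Exception):
    """Raised when a consumer sees a magic byte it does not understand."""


OLD_CONSUMER = frozenset({0})          # understands V0 only
UPGRADED_CONSUMER = frozenset({0, 1})  # understands V0 and V1


def check_magic(magic: int, supported: frozenset) -> int:
    """Validate the magic byte before interpreting the offset field."""
    if magic not in supported:
        raise UnsupportedVersionError(f"unsupported message version: {magic}")
    return magic
```

With this check in place, an old consumer fails fast on a V1 message instead of misreading relative offsets as actual offsets, while an upgraded consumer accepts both versions.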
Rejected Alternatives
None