This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.
Status
Current state: "Under Discussion"
Discussion thread: here
JIRA: here
Motivation
Logs in kafka consists of batches, and one batch consists of many messages. So if the size of each message header can be reduced, it'll improve the network traffic and storage size (and money, of course) a lot, even it's a small improvement. Some companies now handle trillions of messages per day using Kafka, so, if we can reduce just 1 byte per message, the save is really considerable.
Currently, The message format in V2 is like this:
length: varint attributes: int8 bit 0~7: unused timestampDelta: varlong offsetDelta: varint keyLength: varint key: byte[] valueLen: varint value: byte[] Headers => [Header]
We can focus on the attributes
field, it is introduced since message format v1 and till now, it is still unused.
So, I'm proposing we can add a flag in batch attribute to indicate if the messages inside the batch have attributes field or not.
Public Interfaces
In the record batch header, We'll add a new attribute bit to indicate if the messages in the batch contain attributes or not. If it's 0, it means the messages have attribute field (like what we have now). If it's set to 1, it means no attribute field. Also, the magic value will bump to 3.
baseOffset: int64 batchLength: int32 // <-- will change to varint partitionLeaderEpoch: int32 magic: int8 (current magic value is 2) // <-- will bump to 3 crc: int32 attributes: int16 bit 0~2: 0: no compression 1: gzip 2: snappy 3: lz4 4: zstd bit 3: timestampType bit 4: isTransactional (0 means not transactional) bit 5: isControlBatch (0 means not a control batch) bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction) // new added attribute below bit 7: ignoreMessageAttributes (0 means not to ignore) bit 8~15: unused lastOffsetDelta: int32 // <-- will change to varint baseTimestamp: int64 // <-- will change to varlong maxTimestamp: int64 // <-- will change to varlong producerId: int64 producerEpoch: int16 baseSequence: int32 records: [Record]
Furthermore, the record batch header can also be smaller. I'd also like to improve them by:
1. baseTimestamp: change type from int64 to varlong. With varlong, our current timestamp in ms needs only 6 bytes. I've tested, it still needs only 6 bytes after 10 years.
2. maxTimestamp: change the semantic to maxTimestampDelta, and change type from int64 to varlong. In most case, the timestamp for each record inside the batch should be very close. So, changing to varlong will save space.
3. lastOffsetDelta: change the type from int32 to varint. Same as above, In most case, the offset delta should be small. So, changing to varint will save space.
4. Length: It means the size of this batch. This change needs more explanation.
The default producer `batch.size` config is 16384, and `linger.ms` is 0, that means, in most case, this value should be smaller than 16384. Besides, when producer send batches to a node, it’ll also send other (in-progress) batches that also belong to this node together. For example:
the leader of tp-0, tp-1, tp-2 are node 1, and now, the batch of tp-0 is full, ready to send to node 1, the producer will also send batches in tp-1, tp-2 (even though the batch size is small) to node 1. Because of this characteristic, in the log segment, there should be many “small batches”. So, if changing the length field from int32 to varint should also be good.
Note:
The biggest value of varint 2 bytes is 16383
The biggest value of varint 3 bytes is 2097151
With the above 4 record batch field changes, in a normal batch, with close timestamp between each record and offsets, the save can be:
MaxTimestamp from int64 (8 bytes) to 2 ~ 3 bytes. Suppose the max timestamp delta has a long 30 seconds, which only need 3 bytes for varlong. The offset delta save from int32 (4 bytes) to 1~2 bytes
About the length, we use int32(4 bytes) for now. But as analyzed above, we should have many small batches in the log. Even if it’s the default batch.size 16384, we only need 3 bytes to store by using varint.
In all, we can save around [8 - 6 (baseTimestamp)] + [8 - 3 (max timestamp delta)] + [4 - 2 (offset delta)] + [4 - 3 (length)] = 10 Bytes for each batch.
Proposed Changes
When writing the batch using the new version of message format, we'll default set ignoreMessageAttributes field to 1 in record batch header, and create records without attribute field.
When reading batches, we'll first check the ignoreMessageAttributes field before reading each records, and read messages accordingly.
When old consumer reading the new formatted message, we'll do a downconvert to the older message format like what we did for old consumer accepting message format v1 now.
Compatibility, Deprecation, and Migration Plan
Totally backward compatible. Even if one day we want to use message attribute field, we can always update the ignoreMessageAttributes to add attributes.
Test Plan
Unit test + Integration test
Rejected Alternatives
Keep using the existing message format. But that would keep wasting 1 byte each message to store zero content.