...
Solution: If we reorder the contents of a batch so that the headers of all messages come first, followed by the payloads of all messages, then reading the headers only requires "partially" decompressing the batch (i.e. the prefix).
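The following is a minimal sketch of why the headers-first layout helps. It makes three assumptions: zlib's DEFLATE stands in for the batch compression codec, each hypothetical per-message header is just a 4-byte payload length, and the message count is known from the uncompressed batch header. The helper names `build_batch` and `read_headers` are hypothetical.

```python
import struct
import zlib

HEADER_SIZE = 4  # hypothetical per-message header: a big-endian int32 payload length


def build_batch(payloads):
    """Serialize a batch as [all headers][all payloads] (headers-first layout)."""
    headers = b"".join(struct.pack(">i", len(p)) for p in payloads)
    return headers + b"".join(payloads)


def read_headers(compressed, count):
    """Decompress only the header prefix of a compressed headers-first batch."""
    header_len = count * HEADER_SIZE
    d = zlib.decompressobj()
    # max_length caps the decompressed output; leftover input stays in unconsumed_tail.
    prefix = d.decompress(compressed, header_len)
    while len(prefix) < header_len and d.unconsumed_tail:
        prefix += d.decompress(d.unconsumed_tail, header_len - len(prefix))
    return [struct.unpack_from(">i", prefix, off)[0] for off in range(0, header_len, HEADER_SIZE)]


payloads = [b"hello", b"kafka", b"x" * 1000]
print(read_headers(zlib.compress(build_batch(payloads)), len(payloads)))  # [5, 5, 1000]
```

The decompressor stops once the header prefix has been produced, so the bulk of the payload bytes is never inflated. With the v2 interleaved layout, reaching the last header would require decompressing every payload before it.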
Ideas for message format v.3 wiki: Ideas for Kafka message format v.3
Motivation
Logs in Kafka consist of batches, and each batch consists of many messages. So if the size of each message header can be reduced, even by a small amount, network traffic and storage size (and therefore cost) improve considerably. Some companies now handle trillions of messages per day with Kafka, so saving even 1 byte per message adds up: at 10^12 messages per day, that is roughly 1 TB less data per day.
...
```
baseOffset: int64
batchLength: int32
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2)  // <-- will bump to 3
crc: int32
attributes: int16
    bit 0~2:
        0: no compression
        1: gzip
        2: snappy
        3: lz4
        4: zstd
    bit 3: timestampType
    bit 4: isTransactional (0 means not transactional)
    bit 5: isControlBatch (0 means not a control batch)
    bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction)
    // newly added attribute below
    bit 7: ignoreMessageAttributes (0 means not to ignore)
    bit 8~15: unused
producerId: int64
producerEpoch: int16
baseSequence: int32
lastOffsetDelta: int32  // <-- will change to varint
baseTimestamp: int64  // <-- will change to varlong
maxTimestamp: int64  // <-- will change semantic to maxTimestampDelta and change type to varint
producerId: int64  // <-- v2 position; moved above the varint fields in v3
producerEpoch: int16  // <-- v2 position; moved above the varint fields in v3
baseSequence: int32  // <-- v2 position; moved above the varint fields in v3
records: [Record]
```
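A minimal sketch of decoding the `attributes` int16 according to the bit layout above, including the proposed bit 7. The function name `decode_attributes` and the dictionary keys are hypothetical; only the bit positions come from the schema.

```python
CODECS = {0: "none", 1: "gzip", 2: "snappy", 3: "lz4", 4: "zstd"}  # values of bits 0~2


def decode_attributes(attributes: int) -> dict:
    """Split the int16 batch attributes into named flags (bit layout from the schema above)."""
    return {
        "compression": CODECS[attributes & 0x07],  # codec ids 5-7 are undefined: KeyError
        "timestampType": (attributes >> 3) & 1,  # 0 = CreateTime, 1 = LogAppendTime
        "isTransactional": bool(attributes & (1 << 4)),
        "isControlBatch": bool(attributes & (1 << 5)),
        "hasDeleteHorizonMs": bool(attributes & (1 << 6)),
        "ignoreMessageAttributes": bool(attributes & (1 << 7)),  # proposed new bit
    }


# zstd, transactional, ignoreMessageAttributes set; all other flags clear
print(decode_attributes(0b1001_0100))
```

Because bit 7 was previously unused, a v2 batch always decodes with `ignoreMessageAttributes` false, so the new flag does not change the meaning of existing attribute values.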
Furthermore, the record batch header can also be made smaller. I'd like to improve it by:
...
3. lastOffsetDelta: change the type from int32 to varint. As above, in most cases the offset delta is small, so encoding it as a varint saves space.
4. Move the above 3 fields to the end of the batch header.
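The following is a minimal sketch of the size effect of these changes. It assumes Kafka-style zigzag varints (7 data bits per byte, as in Kafka's `ByteUtils.writeVarint`/`writeVarlong`) and a hypothetical trailer order of lastOffsetDelta, baseTimestamp, maxTimestampDelta. `encode_trailer` is a hypothetical helper, not part of Kafka.

```python
def zigzag(value: int, bits: int) -> int:
    """Map a signed int to unsigned so small magnitudes stay small (0->0, -1->1, 1->2)."""
    return (value << 1) ^ (value >> (bits - 1))


def unsigned_varint(value: int) -> bytes:
    """Encode 7 bits per byte, least significant group first; the high bit marks continuation."""
    out = bytearray()
    while value >= 0x80:
        out.append((value & 0x7F) | 0x80)
        value >>= 7
    out.append(value)
    return bytes(out)


def encode_trailer(last_offset_delta: int, base_timestamp: int, max_timestamp: int) -> bytes:
    """Hypothetical v3 trailer: varint lastOffsetDelta, varlong baseTimestamp, varint maxTimestampDelta."""
    return (unsigned_varint(zigzag(last_offset_delta, 32))
            + unsigned_varint(zigzag(base_timestamp, 64))
            + unsigned_varint(zigzag(max_timestamp - base_timestamp, 32)))


V2_SIZE = 4 + 8 + 8  # int32 lastOffsetDelta + int64 baseTimestamp + int64 maxTimestamp

# 100 records spanning 250 ms, with a millisecond epoch timestamp
trailer = encode_trailer(99, 1_700_000_000_000, 1_700_000_000_250)
print(len(trailer), "bytes vs", V2_SIZE)  # 10 bytes vs 20
```

The saving depends on the values. A millisecond epoch timestamp still needs 6 bytes as a varlong, while small deltas fit in 1 or 2 bytes, which is where most of the reduction comes from.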
Note:
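The largest value a 2-byte unsigned varint can hold is 16383 (2^14 - 1, since each byte carries 7 data bits). Kafka's signed varints are zigzag-encoded, so a 2-byte signed varint covers -8192 to 8191.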
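A quick check of the 2-byte boundary, as a minimal sketch. An unsigned varint stores 7 data bits per byte, while a zigzag-encoded signed varint effectively spends one of those bits on the sign. The helper names `varint_size` and `zigzag32` are hypothetical.

```python
def varint_size(value: int) -> int:
    """Bytes needed for an unsigned varint (7 data bits per byte)."""
    size = 1
    while value >= 0x80:
        value >>= 7
        size += 1
    return size


def zigzag32(value: int) -> int:
    """Zigzag-map a signed 32-bit int, as Kafka does before writing a signed varint."""
    return (value << 1) ^ (value >> 31)


for v in (16383, 16384):
    print("unsigned", v, "->", varint_size(v), "bytes")  # 2, then 3
for v in (8191, 8192, -8192, -8193):
    print("signed", v, "->", varint_size(zigzag32(v)), "bytes")  # 2, 3, 2, 3
```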
...