Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here


Motivation

Kafka logs consist of batches, and each batch consists of many messages. If the size of each message header can be reduced, network traffic and storage (and therefore cost) shrink with it, and even a small per-message reduction adds up. Some companies now handle trillions of messages per day with Kafka, so saving just 1 byte per message is considerable: at one trillion messages per day, 1 byte per message is about 1 TB of uncompressed data per day.


Currently, the V2 message format looks like this:

length: varint
attributes: int8
    bit 0~7: unused
timestampDelta: varlong
offsetDelta: varint
keyLength: varint
key: byte[]
valueLen: varint
value: byte[]
Headers => [Header]

Consider the attributes field: it was introduced in message format v1 and is still unused today.

So I propose adding a flag to the batch attributes that indicates whether the messages in the batch carry an attributes field.

Public Interfaces

In the record batch header, we'll add a new attribute bit that indicates whether the messages in the batch contain an attributes field. If the bit is 0, the messages have the attributes field (as they do today). If it is 1, the messages have no attributes field. The magic value will also be bumped to 3.


baseOffset: int64
batchLength: int32
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2) // <-- will change to 3
crc: int32
attributes: int16
    bit 0~2:
        0: no compression
        1: gzip
        2: snappy
        3: lz4
        4: zstd
    bit 3: timestampType
    bit 4: isTransactional (0 means not transactional)
    bit 5: isControlBatch (0 means not a control batch)
    bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction)
    // newly added attribute below
    bit 7: ignoreMessageAttributes (0 means not to ignore)
    bit 8~15: unused
lastOffsetDelta: int32
baseTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
records: [Record]
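
As a rough illustration of the layout above, the new flag could be read and written with a plain bit mask on the int16 attributes field. This is only a sketch: the class name, the helper methods, and the constant IGNORE_MESSAGE_ATTRIBUTES_MASK are hypothetical names chosen for this example, and only the bit positions come from the listing above.

```java
// Sketch only. IGNORE_MESSAGE_ATTRIBUTES_MASK and both helpers are
// hypothetical names; the bit positions follow the batch header listing.
final class BatchAttributesSketch {
    static final int COMPRESSION_CODEC_MASK = 0x07;         // bits 0~2
    static final int TIMESTAMP_TYPE_MASK = 0x08;            // bit 3
    static final int TRANSACTIONAL_FLAG_MASK = 0x10;        // bit 4
    static final int CONTROL_FLAG_MASK = 0x20;              // bit 5
    static final int DELETE_HORIZON_FLAG_MASK = 0x40;       // bit 6
    static final int IGNORE_MESSAGE_ATTRIBUTES_MASK = 0x80; // bit 7 (proposed)

    // True when bit 7 is set, i.e. the records carry no attributes field.
    static boolean ignoresMessageAttributes(short batchAttributes) {
        return (batchAttributes & IGNORE_MESSAGE_ATTRIBUTES_MASK) != 0;
    }

    // Returns a copy of the batch attributes with bit 7 set or cleared,
    // leaving every other bit untouched.
    static short withIgnoreMessageAttributes(short batchAttributes, boolean ignore) {
        int bits = batchAttributes & 0xFFFF;
        bits = ignore
                ? (bits | IGNORE_MESSAGE_ATTRIBUTES_MASK)
                : (bits & ~IGNORE_MESSAGE_ATTRIBUTES_MASK);
        return (short) bits;
    }
}
```

Because bit 7 was previously unused, setting it does not disturb the compression codec or any of the other existing flags.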


Proposed Changes

When writing a batch, we'll set ignoreMessageAttributes to 1 in the record batch header by default and create records without the attributes field.
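
The write path could look roughly like the following sketch, which serializes a single record with or without the attributes byte. The class and method names are hypothetical, headers are always written as an empty array, and the varint helper is a standalone zigzag encoder written for this example rather than Kafka's own utility.

```java
import java.io.ByteArrayOutputStream;

// Sketch only: all names are hypothetical and headers are left empty.
final class RecordWriterSketch {
    // Zigzag + base-128 encoding; for int-range values this yields the
    // same bytes as a 32-bit zigzag varint.
    static void writeVarlong(long value, ByteArrayOutputStream out) {
        long v = (value << 1) ^ (value >> 63);
        while ((v & ~0x7FL) != 0) {
            out.write((int) ((v & 0x7F) | 0x80));
            v >>>= 7;
        }
        out.write((int) v);
    }

    static void writeBytes(byte[] data, ByteArrayOutputStream out) {
        writeVarlong(data == null ? -1 : data.length, out); // -1 marks null
        if (data != null) {
            out.write(data, 0, data.length);
        }
    }

    static byte[] encodeRecord(boolean ignoreMessageAttributes, long timestampDelta,
                               int offsetDelta, byte[] key, byte[] value) {
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        if (!ignoreMessageAttributes) {
            body.write(0); // attributes: int8, all bits unused
        }
        writeVarlong(timestampDelta, body);
        writeVarlong(offsetDelta, body);
        writeBytes(key, body);
        writeBytes(value, body);
        writeVarlong(0, body); // header count: no headers in this sketch

        ByteArrayOutputStream record = new ByteArrayOutputStream();
        writeVarlong(body.size(), record); // length: varint
        record.write(body.toByteArray(), 0, body.size());
        return record.toByteArray();
    }
}
```

For a small record, calling encodeRecord with ignoreMessageAttributes set to true produces output one byte shorter than with it set to false, which is the saving this proposal is after.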

When reading a batch, we'll check ignoreMessageAttributes in the batch header before reading its records and parse each record accordingly.
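
A reader following this rule might be sketched as below. The flag is assumed to have been extracted from the batch header already; the class, the ParsedRecord holder, and the method names are hypothetical, and headers are skipped rather than parsed to keep the example short.

```java
import java.nio.ByteBuffer;

// Sketch only: all names are hypothetical; headers are skipped, not parsed.
final class RecordReaderSketch {
    static final class ParsedRecord {
        final byte attributes;
        final long timestampDelta;
        final int offsetDelta;
        final byte[] key;
        final byte[] value;

        ParsedRecord(byte attributes, long timestampDelta, int offsetDelta,
                     byte[] key, byte[] value) {
            this.attributes = attributes;
            this.timestampDelta = timestampDelta;
            this.offsetDelta = offsetDelta;
            this.key = key;
            this.value = value;
        }
    }

    // Decodes a zigzag base-128 value; no guard against malformed input.
    static long readVarlong(ByteBuffer buf) {
        long raw = 0;
        int shift = 0;
        byte b;
        do {
            b = buf.get();
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1);
    }

    static byte[] readBytes(ByteBuffer buf) {
        int length = (int) readVarlong(buf);
        if (length < 0) {
            return null; // -1 marks a null key or value
        }
        byte[] data = new byte[length];
        buf.get(data);
        return data;
    }

    static ParsedRecord readRecord(ByteBuffer buf, boolean ignoreMessageAttributes) {
        int length = (int) readVarlong(buf);
        int end = buf.position() + length;
        // When the batch flag is set there is no attributes byte to consume.
        byte attributes = ignoreMessageAttributes ? 0 : buf.get();
        long timestampDelta = readVarlong(buf);
        int offsetDelta = (int) readVarlong(buf);
        byte[] key = readBytes(buf);
        byte[] value = readBytes(buf);
        buf.position(end); // jump over the headers to the next record
        return new ParsedRecord(attributes, timestampDelta, offsetDelta, key, value);
    }
}
```

The only branch that depends on the new flag is the single line that reads (or skips) the attributes byte; the rest of the record is parsed the same way in both cases.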

When an old consumer reads messages in the new format, we'll down-convert them to the older message format, as we do today for old consumers that only accept message format v1.
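
For the per-record part of such a down-conversion, one possible approach is to re-insert a zero attributes byte and rewrite the length prefix. The sketch below shows only that step under that assumption; the class and method names are hypothetical, and a real conversion would also have to rewrite the batch header (magic, attributes, batchLength, crc), which is not shown.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Sketch only: hypothetical names; batch header rewriting is out of scope.
final class DownConvertSketch {
    static int readVarint(ByteBuffer buf) {
        int raw = 0;
        int shift = 0;
        byte b;
        do {
            b = buf.get();
            raw |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1); // undo zigzag
    }

    static void writeVarint(int value, ByteArrayOutputStream out) {
        int v = (value << 1) ^ (value >> 31); // zigzag
        while ((v & ~0x7F) != 0) {
            out.write((v & 0x7F) | 0x80);
            v >>>= 7;
        }
        out.write(v);
    }

    // Takes one record written without the attributes field and returns
    // the same record with a zero attributes byte and an updated length.
    static byte[] addAttributesByte(byte[] recordWithoutAttributes) {
        ByteBuffer in = ByteBuffer.wrap(recordWithoutAttributes);
        int bodyLength = readVarint(in);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(bodyLength + 1, out); // body grows by the attributes byte
        out.write(0);                     // attributes: int8, all bits unused
        out.write(recordWithoutAttributes, in.position(), bodyLength);
        return out.toByteArray();
    }
}
```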

Compatibility, Deprecation, and Migration Plan

Fully backward compatible. If we ever want to use the message attributes field, we can set ignoreMessageAttributes back to 0 so that records carry the field again.

Test Plan

Unit tests and integration tests.

Rejected Alternatives

Keep using the existing message format. That would keep wasting 1 byte per message on a field that is always zero.
