Comment: Update message format to include changes in Kafka 0.11.0.0

...

  

Field

Description

Offset

This is the offset used in Kafka as the log sequence number. When the producer sends uncompressed messages, it can set the offsets to anything. When the producer sends compressed messages, to avoid server-side recompression, the inner messages should have offsets starting from 0 and increasing by one for each inner message in the compressed message. (See more details about compressed messages in Kafka below.)

Crc

The CRC is the CRC32 of the remainder of the message bytes. This is used to check the integrity of the message on the broker and consumer.

MagicByte

This is a version id used to allow backwards compatible evolution of the message binary format. The current value is 1.

Attributes

This byte holds metadata attributes about the message.

The lowest 3 bits contain the compression codec used for the message.

The fourth lowest bit represents the timestamp type. 0 stands for CreateTime and 1 stands for LogAppendTime. The producer should always set this bit to 0. (since 0.10.0)

All other bits should be set to 0.

Timestamp

This is the timestamp of the message. The timestamp type is indicated in the attributes. Unit is milliseconds since beginning of the epoch (midnight Jan 1, 1970 (UTC)).

Key

The key is an optional message key that was used for partition assignment. The key can be null.

Value

The value is the actual message contents as an opaque byte array. Kafka supports recursive messages in which case this may itself contain a message set. The message can be null.
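
The Crc and Attributes rows above can be illustrated with a minimal sketch. It assumes that "the remainder of the message bytes" means everything after the Crc field and that the CRC is the standard CRC-32 computed by Python's `zlib.crc32`; the function and constant names are hypothetical and are not part of any Kafka client.

```python
import zlib

# Bit layout taken from the Attributes description above.
COMPRESSION_CODEC_MASK = 0x07  # lowest 3 bits
TIMESTAMP_TYPE_MASK = 0x08     # fourth lowest bit


def decode_attributes(attributes: int) -> dict:
    """Split the attributes byte into its documented parts."""
    codec = attributes & COMPRESSION_CODEC_MASK
    if attributes & TIMESTAMP_TYPE_MASK:
        timestamp_type = "LogAppendTime"
    else:
        timestamp_type = "CreateTime"
    return {"codec": codec, "timestamp_type": timestamp_type}


def crc_matches(stored_crc: int, remainder: bytes) -> bool:
    """Compare the stored Crc with the CRC-32 of the bytes that follow it.

    Both sides are masked to 32 bits so the check works whether the
    stored value was read as a signed or an unsigned integer.
    """
    return (zlib.crc32(remainder) & 0xFFFFFFFF) == (stored_crc & 0xFFFFFFFF)
```

A consumer would typically run `crc_matches` before trusting any other field, and a producer would leave the timestamp-type bit at 0 as the table requires.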

 

In Kafka 0.11, the structures of the 'MessageSet' and 'Message' were significantly changed. Not only were new fields added to support new features like exactly-once semantics and record headers, but the recursive nature of the previous versions of the message format was eliminated in favor of a flat structure. A 'MessageSet' is now called a 'RecordBatch', which contains one or more 'Records' (and not 'Messages'). When compression is enabled, the RecordBatch header remains uncompressed, but the Records are compressed together. Further, multiple fields in the 'Record' are varint encoded, which leads to significant space savings for larger batches.
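
To make the space savings from varint encoding concrete, here is a minimal sketch of a signed variable-length integer codec. It assumes the zig-zag scheme used by Protocol Buffers and values that fit in a signed 64-bit integer; the function names are hypothetical.

```python
def zigzag_encode(n: int) -> int:
    """Map signed to unsigned so small magnitudes stay small (0, -1, 1, -2 -> 0, 1, 2, 3)."""
    return (n << 1) ^ (n >> 63)


def zigzag_decode(z: int) -> int:
    """Inverse of zigzag_encode."""
    return (z >> 1) ^ -(z & 1)


def encode_varint(n: int) -> bytes:
    """Emit 7 bits per byte, least significant group first; the high bit marks continuation."""
    z = zigzag_encode(n)
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def decode_varint(buf: bytes, pos: int = 0) -> tuple:
    """Return (value, next_position) for the varint starting at buf[pos]."""
    z = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        z |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return zigzag_decode(z), pos
```

With this scheme a small delta such as 1 occupies a single byte instead of the 4 or 8 bytes a fixed-width field would need, which is where the savings for large batches come from.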

The new message format has a Magic value of 2. Its structure is as follows:

Code Block
RecordBatch =>
  FirstOffset => int64
  Length => int32
  PartitionLeaderEpoch => int32 
  Magic => int8  
  CRC => int32
  Attributes => int16
  LastOffsetDelta => int32 
  FirstTimestamp => int64 
  MaxTimestamp => int64 
  ProducerId => int64 
  ProducerEpoch => int16 
  FirstSequence => int32 
  Records => [Record]
 
Record => 
  Length => varint
  Attributes => int8
  TimestampDelta => varint
  OffsetDelta => varint
  KeyLen => varint
  Key => data
  ValueLen => varint
  Value => data
  Headers => [Header] 
 
Header =>
  HeaderKeyLen => varint
  HeaderKey => string
  HeaderValueLen => varint
  HeaderValue => data

The semantics of the newly added fields are described below: 

  

Field

Description

FirstOffset

Denotes the first offset in the RecordBatch. The 'OffsetDelta' of each Record in the batch is computed relative to this FirstOffset. In particular, the offset of each Record in the batch is its 'OffsetDelta' + 'FirstOffset'.

LastOffsetDelta

The offset of the last message in the RecordBatch, expressed as a delta relative to 'FirstOffset'. This is used by the broker to ensure correct behavior even when Records within a batch are compacted out.

PartitionLeaderEpoch

Introduced with KIP-101, this is set by the broker upon receipt of a produce request and is used to ensure no loss of data when there are leader changes with log truncation. Client developers do not need to worry about setting this value.

FirstTimestamp

The timestamp of the first Record in the batch. The timestamp of each Record in the RecordBatch is its 'TimestampDelta' + 'FirstTimestamp'.

Attributes

This int16 field holds metadata attributes about the RecordBatch.

The lowest 3 bits contain the compression codec used for the message.

The fourth lowest bit represents the timestamp type. 0 stands for CreateTime and 1 stands for LogAppendTime. The producer should always set this bit to 0. (since 0.10.0)

The fifth lowest bit indicates whether the RecordBatch is part of a transaction or not. 0 indicates that the RecordBatch is not transactional, while 1 indicates that it is. (since 0.11.0.0).

MaxTimestamp

The timestamp of the last Record in the batch. This is used by the broker to ensure the correct behavior even when Records within the batch are compacted out.

ProducerId

Introduced in 0.11.0.0 for KIP-98, this is the broker-assigned producerId returned by the 'InitProducerId' request. Clients that want to support idempotent message delivery and transactions must set this field.

ProducerEpoch

Introduced in 0.11.0.0 for KIP-98, this is the broker-assigned producerEpoch returned by the 'InitProducerId' request. Clients that want to support idempotent message delivery and transactions must set this field.

FirstSequence

Introduced in 0.11.0.0 for KIP-98, this is the producer-assigned sequence number that the broker uses to deduplicate messages. Clients that want to support idempotent message delivery and transactions must set this field. The sequence number for each Record in the RecordBatch is its OffsetDelta + FirstSequence.

Headers

Introduced in 0.11.0.0 for KIP-82, Kafka now supports application-level, record-level headers. The Producer and Consumer APIs have been updated accordingly to write and read these headers.
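
The relationships in the table above (offset, timestamp, and sequence number derived from batch-level base values, plus the attribute bits) can be sketched as follows. The class and function names are hypothetical and exist only for illustration; the arithmetic and bit positions are taken directly from the field descriptions.

```python
from dataclasses import dataclass


@dataclass
class BatchBase:
    """Batch-level values that per-record deltas are applied to."""
    first_offset: int
    first_timestamp: int
    first_sequence: int


@dataclass
class RecordDeltas:
    """Per-record deltas as stored in each Record."""
    offset_delta: int
    timestamp_delta: int


def resolve_record(base: BatchBase, rec: RecordDeltas) -> dict:
    """Turn a record's deltas into absolute values."""
    return {
        "offset": base.first_offset + rec.offset_delta,
        "timestamp": base.first_timestamp + rec.timestamp_delta,
        "sequence": base.first_sequence + rec.offset_delta,
    }


def decode_batch_attributes(attributes: int) -> dict:
    """Split the batch Attributes field into the bits described above."""
    return {
        "codec": attributes & 0x07,                       # lowest 3 bits
        "log_append_time": bool(attributes & 0x08),       # fourth lowest bit
        "transactional": bool(attributes & 0x10),         # fifth lowest bit
    }
```

For example, with a FirstOffset of 100 and a FirstSequence of 50, the record whose OffsetDelta is 2 resolves to offset 102 and sequence number 52.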

 

Compression

Kafka supports compressing messages for additional efficiency; however, this is more complex than just compressing a raw message. Because individual messages may not have sufficient redundancy to enable good compression ratios, compressed messages must be sent in special batches (although you may use a batch of one if you truly wish to compress a message on its own). The messages to be sent are wrapped (uncompressed) in a MessageSet structure, which is then compressed and stored in the Value field of a single "Message" with the appropriate compression codec set. The receiving system parses the actual MessageSet from the decompressed value. The outer MessageSet should contain only one compressed "Message" (see KAFKA-1718 for details).
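
The wrapping described above can be sketched roughly as follows. The sketch treats the serialized inner MessageSet as opaque bytes, represents the wrapper Message as a plain dictionary rather than its wire encoding, and assumes that codec id 1 denotes gzip; all names are hypothetical.

```python
import gzip

CODEC_MASK = 0x07
GZIP_CODEC = 1  # assumption: codec id 1 means gzip


def inner_offsets(count: int) -> list:
    """Offsets for the inner messages: start at 0 and increase by one."""
    return list(range(count))


def wrap_compressed(inner_message_set: bytes) -> dict:
    """Compress a serialized MessageSet and place it in a wrapper message's value."""
    return {
        "attributes": GZIP_CODEC,  # codec goes in the lowest 3 bits
        "key": None,
        "value": gzip.compress(inner_message_set),
    }


def unwrap_compressed(message: dict) -> bytes:
    """Recover the serialized inner MessageSet from a wrapper message."""
    codec = message["attributes"] & CODEC_MASK
    if codec != GZIP_CODEC:
        raise ValueError("unsupported codec id: %d" % codec)
    return gzip.decompress(message["value"])
```

A receiver would call `unwrap_compressed` and then parse the returned bytes as an ordinary MessageSet.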

...