Comment: Added message format changes

...

We only need to retain the PID of the completed transaction along with a timestamp, so we can eventually remove the TransactionalId->PID mapping for the producer. See the Expiring PIDs section below.

Message Format

In order to add new fields such as PID and epoch into the produced messages for transactional messaging and de-duplication, we need to change Kafka’s message format and bump up its version (i.e. the “magic byte”). More specifically, we need to add the following fields into each message:

  • PID => int64

  • Epoch => int16

  • Sequence number => int32

Adding these fields on the message-level format schema potentially adds a considerable amount of overhead; on the other hand, at least the PID and epoch will never change within a set of messages from a given producer. We therefore propose to enhance the current concept of a message set by giving it a separate schema from an individual message. In this way, we can locate these fields only at the message set level which allows the additional overhead to be amortized across batches of messages rather than paying the cost for each message separately.

Both the epoch and sequence number will wrap around once int16_max and int32_max are reached. Since there is a single point of allocation and validation for both the epoch and sequence number, wrapping these values will not break either the idempotent or transactional semantics.
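
The wrap-around can be sketched as follows. This is only an illustration: the helper names are hypothetical, and it assumes both counters restart at 0 after reaching their maximum, which the text above does not spell out.

```python
INT16_MAX = 2**15 - 1   # largest value an int16 epoch can hold
INT32_MAX = 2**31 - 1   # largest value an int32 sequence number can hold


def next_epoch(epoch: int) -> int:
    """Return the epoch that follows `epoch`, restarting at 0 (assumed) after INT16_MAX."""
    return 0 if epoch == INT16_MAX else epoch + 1


def next_sequence(sequence: int, count: int = 1) -> int:
    """Advance a sequence number by `count` messages, wrapping past INT32_MAX (assumed to 0)."""
    return (sequence + count) % (INT32_MAX + 1)
```

Because a single party allocates and validates these values, both sides only need to agree on the same wrapping rule.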

For reference, the current message format (v1) is the following:

 

MessageSet => [Offset MessageSize Message]
 Offset => int64
 MessageSize => int32

Message => Crc Magic Attributes Timestamp Key Value
 Crc => int32
 Magic => int8
 Attributes => int8
 Timestamp => int64
 Key => bytes
 Value => bytes

 


A message set is a sequence of messages. To support compression, we currently play a trick with this format and allow the compressed output of a message set to be embedded in the value field of another message (a.k.a. the “wrapper message”). In this design, we propose to extend this concept to non-compressed messages and to decouple the schema for the message wrapper (which contains the compressed message set) from the schema for the individual messages. This allows us to maintain a separate set of fields at the message set level and avoid some costly redundancy:

 

MessageSet => 
  Offset => int64
  Length => int32
  CRC => int32
  Magic => int8  /* bump up to “2” */
  Attributes => int16
  OffsetDelta => int32 {NEW}
  MaxTimestamp => int64 {NEW}
  PID => int64 {NEW}
  Epoch => int16 {NEW}
  Sequence => int32 {NEW}
  Messages => Message1, Message2, … , MessageN {NEW}

Message => {ALL FIELDS NEW}
  Length => uintVar
  Attributes => int8
  Timestamp => int64
  OffsetDelta => uintVar
  KeyLen => uintVar [OPTIONAL]
  Key => data [OPTIONAL]
  Value => data [OPTIONAL]
  CRC => int32





 

The ability to store some fields only at the message set level allows us to conserve space considerably when batching messages into a message set. For example, there is no need to write the PID within each message since it will always be the same for all messages within each message set. In addition, by separating the message level format and message set format, now we can also use variable-length types for the inner (relative) offsets and save considerably over a fixed 8-byte field size.
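
To make the fixed-size message set header concrete, here is a minimal sketch that packs and unpacks the header fields listed in the schema above. It assumes big-endian encoding with no padding; the names `MESSAGE_SET_HEADER`, `FIELDS`, `pack_header` and `unpack_header` are hypothetical and not part of the proposal.

```python
import struct

# Field order follows the proposed schema:
# Offset(int64) Length(int32) CRC(int32) Magic(int8) Attributes(int16)
# OffsetDelta(int32) MaxTimestamp(int64) PID(int64) Epoch(int16) Sequence(int32)
# ">" selects big-endian without alignment padding (an assumption of this sketch).
MESSAGE_SET_HEADER = struct.Struct(">qiibhiqqhi")

FIELDS = ("offset", "length", "crc", "magic", "attributes",
          "offset_delta", "max_timestamp", "pid", "epoch", "sequence")


def pack_header(**fields: int) -> bytes:
    """Serialize the header fields, given as keyword arguments, in schema order."""
    return MESSAGE_SET_HEADER.pack(*(fields[name] for name in FIELDS))


def unpack_header(buf: bytes) -> dict:
    """Read the header fields from the start of `buf` into a dict."""
    return dict(zip(FIELDS, MESSAGE_SET_HEADER.unpack_from(buf)))
```

With these field widths the header occupies 45 bytes regardless of how many messages follow it.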

Message Set Fields

The first four fields of a message set in this format must be the same as in the existing format, because fields before the magic byte cannot be changed if we are to provide an upgrade path similar to the one used in KIP-32. Clients which request an older version of the format will require conversion on the broker.

The offset provided in the message set header represents the offset of the first message in the set. Similarly, the sequence number field represents the sequence number of the first message. We also include an “offset delta” at the message set level to provide an easy way to compute the last offset / sequence number in the set: i.e. the starting offset of the next message set should be “offset + offset delta”. This also allows us to search for the message set corresponding to a particular offset without scanning the individual messages, which may or may not be compressed. Similarly, we can use this to easily compute the next expected sequence number.
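
The header arithmetic described above might look like the following sketch. It takes the rule “offset + offset delta” literally as the start of the next message set, assumes sequence numbers wrap to 0 after int32_max, and uses hypothetical function names.

```python
INT32_MAX = 2**31 - 1


def next_offset(offset: int, offset_delta: int) -> int:
    """Starting offset of the next message set, per the rule stated in the text."""
    return offset + offset_delta


def next_expected_sequence(sequence: int, offset_delta: int) -> int:
    """Next sequence number expected for the PID (wrap to 0 is an assumption)."""
    return (sequence + offset_delta) % (INT32_MAX + 1)


def find_message_set(headers, target_offset: int):
    """Locate the message set covering `target_offset` using only header fields.

    `headers` is a list of (offset, offset_delta) pairs in log order.
    Returns the index of the matching message set, or None if there is none.
    No individual message is read or decompressed.
    """
    for index, (offset, offset_delta) in enumerate(headers):
        if offset <= target_offset < next_offset(offset, offset_delta):
            return index
    return None
```

For example, with headers `[(0, 5), (5, 3), (8, 10)]`, offset 6 falls in the second message set.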

The offset, sequence number, and offset delta values of the message set never change after the creation of the message set. The log cleaner may remove individual messages from the message set, and it may remove the message set itself once all messages have been removed, but we must preserve the range of sequence numbers that were ever used in a message set since we depend on this to determine the next sequence number expected for each PID.

Message Set Attributes: The message set attributes are essentially the same as in the existing format, though we have added an additional byte for future use. In addition to the existing 3 bits used to indicate the compression codec and 1 bit for timestamp type, we will use another bit to indicate that the message set is transactional (see Transaction Markers section). This lets consumers in READ_COMMITTED know whether a transaction marker is expected for a given message set.

 

Compression (3) | Timestamp type (1) | Transactional (1) | Unused (11)
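
A minimal sketch of how these attribute bits could be packed into the 16-bit field follows. The bit positions (compression in the lowest three bits, then timestamp type, then the transactional flag) are an assumption based on the order listed above, and the function names are hypothetical.

```python
COMPRESSION_MASK = 0x07        # bits 0-2: compression codec (assumed position)
TIMESTAMP_TYPE_SHIFT = 3       # bit 3: timestamp type (assumed position)
TRANSACTIONAL_BIT = 1 << 4     # bit 4: transactional flag (assumed position)


def encode_attributes(compression_codec: int, timestamp_type: int,
                      transactional: bool) -> int:
    """Pack the three message set attributes into a single int16 value."""
    if not 0 <= compression_codec <= COMPRESSION_MASK:
        raise ValueError("compression codec must fit in 3 bits")
    if timestamp_type not in (0, 1):
        raise ValueError("timestamp type must fit in 1 bit")
    attributes = compression_codec | (timestamp_type << TIMESTAMP_TYPE_SHIFT)
    if transactional:
        attributes |= TRANSACTIONAL_BIT
    return attributes


def decode_attributes(attributes: int):
    """Return (compression_codec, timestamp_type, transactional)."""
    return (attributes & COMPRESSION_MASK,
            (attributes >> TIMESTAMP_TYPE_SHIFT) & 1,
            bool(attributes & TRANSACTIONAL_BIT))
```

A consumer in READ_COMMITTED would only need the third element of the decoded tuple to know whether to expect a transaction marker.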

 

Discussion on Maximum Message Size. The broker’s configuration message.max.bytes previously controlled the maximum size of a single uncompressed message or a compressed set of messages. With this design, it now controls the maximum message set size, compressed or not. In practice, the difference is minor because a single message can be written as a singleton message set, with the small increase in overhead mentioned above.

Message Fields

The length field of the message format is encoded as an unsigned variable-length int, abbreviated “uintVar”. The offset delta and key length fields are encoded as uintVar as well. The message’s offset can then be calculated as the offset of the message set + offset delta. At the end, we still maintain a message-level CRC (the reason is discussed below).
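
The proposal does not define the exact uintVar encoding at this point. As an illustration only, the sketch below assumes a base-128 scheme in which each byte carries 7 bits of the value, least significant group first, with the high bit set on every byte except the last. The function names are hypothetical.

```python
def encode_uintvar(value: int) -> bytes:
    """Encode a non-negative integer, 7 bits per byte, low-order group first."""
    if value < 0:
        raise ValueError("uintVar cannot represent negative values")
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)   # more bytes follow
        else:
            out.append(byte)          # final byte has the high bit clear
            return bytes(out)


def decode_uintvar(buf: bytes, pos: int = 0):
    """Decode a uintVar starting at `pos`; return (value, position after it)."""
    result = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7
```

Under this assumption, values below 128 (such as a small offset delta or a 100-byte key length) take one byte, and values below 16384 take two.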

Message Attributes: In this format, we have also added a single byte for individual message attributes. Only message sets can be compressed, so there is no need to reserve some of these attributes for the compression type. Instead, we use the first bit to indicate a null key and the second bit to indicate a null value (1 indicates null, 0 indicates non-null). This allows us to use an unsigned variable length encoding for the key and value length fields. When the key or value is null, we can omit the corresponding fields:

  • Null-key bit is 1: skip the key-length and key fields.

  • Null-value bit is 1: skip the key-length (since it can now be calculated) and value fields.

  • Both bits are 1: skip key-length, key and value fields.

The control flag indicates that the message is a control message, which means it is not intended for application consumption. The remaining bits are currently unused, though one could be used for KIP-87 (message tombstone marker).

 

Null Key (1) | Null Value (1) | Control Flag (1) | Unused (5)
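
The rules for omitting fields can be sketched as below. The bit positions are an assumption (the “first bit” is taken to be the least significant one, followed by the null-value bit and the control flag), and the function names are hypothetical.

```python
NULL_KEY_BIT = 1 << 0     # assumed position of the null-key bit
NULL_VALUE_BIT = 1 << 1   # assumed position of the null-value bit
CONTROL_BIT = 1 << 2      # assumed position of the control flag


def present_fields(attributes: int) -> list:
    """List the optional fields actually written for a message.

    Mirrors the three rules in the text: a null key drops KeyLen and Key,
    a null value drops KeyLen and Value, and both bits drop all three.
    """
    null_key = bool(attributes & NULL_KEY_BIT)
    null_value = bool(attributes & NULL_VALUE_BIT)
    fields = []
    if not null_key:
        if not null_value:
            fields.append("KeyLen")   # needed only when a value follows the key
        fields.append("Key")
    if not null_value:
        fields.append("Value")
    return fields


def is_control(attributes: int) -> bool:
    """True if the message is a control message, not meant for applications."""
    return bool(attributes & CONTROL_BIT)
```

For instance, `present_fields(NULL_VALUE_BIT)` yields only `["Key"]`, since the key then runs to the end of the message body.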

Control messages will always have a non-null key, which is used to indicate the type of control message with the following schema:

ControlMessageKey => Version ControlMessageType
  Version => int16
  ControlMessageType => int16 

In this proposal, a control message type of 0 indicates a COMMIT marker, and a control message type of 1 indicates an ABORT marker. The schema for control values is generally specific to the control message type.
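
As a minimal sketch, the control message key could be encoded and decoded as follows. Big-endian byte order is an assumption of this example, and the class and function names are hypothetical; only the two int16 fields and the COMMIT/ABORT values come from the text above.

```python
import struct
from enum import IntEnum


class ControlMessageType(IntEnum):
    COMMIT = 0
    ABORT = 1


# ControlMessageKey => Version(int16) ControlMessageType(int16)
CONTROL_KEY = struct.Struct(">hh")


def encode_control_key(version: int, message_type: ControlMessageType) -> bytes:
    """Serialize the key of a control message."""
    return CONTROL_KEY.pack(version, message_type)


def decode_control_key(key: bytes):
    """Return (version, ControlMessageType) read from the start of `key`."""
    version, raw_type = CONTROL_KEY.unpack_from(key)
    return version, ControlMessageType(raw_type)
```

An unknown type value raises `ValueError` in this sketch; how a real reader should treat unknown control types is not covered here.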

Discussion on Message-level Schema. A few additional notes about this schema:

  1. Having easy access to the offset of the first message allows us to stream messages to the user on demand. In the existing format, we only know the last offset in each message set, so we have to read the messages fully into memory in order to compute the offset of the first message to be returned to the user.

  2. As before, the message set header has a fixed size. This is important because it allows us to do in-place offset/timestamp assignment on the broker before writing to disk.

  3. The message set CRC covers the header and message data. Alternatively, we could let it cover only the header, but if compressed data is corrupted, then decompression may fail with obscure errors.

  4. We have preserved the per-message CRC in this format. We considered removing it since the message set CRC covers the data, but some auditing applications depend on individual messages having their own CRC. To make computation and validation easier, we have located it at the end of the message.

  5. Individual messages within a message set have their full size (including header, key, and value) as the first field. This is designed to make deserialization efficient. As we do for the message set itself, we can read the size from the input stream, allocate memory accordingly, and do a single read up to the end of the message. This also makes it easier to skip over the messages if we are looking for a particular one, which potentially saves us from copying the key and value.

  6. We have not included a field for the size of the value in the message schema since it can be computed directly using the message size and the length of the header and key.

Space Comparison

As the batch size increases, the overhead of the new format grows smaller compared to the old format because of the eliminated redundancy. The overhead per message in the old format is fixed at 34 bytes. For the new format, the message set overhead is 45 bytes, while per-message overhead ranges from 14 to 25 bytes. This makes it more costly to send individual messages, but space is quickly recovered with even modest batching. For example, assuming a fixed message size of 1K with 100 byte keys, the overhead increases by only 17 bytes for each additional batched message:

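| Batch Size | Old Format Overhead | New Format Overhead |
|------------|---------------------|---------------------|
| 1          | 34*1 = 34           | 45 + 1*17 = 62      |
| 3          | 34*3 = 102          | 45 + 3*17 = 96      |
| 10         | 34*10 = 340         | 45 + 10*17 = 215    |
| 50         | 34*50 = 1700        | 45 + 50*17 = 895    |
| 100        | 34*100 = 3400       | 45 + 100*17 = 1745  |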
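
The figures in this comparison can be reproduced with a short calculation. The constants come from the text above (34 bytes per message in the old format, 45 bytes per message set plus 17 bytes per message in the new format for this example); the function names are hypothetical.

```python
OLD_PER_MESSAGE = 34    # fixed per-message overhead in the old format (bytes)
NEW_MESSAGE_SET = 45    # fixed message set overhead in the new format (bytes)
NEW_PER_MESSAGE = 17    # per-message overhead for 1K messages with 100-byte keys


def old_overhead(batch_size: int) -> int:
    """Total overhead in bytes for a batch in the old format."""
    return OLD_PER_MESSAGE * batch_size


def new_overhead(batch_size: int, per_message: int = NEW_PER_MESSAGE) -> int:
    """Total overhead in bytes for a batch in the new format."""
    return NEW_MESSAGE_SET + per_message * batch_size


def break_even_batch_size(per_message: int = NEW_PER_MESSAGE) -> int:
    """Smallest batch size at which the new format has less overhead."""
    batch_size = 1
    while new_overhead(batch_size, per_message) >= old_overhead(batch_size):
        batch_size += 1
    return batch_size


if __name__ == "__main__":
    for n in (1, 3, 10, 50, 100):
        print(n, old_overhead(n), new_overhead(n))
```

With these numbers the new format becomes cheaper from a batch size of 3 onward.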

Compatibility, Deprecation, and Migration Plan

...