...

This KIP has the following public interface changes:

  1. Add a new headers length and value (byte[]) to the core message format.
  2. Add a headers (Map<int, byte[]>) field to ProducerRecord and ConsumerRecord. A producer/interceptors will be able to set headers on a ProducerRecord. A consumer/interceptors will see the message headers when it sees the messages.

  3. Add accessor methods on Producer/ConsumerRecord: void setHeader(int, byte[]) and byte[] getHeader(int).
  4. Add ProduceRequest/ProduceResponse V3 which uses the new message format.
  5. Add FetchRequest/FetchResponse V3 which uses the new message format.
  6. The [int, byte[]] header set will be serialized on the wire using a strict format.
  7. Each header's value will be custom serializable by the interceptors/plugins that use the header.
  8. Because headers use int keys for compactness, key ranges will be reserved for different usages:
    1. client headers adopted by the Kafka open source project (not necessarily owned by it)
    2. broker headers adopted by the Kafka open source project (not necessarily owned by it)
    3. commercial vendor space
    4. custom inhouse

...

  • Adds the ability for producers to set standard header key=value pairs
  • No incompatible client api change (only new methods)
  • Allows users to specify the serialization of the header value per header
  • Compact key space
  • Provides a standardized interface so that ecosystems of tools can grow around the feature

The disadvantage of this proposal is:

  • Change to the message object

Add a headers field Map<int, byte[]> to both ProducerRecord and ConsumerRecord

  • Accessor methods void setHeader(int, byte[]) and byte[] getHeader(int) are added to the interface of both.
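
As a rough illustration of the accessor pair described above, the following is a minimal sketch, not the proposed implementation. The class name HeaderedRecordSketch is hypothetical and stands in for the header-related part of ProducerRecord/ConsumerRecord. Because Java generics cannot take a primitive int, the map is assumed to be a Map<Integer, byte[]>, and the defensive copying of values is likewise an assumption rather than something the proposal states.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the header portion of ProducerRecord/ConsumerRecord.
final class HeaderedRecordSketch {
    // Java generics cannot use primitive int, so the key is boxed as Integer (assumption).
    private final Map<Integer, byte[]> headers = new HashMap<>();

    // Stores a copy so later changes to the caller's array do not affect the record.
    void setHeader(int key, byte[] value) {
        headers.put(key, Arrays.copyOf(value, value.length));
    }

    // Returns a copy of the header value, or null if the key has not been set.
    byte[] getHeader(int key) {
        byte[] value = headers.get(key);
        return value == null ? null : Arrays.copyOf(value, value.length);
    }
}
```

How each byte[] value is produced is left to the interceptor or plugin that owns the header, in line with the per-header serialization described earlier.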

Wire protocol change - use attribute bit 4 as a flag indicating whether headers are present, and add an (optional) headers length (variable-length int) and headers field to the message format.

The below is for the core Message wire protocol change needed to fit headers into the message.

...

MessageAndOffset => Offset MessageSize Message
  Offset => int64  
  MessageSize => int32
  
  Message => Crc MagicByte Attributes Timestamp KeyLength Key HeadersLength Headers ValueLength Value
    Crc => int32
    MagicByte => int8  <---------------------- Bump up magic byte to 2
    Attributes => int8 <---------------------- Use bit 4 as a boolean flag indicating whether headers are present
    Timestamp => int64
    KeyLength => int32
    Key => bytes
    (optional) HeadersLength => variable int32 <------------------ NEW [optional] length of the byte[] of the serialized headers, if headers are present
    (optional) Headers => bytes <--------------------------------- NEW [optional] serialized form of the headers Map<int, byte[]>
    ValueLength => int32
    Value => bytes
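
The headers-present flag in the Attributes byte could be tested and set along these lines. This is a sketch under assumptions: "bit 4" is taken to be zero-indexed (mask 0x10), and the class and method names are hypothetical rather than part of the proposal.

```java
// Hypothetical helper for the headers-present flag in the Attributes byte.
final class HeaderFlagSketch {
    // Assumption: "bit 4" is zero-indexed, which gives the mask 0x10.
    static final byte HEADERS_PRESENT_MASK = 1 << 4;

    // True when the flag bit is set, meaning HeadersLength and Headers follow the key.
    static boolean hasHeaders(byte attributes) {
        return (attributes & HEADERS_PRESENT_MASK) != 0;
    }

    // Returns the attributes with the flag bit set and all other bits unchanged.
    static byte withHeadersFlag(byte attributes) {
        return (byte) (attributes | HEADERS_PRESENT_MASK);
    }
}
```

A reader of the new format would check hasHeaders before attempting to read HeadersLength, since both header fields are absent when the flag is clear.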

...

With this encoding, ints in the ranges below take fewer bytes than the standard 4-byte allocation, which is where the encoding gives the most benefit.

 

3 bytes :: -129 -> -32768
2 bytes :: -33 -> -128
1 byte :: 0 -> -32

...

2 bytes :: 128 -> 255
3 bytes :: 256 -> 65535
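
The size ranges listed above can be summarized in a small lookup. This is only a sketch of the byte counts, not the encoder itself. The class name is hypothetical, and the 1-byte size for 1 -> 127 is an assumption, since that range falls in the part of the listing not shown here. Values outside the listed ranges are rejected rather than guessed.

```java
// Hypothetical helper that reports the encoded size for the ranges listed above.
final class VarIntSizeSketch {
    static int encodedSize(int value) {
        // 0 -> -32 is listed as 1 byte; treating 1 -> 127 as 1 byte is an assumption.
        if (value >= -32 && value <= 127) {
            return 1;
        }
        // -33 -> -128 and 128 -> 255 are listed as 2 bytes.
        if (value >= -128 && value <= 255) {
            return 2;
        }
        // -129 -> -32768 and 256 -> 65535 are listed as 3 bytes.
        if (value >= -32768 && value <= 65535) {
            return 3;
        }
        // Larger magnitudes are not covered by the ranges shown in this section.
        throw new IllegalArgumentException("size not listed for value " + value);
    }
}
```

For example, a header key of 200 would take 2 bytes and a key of 300 would take 3 bytes, compared with 4 bytes for a fixed-width int32.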

As such we propose that:

Positive ints 0 -> 255 are reserved for the Apache Kafka open registered headers. These are likely to be the most commonly used headers, so giving them the shortest encodings saves space.

...