

...

  • Adds the ability for producers to set standard headers as key=value string pairs
  • No incompatible client API change (only new methods are added)
  • No overhead for users who do not use headers, or for messages without headers: bit 4 of the attributes byte flags whether headers are present, so messages without headers do not grow in size
  • Allows users to specify the serialization of the key=value map (for example a String delimited with & and =, JSON, or Avro)
  • Provides a standardized interface so that an ecosystem of tools can grow around the feature
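
As an illustration of the String(&=) serialization mentioned above, the following is a minimal sketch, not part of the proposal. The class name StringHeaderCodec and its methods are hypothetical, and URL-encoding the keys and values is an assumption made here so that '&' and '=' inside a header cannot break the format.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: serializes a headers map as key=value pairs joined by '&'.
final class StringHeaderCodec {

    static byte[] serialize(Map<String, String> headers) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : headers.entrySet()) {
            if (sb.length() > 0) {
                sb.append('&');
            }
            sb.append(encode(e.getKey())).append('=').append(encode(e.getValue()));
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    static Map<String, String> deserialize(byte[] bytes) {
        Map<String, String> headers = new LinkedHashMap<>();
        String s = new String(bytes, StandardCharsets.UTF_8);
        if (s.isEmpty()) {
            return headers; // no headers were serialized
        }
        for (String pair : s.split("&")) {
            int eq = pair.indexOf('='); // serialize() always writes exactly one raw '='
            headers.put(decode(pair.substring(0, eq)), decode(pair.substring(eq + 1)));
        }
        return headers;
    }

    // Assumption: URL-encoding is used to escape '&' and '=' in keys and values.
    private static String encode(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    private static String decode(String s) {
        try {
            return URLDecoder.decode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A JSON or Avro serializer would produce the same kind of opaque byte[] that is then carried in the Headers field of the message.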

The disadvantages of this proposal are:

  • Change to the message object
  • Uses up bit 4 of the attributes byte

...

 

 

There are two wire protocol options for the change described above; Option A is our preferred option:

  • Option A: Use a headers flag (bit 4 of the attributes)
    • Advantages
      • No message size overhead for messages without headers (i.e. no change in message size for users who do not need headers)
    • Disadvantages
      • Uses up an attributes bit
  • Option B: No flag and simply use headers length (as done with key currently)
    • Advantages
      • Simpler, and follows the existing pattern used for key and value, where the length field alone determines whether the field is present
    • Disadvantages
      • Overhead of 4 bytes for messages without any headers (i.e. message size increases even for users who do not need headers)

 

 

Wire protocol change (Option A) - use bit 4 of the attributes byte as a flag for whether headers are present, and add an (optional) headers length and headers field to the message format

MessageAndOffset => Offset MessageSize Message
  Offset => int64  
  MessageSize => int32
  
  Message => Crc MagicByte Attributes Timestamp KeyLength Key HeadersLength Headers ValueLength Value
    Crc => int32
    MagicByte => int8  <---------------------- Bump up magic byte to 2
    Attributes => int8 <---------------------- Use bit 4 as a boolean flag for whether headers are present
    Timestamp => int64
    KeyLength => int32
    Key => bytes
    (optional) HeadersLength => int32 <------------------ NEW [optional] length of the byte[] of the serialized headers, only written if headers are present
    (optional) Headers => bytes <------------------------ NEW [optional] serialized form of the headers Map<String, String>
    ValueLength => int32
    Value => bytes
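
The Option A layout above could be written and read roughly as follows. This is a sketch under stated assumptions, not the proposed implementation: the class OptionAMessageCodec and its methods are hypothetical, "bit 4" is assumed to be counted from 0 (mask 0x10), and the Crc is assumed to cover every byte after the Crc field.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Illustrative sketch of the Option A layout; names are hypothetical.
final class OptionAMessageCodec {
    static final byte MAGIC_V2 = 2;
    // Assumption: "bit 4" is counted from 0, so the mask is 0x10.
    static final int HEADERS_FLAG = 1 << 4;

    static ByteBuffer encode(byte attributes, long timestamp,
                             byte[] key, byte[] headers, byte[] value) {
        boolean hasHeaders = headers != null && headers.length > 0;
        int size = 4 + 1 + 1 + 8                        // Crc, MagicByte, Attributes, Timestamp
                + 4 + length(key)                       // KeyLength, Key
                + (hasHeaders ? 4 + headers.length : 0) // HeadersLength, Headers (optional)
                + 4 + length(value);                    // ValueLength, Value
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.position(4);                                // Crc is filled in last
        buf.put(MAGIC_V2);
        int attrs = hasHeaders ? (attributes | HEADERS_FLAG) : (attributes & ~HEADERS_FLAG);
        buf.put((byte) attrs);
        buf.putLong(timestamp);
        putBytes(buf, key);
        if (hasHeaders) {
            putBytes(buf, headers);                     // only written when the flag is set
        }
        putBytes(buf, value);
        CRC32 crc = new CRC32();
        crc.update(buf.array(), 4, size - 4);           // checksum over everything after Crc
        buf.putInt(0, (int) crc.getValue());
        buf.flip();
        return buf;
    }

    // Returns the serialized headers, or null when the headers flag is not set.
    static byte[] readHeaders(ByteBuffer message) {
        ByteBuffer buf = message.duplicate();
        buf.position(buf.position() + 4 + 1);           // skip Crc and MagicByte
        byte attributes = buf.get();
        buf.getLong();                                  // skip Timestamp
        getBytes(buf);                                  // skip Key
        if ((attributes & HEADERS_FLAG) == 0) {
            return null;
        }
        return getBytes(buf);
    }

    private static int length(byte[] b) {
        return b == null ? 0 : b.length;
    }

    private static void putBytes(ByteBuffer buf, byte[] b) {
        if (b == null) {
            buf.putInt(-1);                             // a null field is encoded as length -1
        } else {
            buf.putInt(b.length);
            buf.put(b);
        }
    }

    private static byte[] getBytes(ByteBuffer buf) {
        int n = buf.getInt();
        if (n < 0) {
            return null;
        }
        byte[] b = new byte[n];
        buf.get(b);
        return b;
    }
}
```

With this layout, a message encoded without headers has exactly the size it would have had without the new fields; only the flag bit distinguishes the two cases.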

 

Wire protocol change (Option B) - add a headers length and a headers field to the message format

MessageAndOffset => Offset MessageSize Message
  Offset => int64  
  MessageSize => int32
  
  Message => Crc MagicByte Attributes Timestamp KeyLength Key HeadersLength Headers ValueLength Value
    Crc => int32
    MagicByte => int8  <---------------------- Bump up magic byte to 2
    Attributes => int8
    Timestamp => int64
    KeyLength => int32
    Key => bytes
    HeadersLength => int32 <------------------ NEW length of the byte[] of the serialized headers (int32 = 0 if no headers)
    (optional) Headers => bytes <------------- NEW [optional] serialized form of the headers Map<String, String> (for an old message or a message with no headers this can be empty)
    ValueLength => int32
    Value => bytes
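
For comparison, a sketch of the Option B layout, where HeadersLength is always written and is 0 when there are no headers. As before, the class OptionBMessageCodec and its methods are hypothetical, and the Crc is assumed to cover every byte after the Crc field.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Illustrative sketch of the Option B layout; names are hypothetical.
final class OptionBMessageCodec {
    static final byte MAGIC_V2 = 2;

    static ByteBuffer encode(byte attributes, long timestamp,
                             byte[] key, byte[] headers, byte[] value) {
        byte[] h = headers == null ? new byte[0] : headers;
        int size = 4 + 1 + 1 + 8          // Crc, MagicByte, Attributes, Timestamp
                + 4 + length(key)         // KeyLength, Key
                + 4 + h.length            // HeadersLength is always present
                + 4 + length(value);      // ValueLength, Value
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.position(4);                  // Crc is filled in last
        buf.put(MAGIC_V2);
        buf.put(attributes);              // no attributes bit is consumed
        buf.putLong(timestamp);
        putBytes(buf, key);
        buf.putInt(h.length);             // 0 when there are no headers
        buf.put(h);
        putBytes(buf, value);
        CRC32 crc = new CRC32();
        crc.update(buf.array(), 4, size - 4);
        buf.putInt(0, (int) crc.getValue());
        buf.flip();
        return buf;
    }

    // Returns the serialized headers; an empty array means "no headers".
    static byte[] readHeaders(ByteBuffer message) {
        ByteBuffer buf = message.duplicate();
        buf.position(buf.position() + 4 + 1 + 1 + 8); // skip Crc, MagicByte, Attributes, Timestamp
        int keyLength = buf.getInt();
        if (keyLength > 0) {
            buf.position(buf.position() + keyLength); // skip Key
        }
        byte[] h = new byte[buf.getInt()];
        buf.get(h);
        return h;
    }

    private static int length(byte[] b) {
        return b == null ? 0 : b.length;
    }

    private static void putBytes(ByteBuffer buf, byte[] b) {
        if (b == null) {
            buf.putInt(-1);               // a null field is encoded as length -1
        } else {
            buf.putInt(b.length);
            buf.put(b);
        }
    }
}
```

Here a message without headers is 4 bytes larger than the same message would be under Option A, which is the overhead listed as the disadvantage of this option.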

 

Add a headers field Map<String, String> to both ProducerRecord and ConsumerRecord
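
One way the new field could be exposed without breaking existing callers is sketched below. SketchProducerRecord is a hypothetical stand-in, not the real ProducerRecord class, and the constructor and accessor shapes are assumptions for illustration only; a ConsumerRecord counterpart would expose the same headers() accessor.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a record class carrying a headers map.
final class SketchProducerRecord<K, V> {
    private final String topic;
    private final K key;
    private final V value;
    private final Map<String, String> headers;

    // Existing-style constructor: callers that do not use headers are unaffected.
    SketchProducerRecord(String topic, K key, V value) {
        this(topic, key, value, Collections.<String, String>emptyMap());
    }

    // New constructor: accepts headers and stores a defensive, read-only copy.
    SketchProducerRecord(String topic, K key, V value, Map<String, String> headers) {
        this.topic = topic;
        this.key = key;
        this.value = value;
        this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(headers));
    }

    String topic() { return topic; }

    K key() { return key; }

    V value() { return value; }

    // Never null; empty when the record has no headers.
    Map<String, String> headers() { return headers; }
}
```

Adding only a new constructor and a new accessor keeps the change additive, in line with the goal of no incompatible client API change.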

...