...

  • Adds the ability for producers to set standard header key=value pairs
  • No incompatible client API change (only new methods)
  • Allows users to specify the serialisation of the header value per header
  • Provides a standardised interface around which an ecosystem of tools can grow

The disadvantage of this proposal is:

  • Change to the message object

Create a Headers interface and implementation to encapsulate the headers protocol.

  • Fields:
    • Headers[] headersArray
  • Constructors
    • ()
    • (byte[] headerBytes)
  • Methods
    • Collection<byte[]> get(String key)
    • void add(String key, byte[] value)
    • Collection<String> keys()
    • byte[] asBytes()
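
As a rough illustration of the interface listed above, the following is a minimal in-memory sketch of the get, add and keys methods. The class name SimpleHeaders and the choice of a LinkedHashMap of lists are assumptions made for this example and are not part of the proposal; asBytes() is left out because it depends on the wire format.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory sketch of the proposed Headers type; not the Kafka implementation.
class SimpleHeaders {
    // Insertion-ordered multimap: one key may hold several values.
    private final Map<String, List<byte[]>> entries = new LinkedHashMap<>();

    // Appends a value under the key; values already stored for that key are kept.
    void add(String key, byte[] value) {
        entries.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Returns every value stored under the key, or an empty collection if there is none.
    Collection<byte[]> get(String key) {
        List<byte[]> values = entries.get(key);
        return values == null
                ? Collections.<byte[]>emptyList()
                : Collections.unmodifiableList(values);
    }

    // Returns the distinct header keys in first-insertion order.
    Collection<String> keys() {
        return Collections.unmodifiableSet(entries.keySet());
    }

    public static void main(String[] args) {
        SimpleHeaders headers = new SimpleHeaders();
        headers.add("trace-id", "abc".getBytes(StandardCharsets.UTF_8));
        headers.add("trace-id", "def".getBytes(StandardCharsets.UTF_8));
        System.out.println(headers.keys() + " has " + headers.get("trace-id").size() + " values");
    }
}
```

Because add appends rather than overwrites, a key that is added twice yields two values from get, which is the multi-value behaviour clients are expected to handle.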

Add a headers field Headers to both ProducerRecord and ConsumerRecord

  • An accessor method, Headers getHeaders(), is added to the interface of ProducerRecord and ConsumerRecord.

Wire protocol change -

...

add array of headers to end of the message format

The following shows the core Message wire protocol change needed to fit headers into the message.

  • A header key can occur multiple times, and clients should expect to handle this. Clients can represent it as a multimap or as an array of values per key.

This is based on the KIP-98 message protocol proposals.

 

Code Block
languagejava
Message =>
        Length => uintVar
        Attributes => int8
        TimestampDelta => intVar
        OffsetDelta => uintVar
        KeyLen => uintVar [OPTIONAL]
        Key => data [OPTIONAL]
        ValueLen => uintVar [OPTIONAL]
        Value => data [OPTIONAL]
        Headers => Header1, Header2 … HeaderN <--------------------- NEW Added Array of headers

Wire protocol of the headers bytes (if above mentioned attributes bit flag is true)

The below is for the headers wire protocol.

...

Code Block
languagejava
Headers (bytes) => [Header]
 
Header =>
        Length => uintVar <-------------------------------- NEW length of the individual header
        KeyLen => uintVar <-------------------------------- NEW length of key bytes
        Key => data (utf8) <------------------------------- NEW UTF8 encoded string as data (byte[])
        Value => data <------------------------------------ NEW header value as data (byte[])
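
To make the Length, KeyLen, Key, Value layout of a single header concrete, here is a hedged sketch of an encoder and decoder. Two points are assumptions and are not stated in this proposal: uintVar is taken to be an unsigned base-128 varint (7 payload bits per byte, high bit as continuation flag), and Length is taken to count the bytes that follow the Length field itself. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Map;

// Hypothetical codec for one header; the varint format and Length semantics are assumptions.
class HeaderCodecSketch {

    // Writes an unsigned base-128 varint: 7 bits per byte, high bit set while more bytes follow.
    static void writeUintVar(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Reads a varint written by writeUintVar; rejects encodings longer than 5 bytes.
    static int readUintVar(ByteBuffer in) {
        int result = 0;
        int shift = 0;
        while (true) {
            int b = in.get() & 0xFF;
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("uintVar too long");
            }
        }
    }

    // Encodes Header => Length KeyLen Key Value, where Length covers KeyLen + Key + Value.
    static byte[] encode(String key, byte[] value) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        writeUintVar(body, keyBytes.length);
        body.write(keyBytes, 0, keyBytes.length);
        body.write(value, 0, value.length);

        byte[] bodyBytes = body.toByteArray();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUintVar(out, bodyBytes.length);
        out.write(bodyBytes, 0, bodyBytes.length);
        return out.toByteArray();
    }

    // Decodes one header; the value is whatever remains of Length after KeyLen and Key.
    static Map.Entry<String, byte[]> decode(ByteBuffer in) {
        int length = readUintVar(in);
        int end = in.position() + length;
        int keyLen = readUintVar(in);
        byte[] keyBytes = new byte[keyLen];
        in.get(keyBytes);
        byte[] value = new byte[end - in.position()];
        in.get(value);
        return new AbstractMap.SimpleEntry<>(new String(keyBytes, StandardCharsets.UTF_8), value);
    }

    public static void main(String[] args) {
        byte[] wire = encode("trace-id", new byte[] {1, 2, 3});
        Map.Entry<String, byte[]> header = decode(ByteBuffer.wrap(wire));
        System.out.println(header.getKey() + " -> " + Arrays.toString(header.getValue()));
    }
}
```

Under these assumptions the value needs no length field of its own, since its size follows from Length minus the bytes used by KeyLen and Key. If Length is defined differently, only the two lines that compute it in encode and decode would change.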

Compatibility, Deprecation, and Migration Plan

  • Current client users should not be affected, since only new API methods are being added to the client APIs
  • Message versioning means that clients expecting an older message version would not be impacted
    • older producers simply would produce a message without headers
      • new consumer would simply get a message with empty headers
    • older consumers simply would consume a message oblivious to there being any headers
  • Message version migration would be handled as in KIP-32

...