...

  1. Add a new headers length and value (byte[]) to the core message format.
  2. Add a headers (Map<int, byte[]>) field to ProducerRecord and ConsumerRecord.
  3. Add accessor methods on the Producer/ConsumerRecord: void setHeader(int, byte[]) and byte[] getHeader(int).
  4. Add ProduceRequest/ProduceResponse V4 which uses the new message format.
  5. Add FetchRequest/FetchResponse V4 which uses the new message format.
  6. The serialisation of the [int, byte[]] header set will be on the wire using a strict format.
  7. Each header's value will be custom serialisable by the interceptors/plugins that use the header.
  8. As the headers are int-keyed for compactness, key ranges will be reserved for different usages:
       • client headers adopted by the Kafka open source (not necessarily owned by the Kafka open source)
       • broker headers adopted by the Kafka open source (not necessarily owned by the Kafka open source)
       • commercial vendor space
       • custom in-house
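
As a rough illustration of items 2 and 3 above, the header accessors might look like the following minimal sketch. The class name RecordWithHeaders is hypothetical and stands in for ProducerRecord/ConsumerRecord; the sorted map and the defensive copies are assumptions of this sketch, not part of the proposal.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for ProducerRecord/ConsumerRecord; only the header
// accessors named in the list above are modelled here.
final class RecordWithHeaders {
    // Map<Integer, byte[]> is the Java spelling of the proposal's Map<int, byte[]>.
    // Using a TreeMap (keys kept in ascending order) is an assumption of this sketch.
    private final Map<Integer, byte[]> headers = new TreeMap<>();

    // Stores a copy of the value so later changes by the caller do not leak in.
    void setHeader(int key, byte[] value) {
        headers.put(key, value == null ? null : value.clone());
    }

    // Returns a copy of the stored value, or null if the key is absent.
    byte[] getHeader(int key) {
        byte[] value = headers.get(key);
        return value == null ? null : value.clone();
    }
}
```

An interceptor or plugin would serialise its own value to a byte[] before calling setHeader and deserialise the result of getHeader itself, in line with item 7.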

For more detailed information on the above changes, please refer to the Proposed Changes section.

...

Wire protocol of the headers bytes (if the above-mentioned attributes bit flag is true)

The headers wire protocol is described below.

...

  • Allows faster access to lower numbered keys which can be reserved for server side set/accessed headers
  • Ability to skip/not read higher numbered keys used by clients.
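
Both points above seem to rely on headers being written in ascending key order. The sketch below shows how a reader could stop early under that assumption. The entry layout used here (int32 key, int32 value length, value bytes) is a simplification chosen for illustration, and OrderedHeaderReader and readUpTo are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical reader; assumes entries are sorted by ascending key and laid out
// as (int32 key, int32 valueLength, value bytes). Neither is confirmed here.
final class OrderedHeaderReader {
    // Reads headers whose key is <= maxKey and stops at the first larger key,
    // so the higher-numbered headers are never copied out of the buffer.
    static Map<Integer, byte[]> readUpTo(ByteBuffer in, int maxKey) {
        Map<Integer, byte[]> out = new LinkedHashMap<>();
        while (in.remaining() >= 8) {
            int key = in.getInt();
            int length = in.getInt();
            if (key > maxKey) {
                break; // everything after this point has an even larger key
            }
            byte[] value = new byte[length];
            in.get(value);
            out.put(key, value);
        }
        return out;
    }
}
```

A broker that only cares about a low reserved range could call readUpTo with the top of that range and ignore the remaining bytes.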


...

 

Headers (bytes) => Set(Key, ValueLength, Value)
  Set =>
    Key   => byte[] => Typed  <-------------------- NEW int key of the header
    Value => byte[] => Typed  <-------------------- NEW serialised form of the header value

Typed (bytes) => (Type, BytesValue)
  Type => byte  <---------------------------------- 0x00 = null
                                                    0x01 = boolean=true
                                                    0x02 = boolean=false
                                                    0x03 = byte
                                                    0x04 = char
                                                    0x05 = short (int16)
                                                    0x06 = int (int32)
                                                    0x07 = long (int64)
                                                    0x08 = float
                                                    0x09 = double
                                                    0x0A = string
                                                    0x0B = byte[]
  BytesValue => bytes  <--------------------------- NEW byte array holding the corresponding value in byte array form:
                                                    0x00-0x02 - null and boolean: this will be zero length
                                                    0x03-0x09 - the type's length will be the length of the byte array
                                                    0x0A-0x0B - string and byte[]: this will contain the leading int32 length of the byte array
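
To make the Typed layout more concrete, here is a minimal sketch of an encoder and decoder for a subset of the type tags (null, boolean, int, long, string and byte[]). TypedCodec is a hypothetical name. Big-endian byte order (the ByteBuffer default) and UTF-8 for strings are assumptions of this sketch, since the layout above does not state either.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical codec for a subset of the Typed layout: one type byte followed by
// zero bytes, a fixed-width value, or an int32 length plus payload.
final class TypedCodec {
    static final byte NULL = 0x00, TRUE = 0x01, FALSE = 0x02,
                      INT = 0x06, LONG = 0x07, STRING = 0x0A, BYTES = 0x0B;

    static byte[] encode(Object value) {
        if (value == null) return new byte[] {NULL};                       // zero-length value
        if (value instanceof Boolean) return new byte[] {((Boolean) value) ? TRUE : FALSE};
        if (value instanceof Integer) return ByteBuffer.allocate(1 + 4).put(INT).putInt((Integer) value).array();
        if (value instanceof Long) return ByteBuffer.allocate(1 + 8).put(LONG).putLong((Long) value).array();
        if (value instanceof String) return withLength(STRING, ((String) value).getBytes(StandardCharsets.UTF_8)); // UTF-8 assumed
        if (value instanceof byte[]) return withLength(BYTES, (byte[]) value);
        throw new IllegalArgumentException("type not covered by this sketch: " + value.getClass());
    }

    // Variable-length types carry a leading int32 length before the payload.
    private static byte[] withLength(byte type, byte[] payload) {
        return ByteBuffer.allocate(1 + 4 + payload.length).put(type).putInt(payload.length).put(payload).array();
    }

    static Object decode(ByteBuffer in) {
        byte type = in.get();
        switch (type) {
            case NULL:   return null;
            case TRUE:   return Boolean.TRUE;
            case FALSE:  return Boolean.FALSE;
            case INT:    return in.getInt();
            case LONG:   return in.getLong();
            case STRING: return new String(readBytes(in), StandardCharsets.UTF_8);
            case BYTES:  return readBytes(in);
            default:     throw new IllegalArgumentException("type tag not covered by this sketch: " + type);
        }
    }

    private static byte[] readBytes(ByteBuffer in) {
        byte[] payload = new byte[in.getInt()];
        in.get(payload);
        return payload;
    }
}
```

Under these assumptions the string "hi" encodes to seven bytes: the tag 0x0A, a four-byte length of 2, and the two UTF-8 bytes. Null and boolean values each occupy only the single tag byte.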
 
 

...