Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. Add a new headers length and value (byte[]) to the core message format.
  2. Add a headers (Map<int, byte[]>) field to ProducerRecord and ConsumerRecord. A producer/interceptors will be able to set headers on a ProducerRecord. A consumer/interceptors will see the message headers (immutable) when it sees the messages. 

  3. Add accessor methods on the Producer/ConsumerRecord void setHeader(int, byte[]) and a byte[] getHeader(int)
  4. Add ProduceRequest/ProduceResponse V3 which uses the new message format.
  5. Add FetchRequest/FetchResponse V3 which uses the new message format.
  6. The serialisation of the [int, bye[]] header set will on the wire using a strict format
  7. Each headers value will be custom serialisable by the interceptors/plugins that use the header.
  8. As int key based headers for compactness ranges will be reserved for different usages:
    1. client headers adopted by the kafka open source (not necessary the kafka open source owns it)
    2. broker headers adopted by the kafka open source (not necessary the kafka open source owns it)
    3. commercial vendor space
    4. custom inhouse

...

  • Adds the ability for producers to set standard header key=value value pairs
  • No incompatible client api change (only new methods)
  • Allows users to specify the serialisation of the header value per header
  • Compact key space
  • Provides a standardised interface to eco systems of tools that then can grow around the feature

The disadvantage of this proposal is:

  • Change to the message object

Add a headers field Map<int, byte[]> to both ProducerRecord and ConsumerRecord

  • Accessor methods of void setHeader(int, byte[]) and byte[] getHeader(int) added to interface of the ProducerRecord.Accessor methods of byte[] getHeader(int) added to interface of the ConsumerRecord. (immutable as such no set required)/ConsumerRecord.

Wire protocol change - use attribute bit4 as flag for if headers present. and add (optional) header size and field to the message format

...

The below is for the headers wire protocol.

  • The [Discuss] should headers to be ordered by the int key (ascending)
    or not
    • [Proposed] current proposal is to not enforce ordering as headers should be composable and should be in any order, and benefits are offset by the disadvantages. 
    • Benefits Benefits are more prevalent on the server side
      • Allows faster access to lower numbered keys which can be reserved for server side set/accessed headers
      • Ability to skip/not read higher numbered keys used by clients.Examples of such server side cases that would benefit:
      • clusterId tagging the message since KIP-78 introduced clusterId this would be useful in message tracing and routing.
      • Server side auditing (http://www.slideshare.net/JonBringhurst/kafka-audit-kafka-meetup-january-27th-2015)
    • Disadvantage 
      • Ability to add a header to existing array is more complex, as cannot simply append
      • Makes advanced uses such as repeated headers would be limited by ordering

 

Code Block
languagejava
Headers (bytes) => Set(Key, ValueLength, Value)
  Set =>
	Key => int32 <---------------------- NEW int key of the header
    ValueLength => int32 <-------------- NEW size of the byte[] of the serialised header value
    Value => bytes <-------------------- NEW serialised form of the header value
 

...