
Table of Contents



Status

Current state: Adopted

Discussion thread: here

JIRA: KAFKA-4208

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP tries to address the following issues in Kafka.

...

The payload is traditionally for the business object, and headers are traditionally used for transport routing, filtering etc. Headers are most typically key=value pairs of strings.

In its current state Kafka does not support the ability to have headers natively in its message/record format.

...

Many people have flagged this issue in forums over time.

Further details and a fuller case for headers can be found here: A Case for Kafka Headers

Amendments made during implementation, and after KIP-118 was pulled, are highlighted in orange. These changes were reviewed during the PR and a notification was sent to the dev mailing list.

Public Interfaces

This KIP has the following public interface changes:

  1. Add a new headers length and value (byte[]) to the core message format.
  2. Add a headers (Map<String, String>) field to ProducerRecord and ConsumerRecord. A producer will be able to set headers on a ProducerRecord. A consumer will see the message headers when it sees the messages.

  3. Add accessor methods on the Producer/ConsumerRecord void setHeader(String, String) and a String getHeader(String)
  4. Add ProduceRequest/ProduceResponse V3 which uses the new message format.
  5. Add FetchRequest/FetchResponse V3 which uses the new message format.
  6. Add a configurable headers serializer Serializer<Map<String, String>> to ProducerRecord
  7. Create a Header Interface and implementing class
    1. Interface

      Code Block
      public interface Header {
            
         String key();
      
         byte[] value();
      }
      
      
      
    2. Implementation Detail

      1. Add a String key field to Header implementing class
      2. Add a byte[] value field to Header implementing class

  8. Create a Headers Interface and implementing class 
    1. Headers will be mutable
      1. For the Producer, after send and after the interceptors have run, it will be turned into a read-only (immutable) instance.
      2. This is done by invoking the "close()" method, which is an implementation detail and is not exposed in the API.
    2. Interface

      Code Block
      public interface Headers extends Iterable<Header> {
          
         /**
          *  Adds a header (key inside), returning this Headers instance.
          *  If the headers are read-only, the operation always fails with an IllegalStateException.
          */
         Headers add(Header header) throws IllegalStateException;
       
         /**
          *  Adds a header by key and value, returning this Headers instance.
          *  If the headers are read-only, the operation always fails with an IllegalStateException.
          */
         Headers add(String key, byte[] value) throws IllegalStateException;
       
         /**
          *  Removes ALL HEADERS for the given key, returning this Headers instance.
          *  If the headers are read-only, the operation always fails with an IllegalStateException.
          */
         Headers remove(String key) throws IllegalStateException;
      
         /**
          *  Returns JUST ONE (the very last) header for the given key, if present.
          */
         Header lastHeader(String key);
          
         /**
          *  Returns ALL headers for the given key, if present.
          */
         Iterable<Header> headers(String key);
       
      }
      
      
    3. Implementation Detail

      1. Add accessor methods on the Headers class 

        1. Headers add(Header header)
        2. Headers add(String key, byte[] value)
        3. Headers remove(String key)
        4. Header lastHeader(String key)
        5. Iterable<Header> headers(String key)
          1. In line with the ConsumerRecords interface, which returns the subset of ConsumerRecord instances for a given topic.
            1. public Iterable<ConsumerRecord<K, V>> records(String topic)
        6. implement Iterable<Header>
      2. Interceptors and key/value serialisers are expected to add headers during the produce intercept stage.
  9. Add a headers field to ProducerRecord and ConsumerRecord. 
  10. Add constructor(s) of Producer/ConsumerRecord to allow passing in of Iterable<Header> 
    1. Use case: MirrorMaker being able to copy headers.
  11. Add accessor methods on the Producer/ConsumerRecord Headers headers()
    1. Code Block
      public class ProducerRecord<K, V> {
           
         ...
         
         ProducerRecord(K key, V value, Iterable<Header> headers, ...)
         
         ...
         
         public Headers headers();
      
         ...
         
      }
       
      Code Block
      public class ConsumerRecord<K, V> {
         
         ...
         ConsumerRecord(K key, V value, Iterable<Header> headers, ...)
      
         ...
         
         public Headers headers();
         
         ...
         
      }
       
  12. The changes needed will piggyback onto V3 of ProduceRequest and V5 of FetchRequest, which were introduced in KIP-98
  13. The serialisation of the [String, byte[]] header array will be on the wire using a strict format
  14. Each header's value will be custom serialisable by the interceptors/plugins/serdes that use the header.
  15. Expose headers to De/Serializers - extended interfaces are added, as the default methods available in Java 8 cannot be used

    Code Block
    public interface ExtendedDeserializer<T> extends Deserializer<T> {
        T deserialize(String topic, Headers headers, byte[] data);
    }
    Code Block
    public interface ExtendedSerializer<T> extends Serializer<T> {
        byte[] serialize(String topic, Headers headers, T data);
    }
    Add a configurable headers deserializer Deserializer<Map<String, String>> HeadersDeserializer to ConsumerRecord

For more detailed information on the above changes, please refer to the Proposed Changes section.
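The behaviour described for the Headers interface (ordered storage, duplicate keys, removal by key, and a read-only state after close()) can be illustrated with a minimal sketch. SimpleHeader and SimpleHeaders are hypothetical names used only for this illustration, and the Header interface is redeclared so that the snippet is self-contained. This is not the actual implementing class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Stand-in for the Header interface listed above, redeclared to keep the sketch self-contained.
interface Header {
    String key();
    byte[] value();
}

// Hypothetical implementing class: a String key field and a byte[] value field.
final class SimpleHeader implements Header {
    private final String key;
    private final byte[] value;

    SimpleHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }

    public String key() { return key; }
    public byte[] value() { return value; }
}

// Hypothetical sketch mirroring the methods of the Headers interface.
final class SimpleHeaders implements Iterable<Header> {
    private final List<Header> headers = new ArrayList<>(); // insertion order is retained
    private boolean readOnly = false;

    SimpleHeaders add(Header header) {
        checkWritable();
        headers.add(header);
        return this;
    }

    SimpleHeaders add(String key, byte[] value) {
        return add(new SimpleHeader(key, value));
    }

    // Removes ALL headers with the given key.
    SimpleHeaders remove(String key) {
        checkWritable();
        headers.removeIf(h -> h.key().equals(key));
        return this;
    }

    // Returns the very last header for the key, or null if there is none.
    Header lastHeader(String key) {
        for (int i = headers.size() - 1; i >= 0; i--) {
            if (headers.get(i).key().equals(key)) {
                return headers.get(i);
            }
        }
        return null;
    }

    // Returns all headers for the key, in insertion order.
    List<Header> headers(String key) {
        List<Header> matches = new ArrayList<>();
        for (Header h : headers) {
            if (h.key().equals(key)) {
                matches.add(h);
            }
        }
        return matches;
    }

    // Implementation detail: switches the instance to read-only.
    void close() {
        readOnly = true;
    }

    public Iterator<Header> iterator() {
        return Collections.unmodifiableList(headers).iterator();
    }

    private void checkWritable() {
        if (readOnly) {
            throw new IllegalStateException("Headers are read-only");
        }
    }
}
```

Because add and remove return the same instance, calls can be chained, for example headers.add("a", v1).add("b", v2).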

Proposed Changes

...

Four options were proposed before this proposal. This section details our proposed solution, Option 1, described here. The other options, Option 2 and Option 3, are in the Rejected Alternatives section.

 

The advantages of this proposal are:
  • Adds the ability for producers to set standard header key=value string value pairs
  • No incompatible client api change (only new methods)
  • Allows users to specify the serialisation of the header value per header
  • Provides a standardised interface so that ecosystems of tools can grow around the feature

The disadvantages of this proposal are:

  • 4 bytes of overhead when there are no headers (the feature is not used), if there is no headers flag byte.
    • This could be mitigated by using message magic byte = 1 and using the version 1 message for those not using the feature or where headers are null.
  • 1 byte of overhead when there are no headers (the feature is not used), if a headers flag byte is used, at the cost of an extra byte when headers are present.
  • Change to the message object

...


 

Key duplication and ordering

  • Duplicate headers with the same key must be supported.
  • The order of headers must be retained throughout a record's end-to-end lifetime: from producer to consumer.
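As a small illustration of these two requirements, a client could expose headers as an ordered multimap. The sketch below is an assumption for illustration only: the HeaderGrouping class and the use of Map.Entry to stand in for a header are hypothetical. It groups values per key while keeping both key order and per-key value order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: groups an ordered list of (key, value) headers into a multimap view.
final class HeaderGrouping {

    static Map<String, List<byte[]>> groupByKey(List<Map.Entry<String, byte[]>> headers) {
        // LinkedHashMap keeps keys in order of first appearance;
        // each ArrayList keeps the values of one key in their original order.
        Map<String, List<byte[]>> grouped = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> header : headers) {
            grouped.computeIfAbsent(header.getKey(), k -> new ArrayList<>())
                   .add(header.getValue());
        }
        return grouped;
    }
}
```

A grouped view like this loses the relative order between headers with different keys, so the underlying ordered list remains the source of truth.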

Create a Headers Interface and Implementation to encapsulate headers protocol.

  • See above public interfaces section for sample interfaces.

Add a headers field Headers to both ProducerRecord and ConsumerRecord

  • Accessor methods of Headers headers() added to interface of the ProducerRecord/ConsumerRecord.
  • Add constructor(s) of Producer/ConsumerRecord to allow passing in of an existing/pre-constructed headers via Iterable<Header> 
    1. Use case: MirrorMaker being able to copy headers.

 

Add new method to make headers accessible during de/serialization

  • Add a new default method to the Serialization/Deserialization class, with a Header parameter
    • Due to KIP-118 not being implemented, new extended interfaces, ExtendedSerializer and ExtendedDeserializer, with extra methods are introduced instead
      • Existing De/Serialization will be wrapped and will work as before.
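One way the wrapping of an existing deserializer could work is sketched below. The Deserializer, Headers and ExtendedDeserializer types are simplified stand-ins declared only to make the snippet self-contained, and DeserializerWrapper and ensureExtended are hypothetical names, not necessarily those used in the implementation.

```java
// Simplified stand-ins for the client interfaces (assumptions for this sketch).
interface Headers { }

interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
}

interface ExtendedDeserializer<T> extends Deserializer<T> {
    T deserialize(String topic, Headers headers, byte[] data);
}

// Hypothetical wrapper: adapts an existing Deserializer so callers can always pass headers.
final class DeserializerWrapper<T> implements ExtendedDeserializer<T> {
    private final Deserializer<T> delegate;

    DeserializerWrapper(Deserializer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T deserialize(String topic, Headers headers, byte[] data) {
        // The wrapped deserializer knows nothing about headers, so they are ignored.
        return delegate.deserialize(topic, data);
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        return delegate.deserialize(topic, data);
    }

    // Returns the argument unchanged if it is already header-aware, otherwise wraps it.
    static <T> ExtendedDeserializer<T> ensureExtended(Deserializer<T> deserializer) {
        if (deserializer instanceof ExtendedDeserializer) {
            return (ExtendedDeserializer<T>) deserializer;
        }
        return new DeserializerWrapper<>(deserializer);
    }
}
```

With this shape the consumer code path only ever calls the three-argument method, and existing deserializers behave exactly as before.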

Wire protocol change - add array of headers to end of the message format

The below is for the core Message wire protocol change needed to fit headers into the message.

  • A header key can occur multiple times and clients should expect to handle this. In clients this can be represented as a multimap, or as an array of values per key.

This is based on the KIP-98 message protocol proposals.

 

Code Block
Message =>
    Length => varint
    Attributes => int8
    TimestampDelta => varlong
    OffsetDelta => varint
    KeyLen => varint
    Key => data
    ValueLen => varint
    Value => data
    Headers => [Header] <---------------------- NEW Added Array of headers

Header =>
    Key => string (utf8) <--------------------- NEW UTF8 encoded string (uses varint length)
    Value => bytes <--------------------------- NEW header value as data (uses varint length)
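To make the header array layout concrete, the following sketch encodes a list of headers as a count followed by length-prefixed keys and values. It rests on two assumptions that this page does not spell out: that varints use zigzag encoding (as in Protocol Buffers sint32) and that the array is prefixed with its element count as a varint. HeaderWireSketch is a hypothetical name and this is not the broker or client code.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of serialising the header array with varint lengths.
final class HeaderWireSketch {

    // Writes a zigzag-encoded variable-length int (assumed encoding, see the text above).
    static void writeVarint(int value, ByteArrayOutputStream out) {
        int v = (value << 1) ^ (value >> 31);   // zigzag: small magnitudes become small unsigned values
        while ((v & 0xFFFFFF80) != 0) {
            out.write((v & 0x7F) | 0x80);       // low 7 bits, with the continuation bit set
            v >>>= 7;
        }
        out.write(v);                           // final byte, continuation bit clear
    }

    // Encodes headers as: count, then for each header keyLength, key (UTF-8), valueLength, value.
    static byte[] encode(List<Map.Entry<String, byte[]>> headers) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(headers.size(), out);
        for (Map.Entry<String, byte[]> header : headers) {
            byte[] key = header.getKey().getBytes(StandardCharsets.UTF_8);
            writeVarint(key.length, out);
            out.write(key, 0, key.length);
            byte[] value = header.getValue();
            writeVarint(value.length, out);
            out.write(value, 0, value.length);
        }
        return out.toByteArray();
    }
}
```

Under these assumptions an empty header list costs a single byte on the wire, and each short key or value costs one length byte plus its content.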

 

Add a headers field Map<String, String> to both ProducerRecord and ConsumerRecord

  • Accessor methods of void setHeader(String, String) and String getHeader(String) added to interface of both.

Add configurable Deserializer and Serializer to KafkaConsumer and KafkaProducer

  • KafkaProducer headers.serializer denotes the serializer to use
    • Producer uses the headers.serializer to convert the headers field (Map<String, String>) in the ProducerRecord to byte[]
  • KafkaConsumer headers.deserializer denotes the deserializer to use
    • Fetcher is updated to use the headers.deserializer to convert byte[] to the headers field (Map<String, String>) set in the ConsumerRecord

Create a simple String UTF8 based Serializer/Deserializer implementation for headers.serializer/headers.deserializer and default config to use this.

 

  • This would work by converting map into a string in form of key1=value1&key2=value2&key3=value3 (like query params, url encoding the key and values) and then getBytes("UTF-8") to get the byte[]
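The query-parameter style conversion described here can be sketched as follows. QueryStringHeaders is a hypothetical name and the snippet only illustrates the key1=value1&key2=value2 idea for a Map<String, String>. It is not a proposed class.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical sketch: serialises a String map as URL-encoded key=value pairs joined by '&'.
final class QueryStringHeaders {

    static byte[] serialize(Map<String, String> headers) {
        StringBuilder sb = new StringBuilder();
        try {
            for (Map.Entry<String, String> e : headers.entrySet()) {
                if (sb.length() > 0) {
                    sb.append('&');
                }
                // URL-encode both sides so that '&' and '=' inside keys or values stay unambiguous.
                sb.append(URLEncoder.encode(e.getKey(), "UTF-8"))
                  .append('=')
                  .append(URLEncoder.encode(e.getValue(), "UTF-8"));
            }
        } catch (UnsupportedEncodingException ex) {
            // UTF-8 is always available on the JVM, so this is not expected to happen.
            throw new IllegalStateException(ex);
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

The iteration order of the map determines the order of the pairs, so an ordered map such as LinkedHashMap gives a stable output.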

 

 


Compatibility, Deprecation, and Migration Plan

  • Current client users should not be affected, as only new methods are being added to the client APIs.
  • Because of the message version, clients expecting an older message version would not be impacted.
    • Older producers would simply produce a message in a format which doesn't support headers.
      • Upcasting would result in a message without headers.
      • A new consumer would simply get a message with no headers, as none were set on send.
    • Older consumers would simply consume a message and would not be aware of any headers.
      • Downcasting would remove headers from the message, as the older message format doesn't have these in the protocol.
  • Message version migration would be handled as in KIP-32.


  • Given the mutability of the headers and the fact that we close them at some point, users should not resend the same record if they use headers and interceptors.
    • They should either configure the producer so that automatic retries are attempted (preferred, although there are some known limitations regarding records that time out before a send is actually attempted),
    • or
    • create a new record while being careful about which headers they include (if they rely fully on interceptors for headers, they could simply not include any headers in the new record).

Out of Scope

Some additional features/benefits were noticed and discussed during the above but are deemed out of scope and should be tackled by further KIPs.

  • Core message size reduction
    • remove overhead of 4 bytes for KeyLength when no headers using attributes bit
    • reduce overhead of 4 bytes for KeyLength by using variable length encoded int
    • reduce overhead of 4 bytes for ValueLength by using variable length encoded int
  • Broker side interceptors
    • with headers we could start introducing broker side message interceptors to append meta data or handle messages
  • Single Record consumer API
    • There are many use cases where a single record consumer/listener API is more user friendly. This is evident from the fact that Spring Kafka has already created a wrapper; it would be good to support this natively.

Rejected Alternatives

Map<Int, byte[]> Headers added to the Producer/ConsumerRecord

The concept is similar to the one proposed above, but with int keys.

  • Benefits
    • More compact: much reduced byte size overhead, only 4 bytes.
    • String keys can dwarf the value in byte size.
  • Disadvantages
    • String keys are more common in many systems
    • Requires management of the int key space, whereas string keys have natural key space management.

Map<String, String> Headers added to the ConsumerRecord

The concept is similar to the above proposed but with a few more disadvantages.

  • Benefits
    • Adds the ability for producers to set standard header key=value string value pairs
    • No incompatible client api change (only new methods)
    • Allows users to specify the serialisation of the key=value map (String(&=), JSON, AVRO).
    • Provides a standardised interface so that ecosystems of tools can grow around the feature
  • Disadvantages
    • Change to the message object
    • String keys can be large; this can cause a message payload of 60 bytes to be dwarfed by its headers
    • String value again doesn't allow for compact values, and restricts that a value must be a String
    • Not able to use headers on the broker side with custom serialisation

...

  • This will cause no broker side changes or message format changes

...

ProducerRecord<K, H, V>, ConsumerRecord<K, H, V>

The proposed change is that headers are Map<int, byte[]> only; this alternative is that headers can be of any type, denoted by H.

  • Benefits
    • Complete customisation of what a header is.
  • Disadvantages
    • As generics don't allow for a default type, this would break client interface compatibility if done on Producer/ConsumerRecord.
      • A possible work-around would be to have HeadersProducer/ConsumerRecord<K, H, V>, which Producer/ConsumerRecord then extend with H as Object. This becomes ugly fast if kept for any length of time, or would require a deprecation / refactor v2 period.

Common Value Message Wrapper - Message<V>

This builds on the status quo and addresses some core issues, but fails to address some more advanced and future use cases, and also has some compatibility issues for upgrades and for clients that do not support it.

Please see: Headers Value Message Wrapper

 

Status Quo - Keep Custom Value Message Wrapper - Message<H, P>

This concept is the current de facto way many users are having to temporarily deal with the situation, but it has some core issues that it does not resolve.

  • Benefits
    • This will cause no broker side changes to handle the message
  • Disadvantages
    • This would not work with compaction, where headers need to be sent on a delete record, and so would not deliver on many of the requirements.
    • Every organization has a custom solution
      • No ecosystem of tooling can evolve
      • The cost of third-party vendor integration is high, as it is custom for every organization they integrate with.
    • Not able to make use of headers server side
    • Couples serialisation and deserialisation of the value for both the header and payload.