Status

Current state:  Adopted

Discussion threadhere 

JIRAKAFKA-4208

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP tries to address the following issues in Kafka.

In most message systems (JMS, QPID etc), streaming systems and most transport systems(HTTP, TCP), it is typical to have a concept of headers and payload.

The payload is traditionally for the business object, and headers are traditionally used for transport routing, filtering etc. Headers are most typically key=value pairs.

In its current state Kafka does not support the ability to have headers natively in its message/record format.

Examples where having separate supported custom headers becomes useful (this is not an exhaustive list).

Kafka currently has Record<K, V> structure which originally could be used to follow this semantic where by K could contain the headers information, and the V could be the payload.

This issue has been flagged by many people over the past period in forums.

Further details and a more detailed case for headers can be seen here : A Case for Kafka Headers

Amendments made during implementation, and on KIP-118 being pulled are highlighted orange, changes reviewed during PR and notification sent to dev mailing lists.

Public Interfaces

This KIP has the following public interface changes:

  1. Add a new headers length and value (byte[]) to the core message format.
  2. Create a Header Interface and implementing class
    1. Interface

      public interface Header {
            
         String key();
      
         byte[] value();
      }
      
      
      
    2. Implementation Detail

      1. Add a String key field to Header implementing class
      2. Add a byte[] value field to Header implementing class

  3. Create a Headers Interface and implementing class 
    1. Headers will be mutable
      1. For the Producer, after send and post interceptors it will be turned into a read only immutable instance.
      2. This will be done by the invoking "close()" method, this method is not exposed in the api, but an implementation detail.
    2. Interface

      public interface Headers extends Iterable<Header> {
          
         /**
          *  Adds a header (key inside), returning if the operation succeeded.
          *  If headers is in read-only, will always fail the operation with throwing IllegalStateException.
          */
         Headers add(Header header) throws IllegalStateException;
       
         /**
          *  Adds a header by key and value, returning if the operation succeeded.
          *  If headers is in read-only, will always fail the operation with throwing IllegalStateException.
          */
         Headers add(String key, byte[] value) throws IllegalStateException;
       
         /**
          *  Removes ALL HEADERS for the given key returning if the operation succeeded.
          *  If headers is in read-only, will always fail the operation with throwing IllegalStateException.
          */
         Headers remove(String key) throws IllegalStateException;
      
         /**
          *  Returns JUST ONE (the very last) header for the given key, if present.
          */
         Header lastHeader(String key)
          
         /**
          *  Returns ALL headers for the given key, if present.
          */
         Iterable<Header> headers(String key);
       
      }
      
      
    3. Implementation Detail

      1. Add accessor methods on the Headers class 

        1. Headers add(Header header)
        2. Headers add(String key, byte[] value)
        3. Headers remove(Header header)
        4. Header lastHeader(String key)
        5. Iterable<Header> headers(String key)
          1. Inline with ConsumerRecords interface returning the subset of ConsumerRecord for a given topic.
            1. public Iterable<ConsumerRecord<K, V>> records(String topic)
        6. implement Iterable<Header>
      2. interceptors and k,v serialisers are expected to add headers during the produce intercept stage.
  4. Add a headers field to ProducerRecord and ConsumerRecord. 
  5. Add constructor(s) of Producer/ConsumerRecord to allow passing in of Iterable<Header> 
    1. use case is MirrorMakers able to copy headers.
  6. Add accessor methods on the Producer/ConsumerRecord Headers headers()
    1. public class ProducerRecord<K, V> {
           
         ...
         
         ProducerRecord(K key, V value, Iterable<Header> headers, ...)
         
         ...
         
         public Headers headers();
      
         ...
         
      }
       
      public class ConsumerRecord<K, V> {
         
         ...
         ConsumerRecord(K key, V value, Iterable<Header> headers, ...)
      
         ...
         
         public Headers headers();
         
         ...
         
      }
       
  7. Changes needed, will piggyback onto V3 of ProduceRequest and V5 of FetchRequest which were introduced in KIP-98
  8. The serialisation of the [String, byte[]] header array will on the wire using a strict format
  9. Each headers value will be custom serialisable by the interceptors/plugins/serdes that use the header.
  10. Expose headers to De/Serializers - extended interface added, for lack of default methods available in java 8

    public interface ExtendedDeserializer<T> extends Deserializer<T> {
        T deserialize(String topic, Headers headers, byte[] data);
    }
    public interface ExtendedSerializer<T> extends Serializer<T> {
        byte[] serialize(String topic, Headers headers, T data);
    }

For more detail information of the above changes, please refer to the Proposed Changes section.

Proposed Changes

There are four options proposed before this proposal. This details our proposed solution of Option 1 described here. The other options are in the Rejected Alternatives section.

 

The advantages of this proposal are:

The disadvantage of this proposal is:

 

Key duplication and ordering

Create a Headers Interface and Implementation to encapsulate headers protocol.

Add a headers field Headers to both ProducerRecord and ConsumerRecord

 

Add new method to make headers accessible during de/serialization

Wire protocol change - add array of headers to end of the message format

The below is for the core Message wire protocol change needed to fit headers into the message.

This is basing off KIP-98 Message protocol proposals.

 

Message =>
        Length => varint
        Attributes => int8
        TimestampDelta => varlong
        OffsetDelta => varint
        KeyLen => varint
        Key => data
        ValueLen => varint 
        Value => data
        Headers => [Header] <------------ NEW Added Array of headers
        
Header =>
		Key => string (utf8) <------------------------------- NEW UTF8 encoded string (uses varint length)
        Value => bytes  <------------------------------------ NEW header value as data (uses varint length)

Compatibility, Deprecation, and Migration Plan

Out of Scope

Some additional features/benefits were noticed and discussed on the above but are deemed out of scope and should be tackled by further KIPS.

Rejected Alternatives

Map<Int, byte[]> Headers added to the Producer/ConsumerRecord

The concept is similar to the above proposed but int keys

Map<String, String> Headers added to the ConsumerRecord

The concept is similar to the above proposed but with a few more disadvantages.

ProducerRecord<K, H, V>, ConsumerRecord<K, H, V>

The proposed change is that headers are Map<int, byte[]> only, this alternative is that headers can be of any type denoted by H

Common Value Message Wrapper - Message<V>

This builds on the status quo and addresses some core issues, but fails to address some more advanced and future use cases and also has some compatibility issues for upgrade/clients not supporting.

please see: Headers Value Message Wrapper

 

Status Quo - Keep Custom Value Message Wrapper - Message<H, P>

This concept is the current defacto way many users are having to temporally deal with the situation, but has some core key issues that it does not resolve.