...

Status

Current state: Adopted

Discussion thread: here

JIRA: KAFKA-4208

...

Further details and a more detailed case for headers can be found here: A Case for Kafka Headers

Amendments made during implementation, and following KIP-118 being pulled, are highlighted in orange; these changes were reviewed during the PR and a notification was sent to the dev mailing list.

Public Interfaces

This KIP has the following public interface changes:

  1. Add a new headers length and value (byte[]) to the core message format.
  2. Create a Header Interface and implementing class
    1. Interface

      Code Block
      public interface Header {
            
         String key();
      
         byte[] value();
      }
      
      
      
    2. Implementation Detail

      1. Add a String key field to the Header implementing class
      2. Add a byte[] value field to the Header implementing class

  3. Create a Headers Interface and implementing class 
    1. Headers will be mutable
      1. For the Producer, after send, once the interceptors have run, the headers will be turned into a read-only, immutable instance.
      2. This will be done by invoking a "close()" method; this method is not exposed in the API and is an implementation detail.
    2. Interface

      Code Block
      public interface Headers extends Iterable<Header> {

         /**
          *  Adds a header (key inside), returning this Headers instance.
          *  If the headers are read-only, always fails by throwing IllegalStateException.
          */
         Headers add(Header header) throws IllegalStateException;

         /**
          *  Adds a header by key and value, returning this Headers instance.
          *  If the headers are read-only, always fails by throwing IllegalStateException.
          */
         Headers add(String key, byte[] value) throws IllegalStateException;

         /**
          *  Removes ALL headers for the given key, returning this Headers instance.
          *  If the headers are read-only, always fails by throwing IllegalStateException.
          */
         Headers remove(String key) throws IllegalStateException;

         /**
          *  Returns JUST ONE (the very last) header for the given key, if present.
          */
         Header lastHeader(String key);

         /**
          *  Returns ALL headers for the given key, if present.
          */
         Iterable<Header> headers(String key);

      }
      
      
    3. Implementation Detail

      1. Add accessor methods to the Headers class

        1. Headers add(Header header)
        2. Headers add(String key, byte[] value)
        3. Headers remove(String key)
        4. Header lastHeader(String key)
        5. Iterable<Header> headers(String key)
          1. In line with the ConsumerRecords interface, which returns the subset of ConsumerRecord for a given topic:
            1. public Iterable<ConsumerRecord<K, V>> records(String topic)
        6. Implement Iterable<Header>
      2. Interceptors and key/value serialisers are expected to add headers during the produce intercept stage.
  4. Add a headers field to ProducerRecord and ConsumerRecord.
  5. Add constructor(s) of Producer/ConsumerRecord to allow passing in an Iterable<Header>
    1. The use case is MirrorMaker being able to copy headers.
  6. Add the accessor method Headers headers() to Producer/ConsumerRecord
    1. Code Block
      public class ProducerRecord<K, V> {
           
         ...
         
         ProducerRecord(K key, V value, Iterable<Header> headers, ...)
         
         ...
         
         public Headers headers();
      
         ...
         
      }
       
      Code Block
      public class ConsumerRecord<K, V> {
         
         ...
         ConsumerRecord(K key, V value, Iterable<Header> headers, ...)
      
         ...
         
         public Headers headers();
         
         ...
         
      }
       
  7. The required protocol changes will piggyback on V3 of ProduceRequest and V5 of FetchRequest, which were introduced in KIP-98.
  8. The [String, byte[]] header array will be serialised on the wire using a strict format.
  9. Each header's value will be custom serialisable by the interceptors/plugins/serdes that use the header.
  10. Expose headers to De/Serializers: extended interfaces are added instead of default methods, because default methods require Java 8 and KIP-118 (dropping Java 7 support) was not implemented.

    Code Block
    public interface ExtendedDeserializer<T> extends Deserializer<T> {
        T deserialize(String topic, Headers headers, byte[] data);
    }
    Code Block
    public interface ExtendedSerializer<T> extends Serializer<T> {
        byte[] serialize(String topic, Headers headers, T data);
    }
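The header lifecycle in item 3 (mutable while interceptors add headers, then made read-only by an internal close() call) can be sketched with simplified stand-in types. This is a minimal sketch, not Kafka's implementation: the class names SketchHeader, SketchHeaders and TraceInterceptor, and the "trace-id" header key, are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for Header: a String key and a byte[] value.
final class SketchHeader {
    private final String key;
    private final byte[] value;

    SketchHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }

    String key() { return key; }

    byte[] value() { return value; }
}

// Hypothetical stand-in for Headers: mutable until close() makes it read-only.
final class SketchHeaders implements Iterable<SketchHeader> {
    private final List<SketchHeader> headers = new ArrayList<>();
    private boolean readOnly = false;

    SketchHeaders add(String key, byte[] value) {
        if (readOnly) {
            throw new IllegalStateException("headers are read-only");
        }
        headers.add(new SketchHeader(key, value));
        return this; // returns this instance so calls can be chained
    }

    // Package-private: called by the (simulated) producer once send and interceptors are done.
    void close() {
        readOnly = true;
    }

    @Override
    public Iterator<SketchHeader> iterator() {
        // Unmodifiable view so callers cannot remove headers through the iterator.
        return Collections.unmodifiableList(headers).iterator();
    }
}

// Hypothetical interceptor that adds a header during the produce intercept stage.
final class TraceInterceptor {
    static SketchHeaders onSend(SketchHeaders headers, String traceId) {
        return headers.add("trace-id", traceId.getBytes(StandardCharsets.UTF_8));
    }
}
```

Here close() is package-private to reflect the point that it is an implementation detail rather than part of the public API.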

For more detailed information on the above changes, please refer to the Proposed Changes section.

...

  • Adds the ability for producers to set standard header key=value pairs
  • No incompatible client API change (only new methods)
  • Allows users to specify the serialisation of the header value per header
  • Provides a standardised interface around which an ecosystem of tools can then grow

The disadvantage of this proposal is:

  • Change to the message object

 

Key duplication and ordering

  • Duplicate headers with the same key must be supported.
  • The order of headers must be retained throughout a record's end-to-end lifetime: from producer to consumer.
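One way to satisfy both rules is to back the headers with a single insertion-ordered list rather than a hash map keyed by header name: duplicates are simply further entries, iteration follows insertion order, lastHeader scans from the end, and remove(key) drops every match. The sketch below assumes this list-based design and returns null from lastHeader when the key is absent; OrderedHeader and OrderedHeaders are hypothetical names, not the Kafka classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical simplified header: String key, byte[] value.
final class OrderedHeader {
    final String key;
    final byte[] value;

    OrderedHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// Headers backed by one insertion-ordered list, so duplicate keys and order are preserved.
final class OrderedHeaders implements Iterable<OrderedHeader> {
    private final List<OrderedHeader> headers = new ArrayList<>();

    OrderedHeaders add(String key, byte[] value) {
        headers.add(new OrderedHeader(key, value)); // earlier headers with the same key are kept
        return this;
    }

    // Removes ALL headers with the given key.
    OrderedHeaders remove(String key) {
        headers.removeIf(h -> h.key.equals(key));
        return this;
    }

    // Returns the most recently added header for the key, or null if absent (assumption).
    OrderedHeader lastHeader(String key) {
        for (int i = headers.size() - 1; i >= 0; i--) {
            if (headers.get(i).key.equals(key)) {
                return headers.get(i);
            }
        }
        return null;
    }

    // Returns all headers for the key, in insertion order.
    Iterable<OrderedHeader> headers(String key) {
        List<OrderedHeader> matches = new ArrayList<>();
        for (OrderedHeader h : headers) {
            if (h.key.equals(key)) {
                matches.add(h);
            }
        }
        return matches;
    }

    @Override
    public Iterator<OrderedHeader> iterator() {
        return Collections.unmodifiableList(headers).iterator();
    }
}
```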

Create a Headers Interface and Implementation to encapsulate headers protocol.

  • See above public interfaces section for sample interfaces.

Add a Headers headers field to both ProducerRecord and ConsumerRecord

  • Accessor method Headers headers() added to the interface of ProducerRecord/ConsumerRecord.

...

  • Add constructor(s) of Producer/ConsumerRecord to allow passing in existing/pre-constructed headers via Iterable<Header>
    1. The use case is MirrorMaker being able to copy headers.
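The MirrorMaker use case amounts to handing a consumed record's headers straight to the constructor of the record that will be produced. The sketch below uses simplified stand-ins rather than the real ConsumerRecord/ProducerRecord; MirrorHeader, ConsumedRecord, OutgoingRecord and MirrorSketch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical simplified header.
final class MirrorHeader {
    final String key;
    final byte[] value;

    MirrorHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// Stand-in for a consumed record that exposes its headers as an Iterable.
final class ConsumedRecord {
    final String topic;
    final byte[] key;
    final byte[] value;
    private final List<MirrorHeader> headers;

    ConsumedRecord(String topic, byte[] key, byte[] value, List<MirrorHeader> headers) {
        this.topic = topic;
        this.key = key;
        this.value = value;
        this.headers = headers;
    }

    Iterable<MirrorHeader> headers() {
        return Collections.unmodifiableList(headers);
    }
}

// Stand-in for a producer record whose constructor accepts pre-constructed headers.
final class OutgoingRecord {
    final String topic;
    final byte[] key;
    final byte[] value;
    final List<MirrorHeader> headers = new ArrayList<>();

    OutgoingRecord(String topic, byte[] key, byte[] value, Iterable<MirrorHeader> headers) {
        this.topic = topic;
        this.key = key;
        this.value = value;
        for (MirrorHeader h : headers) {
            this.headers.add(h); // copies in iteration order, keeping duplicates
        }
    }
}

final class MirrorSketch {
    // Re-publishes a consumed record to the target topic, carrying its headers across unchanged.
    static OutgoingRecord mirror(ConsumedRecord in, String targetTopic) {
        return new OutgoingRecord(targetTopic, in.key, in.value, in.headers());
    }
}
```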

 

Add new method to make headers accessible during de/serialization

  • Add a new default method, with a Headers parameter, to the Serializer/Deserializer interfaces
    • Because KIP-118 was not implemented, new extended interfaces, ExtendedSerializer and ExtendedDeserializer, with the extra methods are introduced instead
      • Existing De/Serializers will be wrapped and work as before.
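The wrapping can be sketched as an adapter: a header-unaware serializer is wrapped so the producer can always call the headers-aware overload, while the wrapped serializer ignores the headers and behaves as before. PlainSerializer, HeaderAwareSerializer, Utf8Serializer, SerializerWrapper and SerdeHeaders are hypothetical stand-ins mirroring the shape of Serializer and ExtendedSerializer; the sketch deliberately uses no default methods, in keeping with the Java 7 constraint.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for Headers: an ordered list of (key, value) pairs.
final class SerdeHeaders {
    final List<String> keys = new ArrayList<>();
    final List<byte[]> values = new ArrayList<>();

    void add(String key, byte[] value) {
        keys.add(key);
        values.add(value);
    }
}

// Stand-ins mirroring Serializer and ExtendedSerializer (no default methods).
interface PlainSerializer<T> {
    byte[] serialize(String topic, T data);
}

interface HeaderAwareSerializer<T> extends PlainSerializer<T> {
    byte[] serialize(String topic, SerdeHeaders headers, T data);
}

// An existing, header-unaware serializer.
final class Utf8Serializer implements PlainSerializer<String> {
    public byte[] serialize(String topic, String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }
}

// Wraps a plain serializer so callers can always use the header-aware method.
final class SerializerWrapper<T> implements HeaderAwareSerializer<T> {
    private final PlainSerializer<T> delegate;

    SerializerWrapper(PlainSerializer<T> delegate) {
        this.delegate = delegate;
    }

    public byte[] serialize(String topic, T data) {
        return delegate.serialize(topic, data);
    }

    // Headers are ignored, so the wrapped serializer behaves exactly as before.
    public byte[] serialize(String topic, SerdeHeaders headers, T data) {
        return delegate.serialize(topic, data);
    }

    // Returns the serializer itself if it is already header-aware, otherwise a wrapper.
    @SuppressWarnings("unchecked")
    static <U> HeaderAwareSerializer<U> ensureHeaderAware(PlainSerializer<U> serializer) {
        if (serializer instanceof HeaderAwareSerializer) {
            return (HeaderAwareSerializer<U>) serializer;
        }
        return new SerializerWrapper<>(serializer);
    }
}
```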

Wire protocol change - add array of headers to end of the message format

The below is for the core Message wire protocol change needed to fit headers into the message.

  • A header key can occur multiple times; clients should expect to handle this, and this

...

  • can be represented as a multi map, or an array of values per key in clients.
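Grouping the ordered header array into a per-key multimap on the client could be sketched as below; HeaderEntry and HeaderMultimap are hypothetical names, and the choice of LinkedHashMap is an assumption made to keep keys in first-seen order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical (key, value) pair as it appears in the ordered header array.
final class HeaderEntry {
    final String key;
    final byte[] value;

    HeaderEntry(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

final class HeaderMultimap {
    // Groups headers into key -> values. LinkedHashMap keeps keys in first-seen order,
    // and each list keeps the values for that key in their original order.
    static Map<String, List<byte[]>> group(List<HeaderEntry> headers) {
        Map<String, List<byte[]>> grouped = new LinkedHashMap<>();
        for (HeaderEntry h : headers) {
            List<byte[]> values = grouped.get(h.key);
            if (values == null) {
                values = new ArrayList<>();
                grouped.put(h.key, values);
            }
            values.add(h.value);
        }
        return grouped;
    }
}
```

The grouped form is convenient for lookups but loses the relative order between different keys (a, b, a becomes a: [1, 3], b: [2]), so a client that must re-emit headers in their original order should keep the original sequence as well.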

This is based on the KIP-98 message protocol proposals.

 

Code Block
Message =>
  Length => varint
  Attributes => int8
  TimestampDelta => varlong
  OffsetDelta => varint
  KeyLen => varint
  Key => data
  ValueLen => varint
  Value => data
  Headers => [Header] <--------------------- NEW Added Array of headers
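Several fields in the record format above (Length, OffsetDelta, KeyLen, ValueLen) are varints. Assuming the zig-zag variable-length encoding used by the KIP-98 record format (the same scheme as Protocol Buffers' sint32), writing and reading one could be sketched as follows; the class name Varint is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

final class Varint {
    // Zig-zag maps signed ints to unsigned, then emits 7 bits per byte, high bit = "more bytes follow".
    static void write(int value, ByteArrayOutputStream out) {
        int v = (value << 1) ^ (value >> 31);
        while ((v & 0xFFFFFF80) != 0) {
            out.write((v & 0x7F) | 0x80);
            v >>>= 7;
        }
        out.write(v);
    }

    // Reads at most 5 bytes, then reverses the zig-zag mapping.
    static int read(ByteBuffer in) {
        int raw = 0;
        for (int shift = 0; shift < 35; shift += 7) {
            int b = in.get() & 0xFF;
            raw |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return (raw >>> 1) ^ -(raw & 1);
            }
        }
        throw new IllegalArgumentException("varint longer than 5 bytes");
    }
}
```

Zig-zag keeps small negative numbers short: -1, which Kafka's record format uses to mark a null key or value, encodes to the single byte 0x01.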

Wire protocol of the headers

The below is for the headers wire protocol.

...

Code Block
Header =>
  Key => string (utf8) <------------------ NEW UTF8 encoded string (uses varint length)
  Value => bytes <------------------------ NEW header value as data (uses varint length)
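Combining the two header fields, an encoder and decoder for the header array might look like the sketch below. Two details are assumptions not stated above: the array is prefixed with its element count as a varint, and a null value is written with length -1, mirroring how the record format marks a null key or value. HeaderCodec and WireHeader are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical header as carried on the wire; value may be null.
final class WireHeader {
    final String key;
    final byte[] value;

    WireHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

final class HeaderCodec {
    static byte[] encode(List<WireHeader> headers) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(headers.size(), out); // assumed: element count as a varint
        for (WireHeader h : headers) {
            byte[] key = h.key.getBytes(StandardCharsets.UTF_8);
            writeVarint(key.length, out); // Key: varint length + UTF-8 bytes
            out.write(key, 0, key.length);
            if (h.value == null) {
                writeVarint(-1, out); // assumed: -1 marks a null value
            } else {
                writeVarint(h.value.length, out); // Value: varint length + raw bytes
                out.write(h.value, 0, h.value.length);
            }
        }
        return out.toByteArray();
    }

    static List<WireHeader> decode(ByteBuffer in) {
        int count = readVarint(in);
        List<WireHeader> headers = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            byte[] key = new byte[readVarint(in)];
            in.get(key);
            int valueLength = readVarint(in);
            byte[] value = null;
            if (valueLength >= 0) {
                value = new byte[valueLength];
                in.get(value);
            }
            headers.add(new WireHeader(new String(key, StandardCharsets.UTF_8), value));
        }
        return headers; // order and duplicate keys are preserved
    }

    private static void writeVarint(int value, ByteArrayOutputStream out) {
        int v = (value << 1) ^ (value >> 31); // zig-zag encode
        while ((v & 0xFFFFFF80) != 0) {
            out.write((v & 0x7F) | 0x80);
            v >>>= 7;
        }
        out.write(v);
    }

    private static int readVarint(ByteBuffer in) {
        int raw = 0;
        for (int shift = 0; shift < 35; shift += 7) {
            int b = in.get() & 0xFF;
            raw |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return (raw >>> 1) ^ -(raw & 1); // zig-zag decode
            }
        }
        throw new IllegalArgumentException("varint longer than 5 bytes");
    }
}
```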

 

Compatibility, Deprecation, and Migration Plan

  • Current client users should not be affected; only new API methods are being added to the client APIs.
  • The message version ensures that clients expecting an older message version are not impacted:
    • older producers would simply produce a message without headers
      • upcasting would result in a message without headers
      • a new consumer would simply get a message with empty headers, as none were set on send.
    • older consumers would simply consume the message without being aware of any headers
      • downcasting would remove the headers from the message, as the older message format has no field for them.
  • Message version migration would be handled as in KIP-32.
  • Because headers are mutable and are closed (made read-only) at some point during send, users should not resend the same record if they use headers and interceptors. Instead:
    • they should either configure the producer so that automatic retries are attempted (preferred, although there are some known limitations regarding records that time out before a send is actually attempted)
    • or
    • they need to create a new record while being careful about which headers they include (if they rely fully on interceptors for headers, they can simply include no headers in the new record).
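For the second option, a fresh record can be built from the one that was sent, copying only the headers that interceptors will not add again; since duplicate keys are allowed, re-adding an interceptor header to a copied record would otherwise leave two copies. This sketch assumes the caller knows which keys its interceptors own; RetryHeader, RetryRecord and Resend are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical simplified header and record types.
final class RetryHeader {
    final String key;
    final byte[] value;

    RetryHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

final class RetryRecord {
    final String topic;
    final byte[] value;
    final List<RetryHeader> headers;

    RetryRecord(String topic, byte[] value, List<RetryHeader> headers) {
        this.topic = topic;
        this.value = value;
        // Defensive copy: the new record owns a fresh, mutable header list.
        this.headers = new ArrayList<>(headers);
    }
}

final class Resend {
    // Builds a new record instead of reusing the one whose headers were closed.
    // Headers whose keys are in interceptorKeys are dropped so interceptors can add them again.
    static RetryRecord forResend(RetryRecord sent, Set<String> interceptorKeys) {
        List<RetryHeader> kept = new ArrayList<>();
        for (RetryHeader h : sent.headers) {
            if (!interceptorKeys.contains(h.key)) {
                kept.add(h);
            }
        }
        return new RetryRecord(sent.topic, sent.value, kept);
    }
}
```

If every header comes from interceptors, every key is in the set and the new record starts with no headers, which matches the simpler case described above.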

Out of Scope

Some additional features/benefits were noticed and discussed in the above, but are deemed out of scope and should be tackled by further KIPs.

...