...

  1. Add a new headers length and value (byte[]) to the core message format.
  2. Create a Header Interface and implementing class
    1. Interface

      Code Block
      public interface Header {
            
         String key();
      
         byte[] value();
      }
      
      
      
    2. Implementation Detail

      1. Add a String key field to Header implementing class
      2. Add a byte[] value field to Header implementing class
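The two fields described above could be sketched as follows. The class name `SimpleHeader`, the null check on the key, and the `equals`/`hashCode` methods are assumptions made for illustration; the proposal only specifies the `key` and `value` fields.

```java
import java.util.Arrays;
import java.util.Objects;

// Interface as given in the proposal.
interface Header {
    String key();
    byte[] value();
}

// Hypothetical implementing class; the name SimpleHeader is an assumption.
final class SimpleHeader implements Header {
    private final String key;
    private final byte[] value;

    SimpleHeader(String key, byte[] value) {
        // Rejecting a null key is an assumption of this sketch.
        this.key = Objects.requireNonNull(key, "key");
        this.value = value;
    }

    @Override public String key() { return key; }

    @Override public byte[] value() { return value; }

    // Value-based equality compares the byte content, not the array reference.
    @Override public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SimpleHeader)) return false;
        SimpleHeader other = (SimpleHeader) o;
        return key.equals(other.key) && Arrays.equals(value, other.value);
    }

    @Override public int hashCode() {
        return 31 * key.hashCode() + Arrays.hashCode(value);
    }
}
```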

  3. Create a Headers Interface and implementing class 
    1. Headers will be mutable
      1. For the Producer, the headers are turned into a read-only (immutable) instance after send and after the interceptors have run.
      2. This is done by invoking the "close()" method, which is an implementation detail and is not exposed in the API.
    2. Interface

      Code Block
      public interface Headers extends Iterable<Header> {
          
         /**
          *  Adds a header (key inside), returning this Headers instance.
          *  If the headers are read-only, the operation always fails by throwing IllegalStateException.
          */
         Headers add(Header header) throws IllegalStateException;
       
         /**
          *  Adds a header by key and value, returning this Headers instance.
          *  If the headers are read-only, the operation always fails by throwing IllegalStateException.
          */
         Headers add(String key, byte[] value) throws IllegalStateException;
       
         /**
          *  Removes ALL HEADERS for the given key, returning this Headers instance.
          *  If the headers are read-only, the operation always fails by throwing IllegalStateException.
          */
         Headers remove(String key) throws IllegalStateException;
      
         /**
          *  Returns JUST ONE (the very last) header for the given key, if present.
          */
         Header lastHeader(String key);
          
         /**
          *  Returns ALL headers for the given key, if present.
          */
         Iterable<Header> headers(String key);
       
      }
      
      
    3. Implementation Detail

      1. Add accessor methods on the Headers class 

        1. Headers add(Header header)
        2. Headers add(String key, byte[] value)
        3. Headers remove(String key)
        4. Header lastHeader(String key)
        5. Iterable<Header> headers(String key)
          1. In line with the ConsumerRecords interface, which returns the subset of ConsumerRecord instances for a given topic.
            1. public Iterable<ConsumerRecord<K, V>> records(String topic)
        6. implement Iterable<Header>
      2. Interceptors and key/value serialisers are expected to add headers during the produce intercept stage.
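The mutable-then-read-only behaviour and the accessor methods listed above can be sketched roughly as below. This is a minimal illustration, not the proposed implementation: the class name `ListHeaders`, the list-backed storage, and returning `null` from `lastHeader` when no header matches are all assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

interface Header {
    String key();
    byte[] value();
}

interface Headers extends Iterable<Header> {
    Headers add(Header header) throws IllegalStateException;
    Headers add(String key, byte[] value) throws IllegalStateException;
    Headers remove(String key) throws IllegalStateException;
    Header lastHeader(String key);
    Iterable<Header> headers(String key);
}

// Hypothetical list-backed implementation; names and storage are assumptions.
final class ListHeaders implements Headers {
    private final List<Header> entries = new ArrayList<>();
    private boolean readOnly = false;

    // Not part of the Headers interface: invoked internally to freeze the headers.
    void close() { readOnly = true; }

    private void checkWritable() {
        if (readOnly) throw new IllegalStateException("Headers are read-only");
    }

    @Override public Headers add(Header header) {
        checkWritable();
        entries.add(header);
        return this;
    }

    @Override public Headers add(final String key, final byte[] value) {
        return add(new Header() {
            @Override public String key() { return key; }
            @Override public byte[] value() { return value; }
        });
    }

    // Removes every header with the given key.
    @Override public Headers remove(String key) {
        checkWritable();
        for (Iterator<Header> it = entries.iterator(); it.hasNext();) {
            if (it.next().key().equals(key)) it.remove();
        }
        return this;
    }

    // Scans backwards so the most recently added match wins; null if absent (assumption).
    @Override public Header lastHeader(String key) {
        for (int i = entries.size() - 1; i >= 0; i--) {
            if (entries.get(i).key().equals(key)) return entries.get(i);
        }
        return null;
    }

    // Returns all headers with the given key, in insertion order.
    @Override public Iterable<Header> headers(String key) {
        List<Header> matches = new ArrayList<>();
        for (Header h : entries) {
            if (h.key().equals(key)) matches.add(h);
        }
        return matches;
    }

    // Iteration is read-only so callers cannot bypass the close() check.
    @Override public Iterator<Header> iterator() {
        return Collections.unmodifiableList(entries).iterator();
    }
}
```

Because `add` and `remove` return the `Headers` instance, calls can be chained, for example `headers.add("a", x).add("b", y)`. After `close()` any further `add` or `remove` throws `IllegalStateException`.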
  4. Add a headers field to ProducerRecord and ConsumerRecord. 
  5. Add constructor(s) to ProducerRecord and ConsumerRecord that accept an Iterable<Header>.
    1. Use case: MirrorMaker can copy headers from consumed records to produced records.
  6. Add an accessor method, Headers headers(), to ProducerRecord and ConsumerRecord.
    1. Code Block
      public class ProducerRecord<K, V> {
           
         ...
         
         ProducerRecord(K key, V value, Iterable<Header> headers, ...)
         
         ...
         
         public Headers headers();
      
         ...
         
      }
       
      Code Block
      public class ConsumerRecord<K, V> {
         
         ...
         ConsumerRecord(K key, V value, Iterable<Header> headers, ...)
      
         ...
         
         public Headers headers();
         
         ...
         
      }
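The MirrorMaker use case for the `Iterable<Header>` constructors might look roughly like the sketch below. The record types here are heavily simplified, hypothetical stand-ins (`SketchConsumerRecord`, `SketchProducerRecord`): they model only key, value, and headers, and they expose the headers as a plain `Iterable<Header>` rather than the proposed `Headers` type.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

interface Header {
    String key();
    byte[] value();
}

// Hypothetical stand-in for ConsumerRecord: topic, partition, offset etc. are omitted.
final class SketchConsumerRecord<K, V> {
    final K key;
    final V value;
    private final List<Header> headers = new ArrayList<>();

    SketchConsumerRecord(K key, V value, Iterable<Header> headers) {
        this.key = key;
        this.value = value;
        for (Header h : headers) this.headers.add(h);
    }

    Iterable<Header> headers() { return Collections.unmodifiableList(headers); }
}

// Hypothetical stand-in for ProducerRecord with the same simplifications.
final class SketchProducerRecord<K, V> {
    final K key;
    final V value;
    private final List<Header> headers = new ArrayList<>();

    SketchProducerRecord(K key, V value, Iterable<Header> headers) {
        this.key = key;
        this.value = value;
        for (Header h : headers) this.headers.add(h);
    }

    Iterable<Header> headers() { return Collections.unmodifiableList(headers); }
}

final class MirrorSketch {
    // Copies key, value and all headers from a consumed record to a new record to produce.
    static <K, V> SketchProducerRecord<K, V> mirror(SketchConsumerRecord<K, V> in) {
        return new SketchProducerRecord<>(in.key, in.value, in.headers());
    }
}
```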
       
  7. The required protocol changes will piggyback on V3 of ProduceRequest and V4 of FetchRequest, both of which were introduced in KIP-98.
  8. The [String, byte[]] header array will be serialised on the wire using a strict format.
  9. Each header's value will be custom-serialisable by the interceptors/plugins/serdes that use the header.
  10. Expose headers to De/Serializers. New extended interfaces are added because default methods (available only from Java 8) cannot be used.

    Code Block
    public interface ExtendedDeserializer<T> extends Deserializer<T> {
        T deserialize(String topic, Headers headers, byte[] data);
    }
    Code Block
    public interface ExtendedSerializer<T> extends Serializer<T> {
        byte[] serialize(String topic, Headers headers, T data);
    }
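One way a caller could use the extended interface is to check for it at runtime and fall back to the plain method otherwise. The sketch below assumes this `instanceof` dispatch; it is not taken from the proposal. `Serializer` and `Headers` are trimmed-down stand-ins so the example compiles on its own, and `SimpleHeaders`, `TaggingStringSerializer`, and `SerializerDispatch` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified stand-ins for the proposal's types (assumption: only the methods used here).
interface Header { String key(); byte[] value(); }

interface Headers extends Iterable<Header> {
    Headers add(String key, byte[] value);
}

interface Serializer<T> {
    byte[] serialize(String topic, T data);
}

interface ExtendedSerializer<T> extends Serializer<T> {
    byte[] serialize(String topic, Headers headers, T data);
}

// Hypothetical minimal Headers implementation backed by a list.
final class SimpleHeaders implements Headers {
    private final List<Header> entries = new ArrayList<>();

    @Override public Headers add(final String key, final byte[] value) {
        entries.add(new Header() {
            @Override public String key() { return key; }
            @Override public byte[] value() { return value; }
        });
        return this;
    }

    @Override public Iterator<Header> iterator() { return entries.iterator(); }
}

// Hypothetical serializer that records the payload encoding in a header.
final class TaggingStringSerializer implements ExtendedSerializer<String> {
    @Override public byte[] serialize(String topic, String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }

    @Override public byte[] serialize(String topic, Headers headers, String data) {
        headers.add("encoding", "UTF-8".getBytes(StandardCharsets.UTF_8));
        return serialize(topic, data);
    }
}

final class SerializerDispatch {
    // Uses the header-aware overload when the serializer provides it, else the plain one.
    static <T> byte[] serialize(Serializer<T> s, String topic, Headers headers, T data) {
        if (s instanceof ExtendedSerializer) {
            return ((ExtendedSerializer<T>) s).serialize(topic, headers, data);
        }
        return s.serialize(topic, data);
    }
}
```

Existing `Serializer` implementations keep working unchanged under this dispatch, since they simply never see the headers.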

...