...

  1. Add a new headers length and value (byte[]) to the core message format.
  2. Create a Header Interface and implementing class
    1. Interface

      Code Block
      public interface Header {
         String key();
      
         byte[] value();
      }
      
      
      
    2. Implementation Detail

      1. Add a String key field to Header implementing class
      2. Add a byte[] value field to Header implementing class
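
The two fields listed above could be carried by a small immutable class along the following lines. This is only a sketch: the class name `SimpleHeader`, the defensive copies of the byte array, and the `equals`/`hashCode` definitions are assumptions made for illustration and are not specified by this proposal.

```java
import java.util.Arrays;
import java.util.Objects;

// Interface as proposed above.
interface Header {
    String key();

    byte[] value();
}

// Hypothetical implementing class; the name SimpleHeader is an assumption.
final class SimpleHeader implements Header {
    private final String key;
    private final byte[] value;

    SimpleHeader(String key, byte[] value) {
        this.key = Objects.requireNonNull(key, "key");
        // Defensive copy (an assumption): later changes to the caller's array do not leak in.
        this.value = value == null ? null : value.clone();
    }

    @Override
    public String key() {
        return key;
    }

    @Override
    public byte[] value() {
        // Return a copy so callers cannot mutate the stored bytes.
        return value == null ? null : value.clone();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SimpleHeader)) return false;
        SimpleHeader other = (SimpleHeader) o;
        return key.equals(other.key) && Arrays.equals(value, other.value);
    }

    @Override
    public int hashCode() {
        return 31 * key.hashCode() + Arrays.hashCode(value);
    }
}
```

Value-based equality is included here because a `remove(Header header)` operation on a collection of headers would otherwise only match the identical object instance.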

  3. Create a Headers Interface and implementing class 
    1. Interface

      Code Block
      public interface Headers extends Iterable<Header> {
         
         Headers append(Iterable<Header> headers);
       
         Headers append(Header header);
      
         Headers remove(Iterable<Header> headers);
       
         Headers remove(Header header);
         /**
          *  Get just the headers for the given key
          */
         Iterable<Header> headers(String key);
       
         /**
          * Turns the mutable instance into a read-only, immutable instance.
          * This is invoked on send, after the interceptors have run.
          */
         Headers close();
       
      }
      
      
    2. Implementation Detail

      1. Add accessor methods on the Headers class 

        1. Headers append(Iterable<Header> headers)
        2. Headers append(Header header)
        3. Headers remove(Iterable<Header> headers)
        4. Headers remove(Header header)
        5. Iterable<Header> headers(String key)
          1. In line with the ConsumerRecords interface, which returns the subset of ConsumerRecord instances for a given topic:
            1. public Iterable<ConsumerRecord<K, V>> records(String topic)
        6. Mutation methods will return a new instance. *** under discussion ***
        7. The close method will turn the Headers object from mutable to immutable. *** under discussion ***
      2. Implement Iterable<Header>.
      3. Interceptors are expected to add headers during the produce intercept stage.
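
The implementation detail above can be sketched as a list-backed class. Note the assumptions: the name `ListHeaders` is hypothetical, and this sketch takes the mutate-in-place option (mutators return `this`, and `close()` freezes the instance), which is only one of the alternatives marked as under discussion. Returning a new instance from each mutator would be the other.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

interface Header {
    String key();

    byte[] value();
}

interface Headers extends Iterable<Header> {
    Headers append(Iterable<Header> headers);

    Headers append(Header header);

    Headers remove(Iterable<Header> headers);

    Headers remove(Header header);

    Iterable<Header> headers(String key);

    Headers close();
}

// Hypothetical implementation; the class name and in-place mutation are assumptions.
final class ListHeaders implements Headers {
    private final List<Header> entries = new ArrayList<>();
    private boolean closed = false;

    private void checkOpen() {
        if (closed) throw new IllegalStateException("Headers are read-only after close()");
    }

    @Override
    public Headers append(Header header) {
        checkOpen();
        entries.add(header);
        return this;
    }

    @Override
    public Headers append(Iterable<Header> headers) {
        checkOpen();
        for (Header h : headers) entries.add(h);
        return this;
    }

    @Override
    public Headers remove(Header header) {
        checkOpen();
        // Removes the first entry that equals() the argument, if any.
        entries.remove(header);
        return this;
    }

    @Override
    public Headers remove(Iterable<Header> headers) {
        checkOpen();
        for (Header h : headers) entries.remove(h);
        return this;
    }

    @Override
    public Iterable<Header> headers(String key) {
        // Collect only the headers whose key matches, preserving insertion order.
        List<Header> matches = new ArrayList<>();
        for (Header h : entries) {
            if (h.key().equals(key)) matches.add(h);
        }
        return Collections.unmodifiableList(matches);
    }

    @Override
    public Headers close() {
        closed = true;
        return this;
    }

    @Override
    public Iterator<Header> iterator() {
        // Read-only view so callers cannot bypass close() through Iterator.remove().
        return Collections.unmodifiableList(entries).iterator();
    }
}
```

In this sketch an interceptor would call `append` during the produce intercept stage, and the producer would call `close()` afterwards so that any later mutation attempt fails with an `IllegalStateException`.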
  4. Add a headers field to ProducerRecord and ConsumerRecord. 
  5. Add constructor(s) to Producer/ConsumerRecord that accept an Iterable<Header>.
    1. The use case is MirrorMaker, which must be able to copy headers.
  6. Add the accessor method Headers headers() to Producer/ConsumerRecord.
  7. Add mutation methods for headers to Producer/ConsumerRecord, each returning a new instance of Producer/ConsumerRecord with the modified headers. *** under discussion ***
    1. Code Block
      public class ProducerRecord<K, V> {
           
         ...
         
         ProducerRecord(K key, V value, Iterable<Header> headers, ...)
         
         ...
         
         public Headers headers();
       
         public ProducerRecord<K, V> append(Iterable<Header> headers);
         
         public ProducerRecord<K, V> append(Header header);
      
         public ProducerRecord<K, V> remove(Iterable<Header> headers);
         
         public ProducerRecord<K, V> remove(Header header);
      
         ...
         
      }
       
      Code Block
      public class ConsumerRecord<K, V> {
         
         ...
         ConsumerRecord(K key, V value, Iterable<Header> headers, ...)
      
         ...
         
         public Headers headers();
       
         public ConsumerRecord<K, V> append(Iterable<Header> headers);
         
         public ConsumerRecord<K, V> append(Header header);
      
         public ConsumerRecord<K, V> remove(Iterable<Header> headers);
         
         public ConsumerRecord<K, V> remove(Header header);
         
         ...
         
      }
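
The "returning a new instance" behaviour of the record mutation methods, which is still under discussion, could look roughly like the sketch below. `SketchRecord` is a hypothetical, heavily simplified stand-in for ProducerRecord/ConsumerRecord (no topic, partition, or offset), and its `headers()` returns a plain `Iterable<Header>` rather than `Headers` to keep the example short.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

interface Header {
    String key();

    byte[] value();
}

// Hypothetical, simplified stand-in for ProducerRecord/ConsumerRecord; not the real Kafka class.
final class SketchRecord<K, V> {
    private final K key;
    private final V value;
    private final List<Header> headers;

    SketchRecord(K key, V value, Iterable<Header> headers) {
        this.key = key;
        this.value = value;
        // Copy the supplied headers so the record does not share state with the caller.
        List<Header> copy = new ArrayList<>();
        for (Header h : headers) copy.add(h);
        this.headers = Collections.unmodifiableList(copy);
    }

    K key() {
        return key;
    }

    V value() {
        return value;
    }

    Iterable<Header> headers() {
        return headers;
    }

    // Returns a new record with the extra header; this instance is left unchanged.
    SketchRecord<K, V> append(Header header) {
        List<Header> next = new ArrayList<>(headers);
        next.add(header);
        return new SketchRecord<>(key, value, next);
    }

    // Returns a new record without the first matching header; this instance is left unchanged.
    SketchRecord<K, V> remove(Header header) {
        List<Header> next = new ArrayList<>(headers);
        next.remove(header);
        return new SketchRecord<>(key, value, next);
    }
}
```

Under these assumptions, the MirrorMaker use case reduces to passing the consumed record's headers straight into the constructor of the outgoing record, for example `new SketchRecord<>(in.key(), in.value(), in.headers())`.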
       
  8. The required protocol changes will piggyback on V3 of ProduceRequest and V4 of FetchRequest, which were introduced in KIP-98.
  9. The [String, byte[]] header array will be serialised on the wire using a strict format.
  10. Each header's value will be custom serialisable by the interceptors/plugins that use the header.
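
To illustrate the last two points, the sketch below encodes a list of [String, byte[]] pairs with a simple length-prefixed layout and shows one way a plugin might serialise its own value. The layout used here (int32 header count, then per header an int16 key length, UTF-8 key bytes, an int32 value length, and the value bytes) is purely an assumption for illustration and is not the wire format defined by this proposal. `HeaderCodec` and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical codec; the byte layout is an illustrative assumption, not the proposed format.
final class HeaderCodec {
    private HeaderCodec() {}

    static byte[] encode(List<Map.Entry<String, byte[]>> headers) {
        // First pass: encode keys and compute the exact buffer size.
        int size = Integer.BYTES;
        List<byte[]> keys = new ArrayList<>();
        for (Map.Entry<String, byte[]> h : headers) {
            byte[] k = h.getKey().getBytes(StandardCharsets.UTF_8);
            if (k.length > Short.MAX_VALUE) throw new IllegalArgumentException("key too long");
            keys.add(k);
            size += Short.BYTES + k.length + Integer.BYTES + h.getValue().length;
        }
        // Second pass: write count, then each length-prefixed key and value.
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(headers.size());
        for (int i = 0; i < headers.size(); i++) {
            byte[] k = keys.get(i);
            byte[] v = headers.get(i).getValue();
            buf.putShort((short) k.length);
            buf.put(k);
            buf.putInt(v.length);
            buf.put(v);
        }
        return buf.array();
    }

    static List<Map.Entry<String, byte[]>> decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int count = buf.getInt();
        List<Map.Entry<String, byte[]>> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            byte[] k = new byte[buf.getShort()];
            buf.get(k);
            byte[] v = new byte[buf.getInt()];
            buf.get(v);
            out.add(new SimpleEntry<>(new String(k, StandardCharsets.UTF_8), v));
        }
        return out;
    }

    // Example of a plugin-defined value encoding: a long as 8 big-endian bytes.
    static byte[] longToBytes(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    static long bytesToLong(byte[] b) {
        return ByteBuffer.wrap(b).getLong();
    }
}
```

The point of the split is that the framing of keys and values is fixed, while the bytes inside each value are opaque to it: only the interceptor or plugin that owns a given key needs to know that, say, its value is an 8-byte timestamp.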

...