...

  1. Add new headers length and headers value (byte[]) fields to the core message format.
  2. Create a Header Interface and implementing class
    1. Interface

      Code Block
      public interface Header
      {
         String key();
      
         byte[] value();
      }
      
      
      
    2. Implementation Detail

      1. Add a String key field to the Header implementing class
      2. Add a byte[] value field to the Header implementing class

  3. Create a Headers Interface and implementing class 
    1. Interface

      Code Block
      public interface Headers extends Iterable<Header>
      {
         Headers append(Iterable<Header> headers);
      
         Headers remove(Iterable<Header> headers);
        
         Iterable<Header> filter(String key);   
      }
      
      
    2. Implementation Detail

      1. Add a Header[] headers field to the Headers implementation class, with accessor methods on the Headers class:
        1. Headers append(Iterable<Header> headers)
        2. Headers remove(Iterable<Header> headers)
        3. Iterable<Header> filter(String key)
      2. Implement Iterable<Header>.
      3. Interceptors are expected to add headers during the produce intercept stage.
  4. Add a headers field to ProducerRecord and ConsumerRecord.
  5. Add constructor(s) to Producer/ConsumerRecord that accept an Iterable<Header>.
    1. Use case: allows MirrorMaker to copy headers.
  6. Add an accessor method, Headers headers(), to Producer/ConsumerRecord.
    1. Code Block
      public class ProducerRecord<K, V> {
           
         ...
         
         ProducerRecord(K key, V value, Iterable<Header> headers, ...)
         
         ...
         
         public Headers headers();
         
         ...
         
      }
       
      Code Block
      public class ConsumerRecord<K, V> {
         
         ...
         ConsumerRecord(K key, V value, Iterable<Header> headers, ...)
      
         ...
         
         public Headers headers();
         
         ...
         
      }
       
  7. Add ProduceRequest/ProduceResponse V4 which uses the new message format.
  8. Add FetchRequest/FetchResponse V4 which uses the new message format.
  9. The [String, byte[]] header array will be serialised on the wire using a strict format.
  10. Each header's value will be custom-serialisable by the interceptors/plugins that use the header.
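
The Header and Headers interfaces above can be sketched in a minimal implementation as follows. The class names SimpleHeader and SimpleHeaders are hypothetical and not part of the proposal. Two behaviours are assumptions made only for illustration: append/remove mutate the instance and return `this`, and remove matches a header on both key and value bytes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Interfaces as given in the proposal.
interface Header {
    String key();
    byte[] value();
}

interface Headers extends Iterable<Header> {
    Headers append(Iterable<Header> headers);
    Headers remove(Iterable<Header> headers);
    Iterable<Header> filter(String key);
}

// Hypothetical implementing class: a String key plus a byte[] value.
final class SimpleHeader implements Header {
    private final String key;
    private final byte[] value;

    SimpleHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }

    @Override public String key() { return key; }
    @Override public byte[] value() { return value; }
}

// Hypothetical implementing class backed by a Header[] field.
final class SimpleHeaders implements Headers {
    private Header[] headers = new Header[0];

    // Assumption: append mutates this instance and returns it.
    @Override
    public Headers append(Iterable<Header> toAdd) {
        List<Header> merged = new ArrayList<>(Arrays.asList(headers));
        for (Header h : toAdd) merged.add(h);
        headers = merged.toArray(new Header[0]);
        return this;
    }

    // Assumption: a header is removed when both key and value bytes match.
    @Override
    public Headers remove(Iterable<Header> toRemove) {
        List<Header> kept = new ArrayList<>(Arrays.asList(headers));
        for (Header r : toRemove) {
            kept.removeIf(h -> h.key().equals(r.key())
                    && Arrays.equals(h.value(), r.value()));
        }
        headers = kept.toArray(new Header[0]);
        return this;
    }

    // Returns every header whose key equals the given key, in insertion order.
    @Override
    public Iterable<Header> filter(String key) {
        List<Header> matches = new ArrayList<>();
        for (Header h : headers) {
            if (h.key().equals(key)) matches.add(h);
        }
        return matches;
    }

    @Override
    public Iterator<Header> iterator() {
        return Arrays.asList(headers).iterator();
    }
}
```

Because keys are not required to be unique in this sketch, filter returns an Iterable rather than a single Header, which matches the signature in the interface above.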

...

  • Adds the ability for producers to set standard header key=value pairs
  • No incompatible client API change (only new methods)
  • Allows users to specify the serialisation of the header value per header
  • Provides a standardised interface around which an ecosystem of tools can grow

The disadvantage of this proposal is:

  • Change to the message object

Create a Headers interface and implementation to encapsulate the headers protocol.

  • Fields:
    • Header[] headersArray
  • Constructors
    • ()
    • (byte[] headerBytes)
  • Methods
    • Iterable<Header> filter(String key)
    • Headers append(Iterable<Header> headers)
    • byte[] asBytes()
  • See above public interfaces section for sample interfaces.
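
As a rough sketch of how the (byte[] headerBytes) constructor and asBytes() could round-trip, the example below encodes and decodes headers with java.nio.ByteBuffer. The wire layout is an assumption made for illustration only, since this section does not define it: an int32 header count, then for each header an int16 key length, the UTF-8 key, an int32 value length, and the value bytes. The class name EncodedHeaders, its nested Entry type, and the add helper are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical class illustrating the two constructors and asBytes() listed above.
final class EncodedHeaders {
    // Minimal stand-in for a Header: a String key and a byte[] value.
    static final class Entry {
        final String key;
        final byte[] value;
        Entry(String key, byte[] value) { this.key = key; this.value = value; }
    }

    private final List<Entry> entries = new ArrayList<>();

    EncodedHeaders() { }

    // Assumed layout: int32 count, then per header
    // int16 key length, UTF-8 key, int32 value length, value bytes.
    EncodedHeaders(byte[] headerBytes) {
        ByteBuffer in = ByteBuffer.wrap(headerBytes);
        int count = in.getInt();
        for (int i = 0; i < count; i++) {
            byte[] key = new byte[in.getShort()];
            in.get(key);
            byte[] value = new byte[in.getInt()];
            in.get(value);
            entries.add(new Entry(new String(key, StandardCharsets.UTF_8), value));
        }
    }

    // Hypothetical convenience method for building up headers.
    EncodedHeaders add(String key, byte[] value) {
        entries.add(new Entry(key, value));
        return this;
    }

    List<Entry> entries() { return entries; }

    // Serialises all entries using the assumed layout.
    // Keys are assumed to encode to fewer than 32768 bytes (int16 length).
    byte[] asBytes() {
        int size = Integer.BYTES;
        List<byte[]> keys = new ArrayList<>();
        for (Entry e : entries) {
            byte[] k = e.key.getBytes(StandardCharsets.UTF_8);
            keys.add(k);
            size += Short.BYTES + k.length + Integer.BYTES + e.value.length;
        }
        ByteBuffer out = ByteBuffer.allocate(size);
        out.putInt(entries.size());
        for (int i = 0; i < entries.size(); i++) {
            byte[] k = keys.get(i);
            out.putShort((short) k.length);
            out.put(k);
            out.putInt(entries.get(i).value.length);
            out.put(entries.get(i).value);
        }
        return out.array();
    }
}
```

Length-prefixing both key and value keeps each header value opaque, so an interceptor or plugin can apply its own serialisation to the value without the framing needing to understand it.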

Add a Headers headers field to both ProducerRecord and ConsumerRecord

  • Accessor method Headers headers() added to the interface of ProducerRecord/ConsumerRecord.
  • Add constructor(s) to Producer/ConsumerRecord to allow passing in existing/pre-constructed headers via Iterable<Header>.
    1. Use case: allows MirrorMaker to copy headers.
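
The MirrorMaker use case can be illustrated with a small sketch: a record consumed from one cluster is turned into a record to produce to another, and the headers are passed straight through the Iterable<Header> constructor. MirrorSketch, SimpleRecord, header, and mirror are hypothetical names. SimpleRecord is a simplified stand-in for both record types, since the proposed records carry more fields and expose Headers rather than a plain Iterable<Header>.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class MirrorSketch {
    // Same shape as the proposed Header interface.
    interface Header {
        String key();
        byte[] value();
    }

    // Hypothetical factory for a simple immutable Header.
    static Header header(final String key, final byte[] value) {
        return new Header() {
            @Override public String key() { return key; }
            @Override public byte[] value() { return value; }
        };
    }

    // Simplified stand-in for ProducerRecord/ConsumerRecord (assumption:
    // only key, value, and headers are modelled here).
    static final class SimpleRecord<K, V> {
        private final K key;
        private final V value;
        private final List<Header> headers = new ArrayList<>();

        // Mirrors the proposed constructor that accepts Iterable<Header>.
        SimpleRecord(K key, V value, Iterable<Header> headers) {
            this.key = key;
            this.value = value;
            for (Header h : headers) this.headers.add(h);
        }

        K key() { return key; }
        V value() { return value; }
        Iterable<Header> headers() { return Collections.unmodifiableList(headers); }
    }

    // Mirroring: build the outgoing record from the consumed one, headers included.
    static <K, V> SimpleRecord<K, V> mirror(SimpleRecord<K, V> consumed) {
        return new SimpleRecord<>(consumed.key(), consumed.value(), consumed.headers());
    }
}
```

Without a constructor that accepts pre-built headers, a mirroring tool would have to re-add each header one by one after constructing the outgoing record.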

...