...
- Add a new headers length and value (byte[]) to the core message format.
- Create a Header Interface and implementing class
Interface
Code Block
public interface Header {
    String key();
    byte[] value();
}
Implementation Detail
- Add a String key field to the Header implementing class
- Add a byte[] value field to the Header implementing class
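As an illustration of the two fields described above, a minimal sketch of an implementing class might look like the following. The class name `SimpleHeader`, the null-key check, the defensive copies of the byte array, and the `equals`/`hashCode` methods are all assumptions made for this example; the proposal only specifies the `key` and `value` fields.

```java
import java.util.Arrays;
import java.util.Objects;

// The interface as proposed above.
interface Header {
    String key();
    byte[] value();
}

// Hypothetical implementing class; the name SimpleHeader is an assumption.
final class SimpleHeader implements Header {
    private final String key;
    private final byte[] value;

    SimpleHeader(String key, byte[] value) {
        // Assumption: a header must have a key; the value may be null.
        this.key = Objects.requireNonNull(key, "key");
        // Defensive copy so later changes to the caller's array are not visible here.
        this.value = value == null ? null : value.clone();
    }

    @Override
    public String key() {
        return key;
    }

    @Override
    public byte[] value() {
        // Return a copy so callers cannot modify the stored bytes.
        return value == null ? null : value.clone();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SimpleHeader)) return false;
        SimpleHeader other = (SimpleHeader) o;
        return key.equals(other.key) && Arrays.equals(value, other.value);
    }

    @Override
    public int hashCode() {
        return 31 * key.hashCode() + Arrays.hashCode(value);
    }
}
```

Copying the array on the way in and out is one way to keep a header effectively immutable; an implementation that prefers to avoid the copies could store the array directly.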
- Create a Headers Interface and implementing class
Interface
Code Block
public interface Headers extends Iterable<Header> {
    Headers append(Iterable<Header> headers);
    Headers append(Header header);
    Headers remove(Iterable<Header> headers);
    Headers remove(Header header);

    /**
     * Get just the headers for the given key.
     */
    Iterable<Header> headers(String key);

    /**
     * Turns the mutable instance into a read-only, immutable instance.
     * This is invoked on send, after the interceptors.
     */
    Headers close();
}
Implementation Detail
Add accessor methods on the Headers class
- Headers append(Iterable<Header> headers)
- Headers append(Header header)
- Headers remove(Iterable<Header> headers)
- Headers remove(Header header)
- Iterable<Header> headers(String key)
- In line with the ConsumerRecords interface, which returns the subset of ConsumerRecord for a given topic:
- public Iterable<ConsumerRecord<K, V>> records(String topic)
- Mutation methods will return a new instance. *** under discussion ***
- close method will turn headers object from mutable into immutable *** under discussion ***
- implement Iterable<Header>
- interceptors are expected to add headers during the produce intercept stage.
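To make the accessor and close semantics above more concrete, here is a minimal sketch of one possible implementation. It picks one of the options still under discussion: mutation methods change the instance in place and return it, and `close()` makes it read-only. The class names `ListHeaders` and `KeyValueHeader`, and the choice to throw `IllegalStateException` after close, are assumptions for this example only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

interface Header {
    String key();
    byte[] value();
}

interface Headers extends Iterable<Header> {
    Headers append(Iterable<Header> headers);
    Headers append(Header header);
    Headers remove(Iterable<Header> headers);
    Headers remove(Header header);
    Iterable<Header> headers(String key);
    Headers close();
}

// Hypothetical minimal Header used only to exercise the sketch.
final class KeyValueHeader implements Header {
    private final String key;
    private final byte[] value;
    KeyValueHeader(String key, byte[] value) { this.key = key; this.value = value; }
    @Override public String key() { return key; }
    @Override public byte[] value() { return value; }
}

// Hypothetical list-backed Headers: mutable until close() is called.
final class ListHeaders implements Headers {
    private final List<Header> entries = new ArrayList<>();
    private boolean closed = false;

    private void checkOpen() {
        if (closed) throw new IllegalStateException("Headers are closed (read-only)");
    }

    // Copy the argument first so that passing this instance to itself is safe.
    private static List<Header> toList(Iterable<Header> headers) {
        List<Header> copy = new ArrayList<>();
        for (Header h : headers) copy.add(h);
        return copy;
    }

    @Override public Headers append(Header header) {
        checkOpen();
        entries.add(header);
        return this;
    }

    @Override public Headers append(Iterable<Header> headers) {
        checkOpen();
        entries.addAll(toList(headers));
        return this;
    }

    @Override public Headers remove(Header header) {
        checkOpen();
        // Removes the first entry equal to the argument (identity here,
        // since KeyValueHeader does not override equals).
        entries.remove(header);
        return this;
    }

    @Override public Headers remove(Iterable<Header> headers) {
        checkOpen();
        for (Header h : toList(headers)) entries.remove(h);
        return this;
    }

    @Override public Iterable<Header> headers(String key) {
        // Collect only the headers whose key matches, preserving insertion order.
        List<Header> matches = new ArrayList<>();
        for (Header h : entries) {
            if (h.key().equals(key)) matches.add(h);
        }
        return Collections.unmodifiableList(matches);
    }

    @Override public Headers close() {
        closed = true;
        return this;
    }

    @Override public Iterator<Header> iterator() {
        // Read-only view: Iterator.remove() is not supported.
        return Collections.unmodifiableList(entries).iterator();
    }
}
```

An interceptor would call `append` during the produce intercept stage; once the producer calls `close()` on send, any further `append` or `remove` in this sketch fails fast. The alternative under discussion, where each mutation returns a new instance, would replace the `checkOpen()` guard with copy-on-write.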
- Add a headers field to ProducerRecord and ConsumerRecord.
- Add constructor(s) of Producer/ConsumerRecord to allow passing in of Iterable<Header>
- use case is MirrorMakers able to copy headers.
- Add accessor methods on the Producer/ConsumerRecord Headers headers()
- Add mutation methods for headers on the Producer/ConsumerRecord returning a new instance of Producer/ConsumerRecord with modified headers *** under discussion ***
Code Block
public class ProducerRecord<K, V> {
    ...
    ProducerRecord(K key, V value, Iterable<Header> headers, ...)
    ...
    public Headers headers();
    public ProducerRecord<K, V> append(Iterable<Header> headers);
    public ProducerRecord<K, V> append(Header header);
    public ProducerRecord<K, V> remove(Iterable<Header> headers);
    public ProducerRecord<K, V> remove(Header header);
    ...
}
Code Block
public class ConsumerRecord<K, V> {
    ...
    ConsumerRecord(K key, V value, Iterable<Header> headers, ...)
    ...
    public Headers headers();
    public ConsumerRecord<K, V> append(Iterable<Header> headers);
    public ConsumerRecord<K, V> append(Header header);
    public ConsumerRecord<K, V> remove(Iterable<Header> headers);
    public ConsumerRecord<K, V> remove(Header header);
    ...
}
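The "mutation returns a new instance" option and the MirrorMaker copy use case can be sketched with a small stand-in class. `SketchRecord` is hypothetical and is not the Kafka `ProducerRecord` or `ConsumerRecord`; for brevity its `headers()` returns `Iterable<Header>` rather than the proposed `Headers` type, and only `append(Header)` is shown.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

interface Header {
    String key();
    byte[] value();
}

// Hypothetical stand-in for ProducerRecord/ConsumerRecord; not the Kafka class.
final class SketchRecord<K, V> {
    private final K key;
    private final V value;
    private final List<Header> headers;

    SketchRecord(K key, V value, Iterable<Header> headers) {
        this.key = key;
        this.value = value;
        // Copy the headers so the record does not share state with the caller.
        List<Header> copy = new ArrayList<>();
        for (Header h : headers) copy.add(h);
        this.headers = Collections.unmodifiableList(copy);
    }

    K key() { return key; }
    V value() { return value; }
    Iterable<Header> headers() { return headers; }

    // Returns a new record with the extra header; this instance is left unchanged.
    SketchRecord<K, V> append(Header header) {
        List<Header> next = new ArrayList<>(headers);
        next.add(header);
        return new SketchRecord<>(key, value, next);
    }
}
```

With a constructor that accepts `Iterable<Header>`, a mirroring process could forward headers unchanged by building the outgoing record as `new SketchRecord<>(in.key(), in.value(), in.headers())`, where `in` is the consumed record.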
- The changes needed will piggyback onto V3 of ProduceRequest and V4 of FetchRequest, which were introduced in KIP-98.
- The serialisation of the [String, byte[]] header array will be on the wire using a strict format.
- Each header's value will be custom-serialisable by the interceptors/plugins that use the header.
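Because a header value is an opaque byte[], each interceptor or plugin chooses its own encoding for the values it owns. The sketch below shows one way a plugin might encode a long and a String into header values and read them back. The class `HeaderValueCodec` and its method names are hypothetical, and the big-endian and UTF-8 encodings are assumptions for this example, not part of the proposal.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper; neither the class nor its method names come from the proposal.
final class HeaderValueCodec {
    private HeaderValueCodec() {}

    // Encode a long as 8 bytes, big-endian (ByteBuffer's default byte order).
    static byte[] encodeLong(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    // Decode the first 8 bytes of the array as a big-endian long.
    static long decodeLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    // Encode a String as UTF-8 bytes.
    static byte[] encodeString(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Decode UTF-8 bytes back into a String.
    static String decodeString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

A producing interceptor and the consuming plugin only need to agree on the key and on the encoding behind it; nothing else in the message path has to understand the value.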
...