...
Further details and a fuller case for headers can be found here: A Case for Kafka Headers
Amendments made during implementation, and following KIP-118 being pulled, are highlighted in orange; these changes were reviewed during the PR and a notification was sent to the dev mailing lists.
Public Interfaces
This KIP has the following public interface changes:
- Add a new headers length and value (byte[]) to the core message format.
- Create a Header Interface and implementing class
Interface
    public interface Header {
        static Header create(String key, byte[] value);
        String key();
        byte[] value();
    }
Implementation Detail
- Add a String key field to Header implementing class
- Add a byte[] value field to Header implementing class
- Create a Headers Interface and implementing class
- Headers will be mutable
- For the Producer, after send and post interceptors, it will be turned into a read-only, immutable instance.
- This will be done by invoking the "close()" method; this method is not exposed in the API and is an implementation detail.
Interface
    public interface Headers extends Iterable<Header> {
        /**
         * Adds a header (key inside), returning if the operation succeeded.
         * If the headers are read-only, the operation always fails by throwing IllegalStateException.
         */
        Headers add(Header header) throws IllegalStateException;

        /**
         * Adds a header by key and value, returning if the operation succeeded.
         * If the headers are read-only, the operation always fails by throwing IllegalStateException.
         */
        Headers add(String key, byte[] value) throws IllegalStateException;

        /**
         * Removes ALL HEADERS for the given key, returning if the operation succeeded.
         * If the headers are read-only, the operation always fails by throwing IllegalStateException.
         */
        Headers remove(String key) throws IllegalStateException;

        /**
         * Returns JUST ONE (the very last) header for the given key, if present.
         */
        Header lastHeader(String key);

        /**
         * Returns ALL headers for the given key, if present.
         */
        Iterable<Header> headers(String key);
    }
Implementation Detail
Add accessor methods on the Headers class
- Headers add(Header header)
- Headers add(String key, byte[] value)
- Headers remove(String key)
- Header lastHeader(String key)
- Iterable<Header> headers(String key)
- In line with the ConsumerRecords interface, which returns the subset of ConsumerRecord for a given topic:
- public Iterable<ConsumerRecord<K, V>> records(String topic)
- implement Iterable<Header>
- interceptors and k,v serialisers are expected to add headers during the produce intercept stage.
- Headers will be mutable
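The mutable-then-read-only behaviour described above for Headers can be sketched as follows. This is a minimal illustration under stated assumptions, not the Kafka implementation: `SimpleHeader`, `SimpleHeaders`, the `ArrayList` backing store and the package-private `close()` are hypothetical names and choices made for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a Header implementing class: a key plus a raw byte[] value.
final class SimpleHeader {
    private final String key;
    private final byte[] value;

    SimpleHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }

    String key() { return key; }
    byte[] value() { return value; }
}

// Hypothetical stand-in for a Headers implementing class.
final class SimpleHeaders implements Iterable<SimpleHeader> {
    private final List<SimpleHeader> headers = new ArrayList<>();
    private boolean readOnly = false;

    // Appends a header; duplicate keys are allowed and insertion order is kept.
    SimpleHeaders add(String key, byte[] value) {
        checkWritable();
        headers.add(new SimpleHeader(key, value));
        return this;
    }

    // Removes every header with the given key.
    SimpleHeaders remove(String key) {
        checkWritable();
        headers.removeIf(h -> h.key().equals(key));
        return this;
    }

    // Returns the most recently added header for the key, or null if none exists.
    SimpleHeader lastHeader(String key) {
        for (int i = headers.size() - 1; i >= 0; i--) {
            if (headers.get(i).key().equals(key)) {
                return headers.get(i);
            }
        }
        return null;
    }

    // Assumption: called internally once send and interceptors are done; not public API.
    void close() {
        readOnly = true;
    }

    private void checkWritable() {
        if (readOnly) {
            throw new IllegalStateException("Headers are read-only");
        }
    }

    @Override
    public Iterator<SimpleHeader> iterator() {
        // Iterate over an unmodifiable view so callers cannot remove through the iterator.
        return Collections.unmodifiableList(headers).iterator();
    }
}
```

After `close()` is called, any `add` or `remove` throws IllegalStateException, while `lastHeader` and iteration keep working.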
- Add a headers field to ProducerRecord and ConsumerRecord.
- Add constructor(s) of Producer/ConsumerRecord to allow passing in of Iterable<Header>
- Use case: MirrorMaker being able to copy headers.
- Add accessor methods on the Producer/ConsumerRecord Headers headers()
    public class ProducerRecord<K, V> {
        ...
        ProducerRecord(K key, V value, Iterable<Header> headers, ...)
        ...
        public Headers headers();
        ...
    }

    public class ConsumerRecord<K, V> {
        ...
        ConsumerRecord(K key, V value, Iterable<Header> headers, ...)
        ...
        public Headers headers();
        ...
    }
- The changes needed will piggyback onto V3 of ProduceRequest and V4 of FetchRequest, which were introduced in KIP-98
- The serialisation of the [String, byte[]] header array will be on the wire using a strict format
- Each header's value will be custom serialisable by the interceptors/plugins/serdes that use the header.
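Because a header value is an opaque byte[], each component that owns a header chooses its own encoding. The sketch below shows two such encodings using only the JDK; `HeaderValueCodecs` and its method names are hypothetical helpers for illustration and are not part of this proposal.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: per-header value encodings chosen by whoever owns the header.
final class HeaderValueCodecs {
    private HeaderValueCodecs() {}

    // Text-valued header, encoded as UTF-8.
    static byte[] encodeString(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    static String decodeString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Numeric header, encoded as 8 big-endian bytes (ByteBuffer's default byte order).
    static byte[] encodeLong(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    static long decodeLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }
}
```

A producer-side interceptor and a consumer-side plugin only need to agree on the codec for the header keys they share; other headers stay untouched byte arrays.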
Expose headers to De/Serializers: extended interfaces are added, because the default methods available in Java 8 cannot be used
    public interface ExtendedDeserializer<T> extends Deserializer<T> {
        T deserialize(String topic, Headers headers, byte[] data);
    }

    public interface ExtendedSerializer<T> extends Serializer<T> {
        byte[] serialize(String topic, Headers headers, T data);
    }
For more detailed information on the above changes, please refer to the Proposed Changes section.
...
- Adds the ability for producers to set standard header key=value pairs
- No incompatible client api change (only new methods)
- Allows users to specify the serialisation of the header value per header
- Provides a standardised interface to ecosystems of tools that can then grow around the feature
The disadvantage of this proposal is:
- Change to the message object
Key duplication and ordering
- Duplicate headers with the same key must be supported.
- The order of headers must be retained throughout a record's end-to-end lifetime: from producer to consumer.
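The two requirements above (duplicate keys allowed, order retained) suggest an append-only list rather than a map. The following sketch illustrates the resulting lookup semantics; `OrderedHeaderList` is a hypothetical class, and String values are used instead of byte[] purely to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; real header values are byte[].
final class OrderedHeaderList {
    // Each entry is a {key, value} pair, stored in insertion order.
    private final List<String[]> entries = new ArrayList<>();

    // Appends a header; an existing header with the same key is never overwritten.
    OrderedHeaderList add(String key, String value) {
        entries.add(new String[] {key, value});
        return this;
    }

    // All values for a key, in the order they were added.
    List<String> valuesFor(String key) {
        List<String> out = new ArrayList<>();
        for (String[] e : entries) {
            if (e[0].equals(key)) {
                out.add(e[1]);
            }
        }
        return out;
    }

    // The most recently added value for a key, or null if the key is absent.
    String lastValueFor(String key) {
        List<String> values = valuesFor(key);
        return values.isEmpty() ? null : values.get(values.size() - 1);
    }

    // Every key in insertion order, duplicates included.
    List<String> keysInOrder() {
        List<String> out = new ArrayList<>();
        for (String[] e : entries) {
            out.add(e[0]);
        }
        return out;
    }
}
```

A map keyed by header name would silently drop duplicates, and a hash-based map would also lose ordering, which is why a list is assumed here.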
Create a Headers Interface and Implementation to encapsulate the headers protocol.
- See the Public Interfaces section above for sample interfaces.
Add a headers field of type Headers to both ProducerRecord and ConsumerRecord
- Accessor method Headers headers() added to the interface of ProducerRecord/ConsumerRecord.
- Add constructor(s) of Producer/ConsumerRecord to allow passing in existing/pre-constructed headers via Iterable<Header>
- Use case: MirrorMaker being able to copy headers.
Add new method to make headers accessible during de/serialization
- Add new default method to Serialization/Deserialization class, with Headers parameter
- Due to KIP-118 not being implemented, new extended interfaces ExtendedSerializer and ExtendedDeserializer with new extra methods are introduced instead
- Existing De/Serialization will be wrapped and work as before.
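One way to wrap an existing serializer so that it keeps working as before is sketched below. The interfaces here are simplified local stand-ins (the real Serializer also has configure and close methods, and Headers is reduced to an empty marker), and `SerializerWrapper` and `ensureExtended` are hypothetical names chosen for this example.

```java
// Simplified local stand-ins for the interfaces discussed in this proposal.
interface Headers {}

interface Serializer<T> {
    byte[] serialize(String topic, T data);
}

interface ExtendedSerializer<T> extends Serializer<T> {
    byte[] serialize(String topic, Headers headers, T data);
}

// Hypothetical wrapper: adapts a header-unaware Serializer to the extended interface.
final class SerializerWrapper<T> implements ExtendedSerializer<T> {
    private final Serializer<T> delegate;

    SerializerWrapper(Serializer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public byte[] serialize(String topic, Headers headers, T data) {
        // The wrapped serializer knows nothing about headers, so they are ignored.
        return delegate.serialize(topic, data);
    }

    @Override
    public byte[] serialize(String topic, T data) {
        return delegate.serialize(topic, data);
    }

    // Returns the serializer unchanged if it is already header-aware, otherwise wraps it.
    static <T> ExtendedSerializer<T> ensureExtended(Serializer<T> serializer) {
        if (serializer instanceof ExtendedSerializer) {
            return (ExtendedSerializer<T>) serializer;
        }
        return new SerializerWrapper<>(serializer);
    }
}
```

With this shape the client code can always call the three-argument method, and serializers written before headers existed need no changes. The same pattern would apply to deserializers.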
Wire protocol change - add array of headers to end of the message format
...