...
This KIP has the following public interface changes:
- Add a headers length field and a headers value (byte[]) to the core message format.
- Add a headers (Map<int, byte[]>) field to ProducerRecord and ConsumerRecord. Producers and interceptors will be able to set headers on a ProducerRecord; consumers and interceptors will see the headers along with the messages.
- Add accessor methods void setHeader(int, byte[]) and byte[] getHeader(int) to ProducerRecord and ConsumerRecord.
- Add ProduceRequest/ProduceResponse V3 which uses the new message format.
- Add FetchRequest/FetchResponse V3 which uses the new message format.
- The [int, byte[]] header set will be serialized on the wire using a strict format.
- Each header's value will be custom-serializable by the interceptors/plugins that use the header.
- Because headers use int keys for compactness, key ranges will be reserved for different usages:
  - client headers adopted by the Kafka open source project (not necessarily owned by it)
  - broker headers adopted by the Kafka open source project (not necessarily owned by it)
  - commercial vendor space
  - custom in-house
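Below is a minimal Java sketch of the header accessors described above. It assumes the headers are held in a Map<Integer, byte[]>, which is Java's boxed form of the Map<int, byte[]> notation used in this proposal. The class name RecordHeaders is hypothetical. It stands in for the header part of ProducerRecord/ConsumerRecord and is not the Kafka client API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for the header part of ProducerRecord/ConsumerRecord;
// not the actual Kafka client API.
final class RecordHeaders {
    // int key -> opaque value bytes; each value's serialization is chosen by
    // the interceptor/plugin that uses that key.
    private final Map<Integer, byte[]> headers = new HashMap<>();

    void setHeader(int key, byte[] value) {
        Objects.requireNonNull(value, "value");
        // Copy so later changes to the caller's array do not alter the record.
        headers.put(key, Arrays.copyOf(value, value.length));
    }

    // Returns a copy of the value, or null if the header is not set.
    byte[] getHeader(int key) {
        byte[] value = headers.get(key);
        return value == null ? null : Arrays.copyOf(value, value.length);
    }

    // Read-only view of all headers (the byte[] values themselves are not copied).
    Map<Integer, byte[]> headers() {
        return Collections.unmodifiableMap(headers);
    }
}
```

Copying on set and get keeps the record's headers independent of arrays the caller may keep modifying. Whether the real API should copy is a design choice that this proposal does not specify.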
...
- Adds the ability for producers to set standard header key=value pairs
- No incompatible client API change (only new methods)
- Allows users to specify the serialization of the header value per header
- Compact key space
- Provides a standardized interface around which an ecosystem of tools can grow
The disadvantage of this proposal is:
- Change to the message object
Add a headers field Map<int, byte[]> to both ProducerRecord and ConsumerRecord
- Accessor methods void setHeader(int, byte[]) and byte[] getHeader(int) added to the interfaces of both.
Wire protocol change: use attribute bit 4 as a flag indicating whether headers are present, and add an optional headers size (variable-length int) and headers field to the message format.
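Below is a sketch of the attribute flag. It assumes "bit 4" is counted from 0 at the least significant bit, giving mask 0x10. That is the first bit above the compression codec (bits 0-2) and timestamp type (bit 3) used by the magic v1 format. HeaderAttributes and its methods are hypothetical names.

```java
// Sketch of the "headers present" attribute flag.
// Assumption: bit 4 is counted from 0 at the least significant bit, so it sits
// just above bit 3 (timestamp type) and bits 0-2 (compression codec).
final class HeaderAttributes {
    static final byte HEADERS_PRESENT_MASK = 0x10; // 1 << 4

    static boolean hasHeaders(byte attributes) {
        return (attributes & HEADERS_PRESENT_MASK) != 0;
    }

    // Returns a copy of the attributes with the headers flag set or cleared;
    // the other bits are left untouched.
    static byte withHeaders(byte attributes, boolean present) {
        return present
                ? (byte) (attributes | HEADERS_PRESENT_MASK)
                : (byte) (attributes & ~HEADERS_PRESENT_MASK);
    }
}
```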
The below is for the core Message wire protocol change needed to fit headers into the message.
...
- The attributes flag bit is used to keep the message size the same as before if no headers are used
- HeadersLength is a variable-length encoded int, saving bytes when headers are few or small
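This section does not specify which variable-length encoding is used. A common choice is unsigned LEB128, the Protocol Buffers varint style: 7 payload bits per byte, with the high bit set on every byte except the last. The sketch below assumes that encoding. VarInt is a hypothetical helper name.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Assumption: HeadersLength uses unsigned LEB128 (Protocol Buffers varint style).
final class VarInt {
    static void write(int value, ByteArrayOutputStream out) {
        if (value < 0) throw new IllegalArgumentException("length must be >= 0");
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80); // low 7 bits, continuation bit set
            value >>>= 7;
        }
        out.write(value); // final byte, continuation bit clear
    }

    static int read(ByteBuffer in) {
        int result = 0;
        for (int shift = 0; shift < 35; shift += 7) {
            byte b = in.get();
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) return result; // last byte reached
        }
        throw new IllegalArgumentException("varint longer than 5 bytes");
    }
}
```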
```
MessageAndOffset => Offset MessageSize Message
  Offset => int64
  MessageSize => int32

  Message => Crc MagicByte Attributes Timestamp KeyLength Key HeadersLength Headers ValueLength Value
    Crc => int32
    MagicByte => int8                           <---------- Bump up magic byte to 2
    Attributes => int8                          <---------- Use bit 4 as boolean flag for whether headers are present
    Timestamp => int64
    KeyLength => int32
    Key => bytes
    (optional) HeadersLength => variable int32  <---------- NEW [optional] length of the byte[] of the serialized headers, if headers are present
    (optional) Headers => bytes                 <---------- NEW [optional] serialized form of the headers Map<int, byte[]>
    ValueLength => int32
    Value => bytes
```
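Below is a sketch of writing and reading the optional HeadersLength and Headers fields. This section does not give the strict per-header format, so the sketch assumes a hypothetical one:

- Each header is an int32 key, then an int32 value length, then the value bytes.
- Headers appear in ascending key order.
- Ints are big-endian, as elsewhere in the Kafka protocol.
- HeadersLength is an unsigned LEB128 varint.

HeadersSection is a hypothetical name. The caller is assumed to set or check the bit-4 attribute flag separately.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;

// Sketch of the optional HeadersLength + Headers fields.
// Assumptions (not specified in the KIP section): each header is encoded as
// int32 key, int32 value length, value bytes, in ascending key order, and
// HeadersLength is an unsigned LEB128 varint.
final class HeadersSection {
    // Writes nothing for an empty map; the caller leaves the bit-4 flag clear.
    static void write(Map<Integer, byte[]> headers, ByteArrayOutputStream out) {
        if (headers.isEmpty()) return;
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        for (Map.Entry<Integer, byte[]> e : new TreeMap<>(headers).entrySet()) {
            byte[] value = e.getValue();
            // ByteBuffer is big-endian by default.
            ByteBuffer buf = ByteBuffer.allocate(8 + value.length);
            buf.putInt(e.getKey()).putInt(value.length).put(value);
            body.write(buf.array(), 0, buf.capacity());
        }
        byte[] bytes = body.toByteArray();
        writeVarInt(bytes.length, out);    // HeadersLength
        out.write(bytes, 0, bytes.length); // Headers
    }

    // Call only when the bit-4 flag says headers are present.
    static Map<Integer, byte[]> read(ByteBuffer in) {
        int length = readVarInt(in);
        ByteBuffer body = in.slice();
        body.limit(length);
        in.position(in.position() + length); // skip past Headers in the outer buffer
        Map<Integer, byte[]> headers = new TreeMap<>();
        while (body.hasRemaining()) {
            int key = body.getInt();
            byte[] value = new byte[body.getInt()];
            body.get(value);
            headers.put(key, value);
        }
        return headers;
    }

    private static void writeVarInt(int value, ByteArrayOutputStream out) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    private static int readVarInt(ByteBuffer in) {
        int result = 0;
        for (int shift = 0; shift < 35; shift += 7) {
            byte b = in.get();
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) return result;
        }
        throw new IllegalArgumentException("varint longer than 5 bytes");
    }
}
```

Writing nothing for an empty map matches the intent that messages without headers stay the same size as before.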
...
With variable-length encoding, ints in the ranges below take fewer bytes than the standard 4-byte allocation, and the smallest values benefit most.
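Assuming unsigned LEB128 varints, the encoded size of a non-negative int can be computed directly. This shows where the savings over a fixed 4-byte int come from. VarIntSize is a hypothetical name.

```java
// Assumption: unsigned LEB128 varint (Protocol Buffers style). A non-negative
// int needs one byte per started group of 7 significant bits, at least 1.
final class VarIntSize {
    static int encodedSize(int value) {
        if (value < 0) throw new IllegalArgumentException("non-negative only");
        int bytes = 1;
        while ((value >>>= 7) != 0) bytes++;
        return bytes;
    }
}
```

Under this assumption, the encoded sizes compare with a fixed int32 as follows:

- 0 to 127: 1 byte.
- Up to 16383: 2 bytes.
- Up to 2^21 - 1: 3 bytes.
- 2^21 to 2^28 - 1: 4 bytes, the same as a fixed int32.
- Larger values: 5 bytes.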
...
As such we propose that:
- Ints 0 to 255 are reserved for Apache Kafka open source registered headers: these are likely to be the most commonly used, so giving them the smallest keys saves the most space.
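Below is a sketch of how a client or plugin might check the range reserved for Apache Kafka registered headers. Only the 0 to 255 range is stated here, so the other reserved ranges are not modeled. HeaderKeys and its methods are hypothetical.

```java
// Hypothetical helper; only the 0-255 range for Apache Kafka registered
// headers is modeled, since the other range boundaries are not given here.
final class HeaderKeys {
    static final int KAFKA_REGISTERED_MIN = 0;
    static final int KAFKA_REGISTERED_MAX = 255;

    static boolean isKafkaRegistered(int key) {
        return key >= KAFKA_REGISTERED_MIN && key <= KAFKA_REGISTERED_MAX;
    }

    // A non-Kafka plugin could call this before setting a header, to avoid
    // claiming a key reserved for Apache Kafka registered headers.
    static void requireNotKafkaRegistered(int key) {
        if (isKafkaRegistered(key)) {
            throw new IllegalArgumentException(
                    "key " + key + " is reserved for Apache Kafka registered headers");
        }
    }
}
```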
...