Status
Current state: Under Discussion
...
Further details and a fuller case for headers can be found here: A Case for Kafka Headers
Public Interfaces
This KIP has the following public interface changes:
- Add a new headers length and value (byte[]) to the core message format.
- Create a new Headers class
  - It contains a map of key, value
  - Add a headers (MultiMap<String, Object>) field to the Headers class
  - It exposes accessor methods, which convert into typed values (this is akin to JMS/AMQP headers/property accessors)
    - void setBoolean(String, Boolean)
    - boolean getBoolean(String)
    - void setByte(String, Byte)
    - Byte getByte(String)
    - void setChar(String, Character)
    - Character getChar(String)
    - void setShort(String, Short)
    - Short getShort(String)
    - void setInteger(String, Integer)
    - Integer getInteger(String)
    - void setLong(String, Long)
    - Long getLong(String)
    - void setFloat(String, Float)
    - Float getFloat(String)
    - void setDouble(String, Double)
    - Double getDouble(String)
    - void setString(String, String)
    - String getString(String)
    - void setBytes(String, byte[])
    - byte[] getBytes(String)
  - As noted, the key in the interface is of type String; if a key can be converted to a more compact typed representation, it will be converted for compactness, e.g.
    - "12" would be converted to a Typed object of type=short and value=12 (int16)
    - "true" would be converted to a Typed object of type=boolean(true) and value=null
  - Add accessor methods on the Headers class - void put<T>(String) and a Collection<Object> get(String)
- Add a new Typed class
  - To hold the typed representations
- Add a headers field to ProducerRecord and ConsumerRecord.
  - Add accessor methods on the Producer/ConsumerRecord to get the lazily initialised and lazily deserialised Headers object: Headers getHeaders()
- Add ProduceRequest/ProduceResponse V4 which uses the new message format.
- Add FetchRequest/FetchResponse V4 which uses the new message format.
- The [String, byte[]] header array will be serialised on the wire using a strict format
- Each header value will be typed: primitive types and strings will be natively supported, and custom types will be supported through a byte[] type that is custom serialisable by the interceptors/plugins that use the header.
For more detailed information on the above changes, please refer to the Proposed Changes section.
...
- Adds the ability for producers to set standard header key=value pairs
- No incompatible client api change (only new methods)
- Allows users to specify the serialisation of the header value per header
- Compact key space
- Provides a standardised interface to ecosystems of tools that can then grow around the feature
The disadvantage of this proposal is:
- Change to the message object
Create a Headers Interface and Implementation to encapsulate the headers protocol.
This lazily initialises/deserialises on first method access.
- Fields:
- MultiMap<String, Object> headers
- Constructors
- ()
- (byte[] headerBytes)
- Methods
- void putBoolean(String, boolean)
- void putByte(String, byte)
- void putChar(String, char)
- void putShort(String, short)
- void putInteger(String, int)
- void putLong(String, long)
- void putFloat(String, float)
- void putDouble(String, double)
- void putString(String, String)
- void putBytes(String, byte[])
- Collection<Object> get(String) – object will be the primitive type wrapper
- Collection<String> keys()
- byte[] asBytes()
- Add a new Typed class
- To hold the typed representations
Add a headers field of type Headers to both ProducerRecord and ConsumerRecord
- Accessor method Headers getHeaders() added to the interface of the ProducerRecord/ConsumerRecord.
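The Headers methods listed above could behave roughly as in the following minimal sketch. `HeadersSketch` is a hypothetical name, a plain `Map<String, List<Object>>` stands in for the MultiMap, and only a few of the put methods are shown; lazy deserialisation and asBytes() are left out.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory sketch of the proposed Headers class; not the KIP's implementation.
final class HeadersSketch {
    // Stand-in for MultiMap<String, Object>: each key keeps every value put under it, in order.
    private final Map<String, List<Object>> headers = new LinkedHashMap<>();

    private void put(String key, Object value) {
        headers.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Primitive arguments are boxed into their wrapper types when stored.
    void putBoolean(String key, boolean value) { put(key, value); }
    void putInteger(String key, int value) { put(key, value); }
    void putLong(String key, long value) { put(key, value); }
    void putString(String key, String value) { put(key, value); }
    void putBytes(String key, byte[] value) { put(key, value); }

    // Returns all values stored under the key, or an empty collection if the key is absent.
    Collection<Object> get(String key) {
        List<Object> values = headers.get(key);
        return values == null
                ? Collections.<Object>emptyList()
                : Collections.unmodifiableList(values);
    }

    Collection<String> keys() {
        return Collections.unmodifiableSet(headers.keySet());
    }
}
```

Because a key may be put more than once, `get` returns a collection instead of a single value, so callers decide how to treat repeated keys.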
...
    MessageAndOffset => Offset MessageSize Message
      Offset => int64
      MessageSize => int32

    Message => Crc MagicByte Attributes Timestamp KeyLength Key HeadersLength Headers ValueLength Value
      Crc => int32
      MagicByte => int8      <--- Bump up magic byte to 2
      Attributes => int8     <--- Use Bit 4 as boolean flag for if headers present
      Timestamp => int64
      KeyLength => int32
      Key => bytes
      (optional) HeadersLength => int32  <--- NEW [optional] size of the byte[] of the serialised headers, if headers are present
      (optional) Headers => bytes        <--- NEW [optional] serialised form of the headers
      ValueLength => int32
      Value => bytes
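The headers-present flag in the Attributes byte could be read and set as in the sketch below. It assumes "Bit 4" is counted from zero (mask 0x10), which would be the next bit after those the existing format uses for the compression codec and timestamp type; the class and method names are hypothetical.

```java
// Hypothetical helper for the proposed headers-present flag; names are illustrative only.
final class HeaderAttributes {
    // Assumption: "Bit 4" is zero-indexed, giving the mask 0x10.
    static final int HEADERS_PRESENT_MASK = 1 << 4;

    // True when the attributes byte says HeadersLength and Headers follow the key.
    static boolean hasHeaders(byte attributes) {
        return (attributes & HEADERS_PRESENT_MASK) != 0;
    }

    // Returns the attributes byte with the headers flag set, leaving other bits untouched.
    static byte withHeaders(byte attributes) {
        return (byte) (attributes | HEADERS_PRESENT_MASK);
    }
}
```

A reader would check `hasHeaders` before attempting to read HeadersLength, so messages written without headers carry no extra bytes.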
Wire protocol of the headers bytes (if above mentioned attributes bit flag is true)
The below is for the headers wire protocol.
- A key can occur multiple times and clients should expect to handle this; in clients it can be represented as a multimap, or as an array of values per key.
    Headers (bytes) => Array(KeyLength, Key, ValueLength, Value)
      KeyLength => int32     <--- NEW size of the byte[] of the serialised key value
      Key => bytes           <--- NEW serialised string (UTF8) bytes of the header key
      ValueLength => int32   <--- NEW size of the byte[] of the serialised header value
      Value => bytes         <--- NEW serialised form of the typed header value -> TypedValue, interpretation of this below

    TypedValue (bytes) => (Type, Value)
      Type => byte           <--- 0x00 = null
                                  0x01 = boolean=true
                                  0x02 = boolean=false
                                  0x03 = byte
                                  0x04 = char
                                  0x05 = short (int16)
                                  0x06 = int (int32)
                                  0x07 = long (int64)
                                  0x08 = float
                                  0x09 = double
                                  0x0A = string
                                  0x0B = byte[]
      Value => bytes         <--- NEW byte array holding the corresponding value in byte array form
                                  0x00-0x02 - null and boolean: this will be zero length
                                  0x03-0x09 - type length: will be the length of the byte array
                                  0x0A-0x0B - string and byte[]: this will contain the leading int32 length of the byte array
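As a rough illustration of the typed value layout, the sketch below encodes a value as one Type byte followed by its Value bytes, and wraps it into a single header entry. It assumes the type codes run from 0x00 = null to 0x0B = byte[], covers only some of the types, and assumes big-endian integers (the `ByteBuffer` default); `TypedValueCodec` and its methods are hypothetical names, and how entries are joined into the full array is not covered.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical codec sketch; type codes assume 0x00 = null ... 0x0B = byte[].
final class TypedValueCodec {
    static final byte NULL = 0x00, TRUE = 0x01, FALSE = 0x02,
            INT = 0x06, LONG = 0x07, STRING = 0x0A, BYTES = 0x0B;

    // Encodes one value as Type (1 byte) followed by its Value bytes.
    static byte[] encode(Object value) {
        if (value == null) return new byte[] {NULL};              // zero-length value
        if (value instanceof Boolean) return new byte[] {(Boolean) value ? TRUE : FALSE};
        if (value instanceof Integer)
            return ByteBuffer.allocate(1 + 4).put(INT).putInt((Integer) value).array();
        if (value instanceof Long)
            return ByteBuffer.allocate(1 + 8).put(LONG).putLong((Long) value).array();
        if (value instanceof String) {                            // leading int32 length
            byte[] utf8 = ((String) value).getBytes(StandardCharsets.UTF_8);
            return ByteBuffer.allocate(1 + 4 + utf8.length)
                    .put(STRING).putInt(utf8.length).put(utf8).array();
        }
        if (value instanceof byte[]) {                            // leading int32 length
            byte[] raw = (byte[]) value;
            return ByteBuffer.allocate(1 + 4 + raw.length)
                    .put(BYTES).putInt(raw.length).put(raw).array();
        }
        throw new IllegalArgumentException("type not covered by this sketch: " + value.getClass());
    }

    // Decodes bytes produced by encode back into a wrapper object, String or byte[].
    static Object decode(byte[] encoded) {
        ByteBuffer buf = ByteBuffer.wrap(encoded);
        byte type = buf.get();
        switch (type) {
            case NULL: return null;
            case TRUE: return Boolean.TRUE;
            case FALSE: return Boolean.FALSE;
            case INT: return buf.getInt();
            case LONG: return buf.getLong();
            case STRING: {
                byte[] utf8 = new byte[buf.getInt()];
                buf.get(utf8);
                return new String(utf8, StandardCharsets.UTF_8);
            }
            case BYTES: {
                byte[] raw = new byte[buf.getInt()];
                buf.get(raw);
                return raw;
            }
            default: throw new IllegalArgumentException("unknown type code: " + type);
        }
    }

    // One header entry: KeyLength, Key (UTF-8), ValueLength, Value (the typed value bytes).
    static byte[] encodeHeader(String key, Object value) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] v = encode(value);
        return ByteBuffer.allocate(4 + k.length + 4 + v.length)
                .putInt(k.length).put(k).putInt(v.length).put(v).array();
    }
}
```

Under these assumptions a boolean header costs a single value byte, while an entry with a one-character key and a boolean value takes 10 bytes in total, 8 of which are the two int32 lengths.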
Compatibility, Deprecation, and Migration Plan
- Current client users should not be affected; these are new API methods being added to the client APIs
- The message version means that clients expecting an older message version would not be impacted
  - older producers would simply produce a message without headers
  - new consumers would simply get a message with empty headers
  - older consumers would simply consume a message, oblivious to there being any headers
- Message version migration would be handled as in KIP-32
...
- Core message size reduction
- remove overhead of 4 bytes for KeyLength when no headers using attributes bit
- reduce overhead of 4 bytes for KeyLength by using variable length encoded int
- reduce overhead of 4 bytes for ValueLength by using variable length encoded int
- Broker side interceptors
- with headers we could start introducing broker side message interceptors to append metadata or handle messages
- Single Record consumer API
- There are many use cases where a single record consumer/listener API is more user friendly; this is evident from the fact that Spring Kafka has already created a wrapper. It would be good to support this natively.
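The variable length encoded int mentioned for KeyLength and ValueLength is not specified further here. One common scheme is a base-128 varint (as used by Protocol Buffers), sketched below for non-negative lengths only; `VarInt` is a hypothetical name, and how a null (negative) length would be represented is left open.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical sketch of one possible variable length int encoding (base-128 varint).
final class VarInt {
    // Writes 7 bits per byte, least significant group first; the high bit marks "more bytes follow".
    static byte[] encodeUnsigned(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
        return out.toByteArray();
    }
}
```

With this scheme a length below 128 takes one byte instead of the four bytes of an int32.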
Rejected Alternatives
Map<Int, byte[]> Headers added to the Producer/ConsumerRecord
The concept is similar to the one proposed above, but with int keys.
- Benefits
- More compact: much reduced byte size overhead, only 4 bytes.
- String keys can dwarf the value in byte size.
- Disadvantages
- String keys are more common in many systems
- Requires management of the int key space, whereas string keys have natural key space management.
Map<String, String> Headers added to the
...
ConsumerRecord
The concept is similar to the above proposed but with a few more disadvantages.
- Benefits
- Adds the ability for producers to set standard header key=value string value pairs
- No incompatible client api change (only new methods)
- Allows users to specify the serialisation of the key=value map (String(&=), JSON, AVRO).
- Provides a standardised interface to ecosystems of tools that can grow around the feature
- Disadvantages
- Change to the message object
- String keys cause large keys; this can cause a message payload of 60 bytes to be dwarfed by its headers
- String values again do not allow for compact values, and restrict a value to being a String
- Not able to use headers on the broker side with custom serialisation
...