Status
Current state: Under Discussion
...
Further details and a more detailed case for headers can be found here: A Case for Kafka Headers
Public Interfaces
This KIP has the following public interface changes:
- Add a new headers length and value (byte[]) to the core message format.
- Create a new Headers class
- It contains a map of key, value
- Add a headers (MultiMap<String, Typed>) field to the Headers class
- It exposes accessor methods, which convert into typed values (this is akin to JMS/AMQP header/property accessors)
- void setBoolean(String, Boolean)
- boolean getBoolean(String)
- void setByte(String, Byte)
- Byte getByte(String)
- void setChar(String, Character)
- Character getChar(String)
- void setShort(String, Short)
- Short getShort(String)
- void setInteger(String, Integer)
- Integer getInteger(String)
- void setLong(String, Long)
- Long getLong(String)
- void setFloat(String, Float)
- Float getFloat(String)
- void setDouble(String, Double)
- Double getDouble(String)
- void setString(String, String)
- String getString(String)
- void setBytes(String, byte[])
- byte[] getBytes(String)
- As noted, the key interface is of type String; however, if a key can be converted to a more compact typed representation, it will be converted for compactness, e.g.
- "12" would be converted to a Typed object of type=short and value=12 (int16)
- "true" would be converted to a Typed object of type=boolean(true) and value=null
- Add a new Typed class
- To hold the typed representations
- Add accessor methods on the Producer/ConsumerRecord to get a lazily initialised and lazily deserialised Headers object.
- Headers getHeaders();
- Add ProduceRequest/ProduceResponse V4 which uses the new message format.
- Add FetchRequest/FetchResponse V4 which uses the new message format.
- The [typed, typed] header set will be serialised on the wire using a strict format
- Each header value will be typed: primitive types and String are natively supported, and custom types are supported via the byte[] type.
For more detailed information on the above changes, please refer to the Proposed Changes section.
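The typed accessors and the key compaction rule described above can be sketched in Java as follows. This is a minimal illustration under stated assumptions, not the proposed implementation: `Typed`, `compactKey`, `asKeyString` and `TypedHeaders` are hypothetical names, only a subset of the type codes and accessors is shown, only canonical integer strings are compacted, and returning the first value for a repeated key is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical holder for a typed header key or value: a type code plus a Java value. */
final class Typed {
    // A subset of the type codes from the headers wire-protocol table.
    static final byte NULL = 0x00, BOOLEAN_TRUE = 0x01, BOOLEAN_FALSE = 0x02,
            SHORT = 0x05, STRING = 0x0A;

    final byte type;
    final Object value; // null for NULL and for both boolean types

    Typed(byte type, Object value) {
        this.type = type;
        this.value = value;
    }

    /** Compacts a String key: "true"/"false" become booleans, small integers become a short. */
    static Typed compactKey(String key) {
        if ("true".equals(key)) return new Typed(BOOLEAN_TRUE, null);
        if ("false".equals(key)) return new Typed(BOOLEAN_FALSE, null);
        try {
            short s = Short.parseShort(key);
            // Only compact canonical forms, so "012" or "+12" still round-trip as strings.
            if (Short.toString(s).equals(key)) return new Typed(SHORT, s);
        } catch (NumberFormatException notAShort) {
            // fall through: keep the key as a string
        }
        return new Typed(STRING, key);
    }

    /** Restores the String form of a compacted key for the user-facing API. */
    String asKeyString() {
        switch (type) {
            case BOOLEAN_TRUE: return "true";
            case BOOLEAN_FALSE: return "false";
            default: return String.valueOf(value);
        }
    }
}

/** Hypothetical typed-accessor Headers: a key may hold several values (multimap). */
final class TypedHeaders {
    private final Map<String, List<Object>> headers = new LinkedHashMap<>();

    private void put(String key, Object value) {
        headers.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    /** Returns the first value for the key, or null; throws ClassCastException on a type mismatch. */
    private <T> T first(String key, Class<T> type) {
        List<Object> values = headers.getOrDefault(key, Collections.emptyList());
        return values.isEmpty() ? null : type.cast(values.get(0));
    }

    void setBoolean(String key, Boolean value) { put(key, value); }
    Boolean getBoolean(String key) { return first(key, Boolean.class); }
    void setInteger(String key, Integer value) { put(key, value); }
    Integer getInteger(String key) { return first(key, Integer.class); }
    void setString(String key, String value) { put(key, value); }
    String getString(String key) { return first(key, String.class); }
}
```

Restoring the String form on read keeps the user-facing key type as String while the wire form stays compact.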
Proposed Changes
Four options were proposed before this proposal. This section details our proposed solution, Option 1, described here. The other options are in the Rejected Alternatives section.
- Adds the ability for producers to set standard header key=value pairs
- No incompatible client API change (only new methods)
- Allows users to specify the serialisation of the header value per header
- Compact key space
- Provides a standardised interface around which an ecosystem of tools can grow
The disadvantage of this proposal is:
- Change to the message object
Add a headers field Map<int, byte[]> to both ProducerRecord and ConsumerRecord
- Accessor methods of void setHeader(int, byte[]) and byte[] getHeader(int) added to interface of the ProducerRecord/ConsumerRecord.
Wire protocol change: use attribute bit 4 as a flag indicating whether headers are present, and add an (optional) headers size and headers field to the message format.
The below is for the core Message wire protocol change needed to fit headers into the message.
A key requirement is that headers cause no overhead when they are not present.
...
- Add a headers (MultiMap<String, Object>) field to the Headers class
- Add accessor methods on the Headers class: <T> void put(String, T) and Collection<Object> get(String)
```
MessageAndOffset => Offset MessageSize Message
  Offset => int64
  MessageSize => int32
  Message => Crc MagicByte Attributes Timestamp KeyLength Key HeadersLength Headers ValueLength Value
    Crc => int32
    MagicByte => int8 <---------------------- Bump up magic byte to 2
    Attributes => int8 <---------------------- Use Bit 4 as boolean flag for if headers present
    Timestamp => int64
    KeyLength => int32
    Key => bytes
    (optional) HeadersLength => int32 <--------------- NEW [optional] size of the byte[] of the serialised headers if headers are present
    (optional) Headers => bytes <--------------------- NEW [optional] serialised form of the headers Map<int, byte[]>
    ValueLength => int32
    Value => bytes
```
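A minimal Java sketch of how a producer could lay out a magic-v2 message under this format, and how a reader could locate the optional headers. Assumptions not stated in the proposal: the CRC is a CRC32 over every byte after the Crc field (as for the existing message format), a length of -1 encodes a null key or value, "bit 4" is counted from the least-significant bit (mask 0x10), and `MessageV2`, `encode` and `decodeHeaders` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

/** Hypothetical encoder/decoder for the proposed magic-v2 message with optional headers. */
final class MessageV2 {
    static final byte MAGIC = 2;
    static final byte HEADERS_PRESENT = 0x10; // attribute bit 4

    /** Lays out Crc MagicByte Attributes Timestamp KeyLength Key [HeadersLength Headers] ValueLength Value. */
    static byte[] encode(long timestamp, byte[] key, byte[] headers, byte[] value) {
        boolean hasHeaders = headers != null;
        int size = 4 + 1 + 1 + 8
                + 4 + (key == null ? 0 : key.length)
                + (hasHeaders ? 4 + headers.length : 0) // no headers: zero extra bytes
                + 4 + (value == null ? 0 : value.length);
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.position(4); // leave room for the CRC, filled in last
        buf.put(MAGIC);
        buf.put(hasHeaders ? HEADERS_PRESENT : (byte) 0);
        buf.putLong(timestamp);
        writeBytes(buf, key);
        if (hasHeaders) writeBytes(buf, headers);
        writeBytes(buf, value);
        CRC32 crc = new CRC32();
        crc.update(buf.array(), 4, size - 4); // CRC covers everything after the Crc field
        buf.putInt(0, (int) crc.getValue());
        return buf.array();
    }

    private static void writeBytes(ByteBuffer buf, byte[] bytes) {
        if (bytes == null) {
            buf.putInt(-1); // -1 length marks null
            return;
        }
        buf.putInt(bytes.length);
        buf.put(bytes);
    }

    /** Returns the raw headers bytes, or null when the message carries no headers. */
    static byte[] decodeHeaders(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        int storedCrc = buf.getInt();
        CRC32 crc = new CRC32();
        crc.update(message, 4, message.length - 4);
        if (storedCrc != (int) crc.getValue()) throw new IllegalStateException("CRC mismatch");
        byte magic = buf.get();
        byte attributes = buf.get();
        // Older magic values never carry headers; a clear bit 4 means no headers either.
        if (magic < MAGIC || (attributes & HEADERS_PRESENT) == 0) return null;
        buf.getLong();  // skip Timestamp
        readBytes(buf); // skip Key
        return readBytes(buf);
    }

    private static byte[] readBytes(ByteBuffer buf) {
        int len = buf.getInt();
        if (len < 0) return null;
        byte[] out = new byte[len];
        buf.get(out);
        return out;
    }
}
```

A message encoded without headers is exactly as long as it would be without the new fields, which is what the "no overhead if not present" requirement asks for; a reader that sees the flag cleared (or an older magic byte) can simply expose empty headers.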
Wire protocol of the headers bytes (only present if the above-mentioned attributes bit flag is set)
The below is for the headers wire protocol.
```
Headers (bytes) => Set(Key, ValueLength, Value)
  Set =>
    Key => byte [] => Typed <--------------------- NEW typed key of the header
    Value => byte[] => Typed <-------------------- NEW serialised form of the header value
Typed (bytes) => (Type, Value)
  Type => byte <---------------------- 0x00 = null
                                       0x01 = boolean=true
                                       0x02 = boolean=false
                                       0x03 = byte
                                       0x04 = char
                                       0x05 = short (int16)
                                       0x06 = int (int32)
                                       0x07 = long (int64)
                                       0x08 = float
                                       0x09 = double
                                       0x0A = string
                                       0x0B = byte[]
  Value => bytes <-------------------- NEW byte array holding the corresponding value in byte array form,
                                       0x00-0x02 - null and boolean: this will be zero length
                                       0x03-0x09 - fixed-size types: this will be the length of the type's byte representation
                                       0x0A-0x0B - string and byte[]: this will contain the leading int32 length of the byte array
```
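The typed value encoding in the table above can be sketched as a round-trip codec. Assumptions beyond the table: fixed-size values are written big-endian (the `ByteBuffer` default, and Kafka's usual network byte order), char is written as a 2-byte UTF-16 code unit, strings are UTF-8, and `TypedCodec` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical codec for the typed header encoding: a Type byte followed by the value bytes. */
final class TypedCodec {
    static byte[] encode(Object v) {
        // null and booleans carry their value in the type code itself: zero-length value.
        if (v == null) return new byte[] {0x00};
        if (v instanceof Boolean) return new byte[] {(Boolean) v ? (byte) 0x01 : (byte) 0x02};
        // Fixed-size types: the value length is implied by the type.
        if (v instanceof Byte) return ByteBuffer.allocate(2).put((byte) 0x03).put((Byte) v).array();
        if (v instanceof Character) return ByteBuffer.allocate(3).put((byte) 0x04).putChar((Character) v).array();
        if (v instanceof Short) return ByteBuffer.allocate(3).put((byte) 0x05).putShort((Short) v).array();
        if (v instanceof Integer) return ByteBuffer.allocate(5).put((byte) 0x06).putInt((Integer) v).array();
        if (v instanceof Long) return ByteBuffer.allocate(9).put((byte) 0x07).putLong((Long) v).array();
        if (v instanceof Float) return ByteBuffer.allocate(5).put((byte) 0x08).putFloat((Float) v).array();
        if (v instanceof Double) return ByteBuffer.allocate(9).put((byte) 0x09).putDouble((Double) v).array();
        // Variable-size types: a leading int32 length.
        if (v instanceof String) return withLength((byte) 0x0A, ((String) v).getBytes(StandardCharsets.UTF_8));
        if (v instanceof byte[]) return withLength((byte) 0x0B, (byte[]) v);
        throw new IllegalArgumentException("Unsupported header type: " + v.getClass());
    }

    private static byte[] withLength(byte type, byte[] bytes) {
        return ByteBuffer.allocate(5 + bytes.length).put(type).putInt(bytes.length).put(bytes).array();
    }

    static Object decode(byte[] encoded) {
        ByteBuffer buf = ByteBuffer.wrap(encoded);
        byte type = buf.get();
        switch (type) {
            case 0x00: return null;
            case 0x01: return Boolean.TRUE;
            case 0x02: return Boolean.FALSE;
            case 0x03: return buf.get();
            case 0x04: return buf.getChar();
            case 0x05: return buf.getShort();
            case 0x06: return buf.getInt();
            case 0x07: return buf.getLong();
            case 0x08: return buf.getFloat();
            case 0x09: return buf.getDouble();
            case 0x0A: return new String(readLengthPrefixed(buf), StandardCharsets.UTF_8);
            case 0x0B: return readLengthPrefixed(buf);
            default: throw new IllegalArgumentException("Unknown type code: " + type);
        }
    }

    private static byte[] readLengthPrefixed(ByteBuffer buf) {
        byte[] out = new byte[buf.getInt()];
        buf.get(out);
        return out;
    }
}
```

For example, a boolean costs a single byte on the wire and a short three bytes, which is what makes the key compaction rule above worthwhile.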
Compatibility, Deprecation, and Migration Plan
- Current client users should not be affected; only new API methods are being added to the client APIs
- The message version ensures that clients expecting an older message version are not impacted
- older producers simply would produce a message without headers
- new consumers would simply receive a message with empty headers
- older consumers simply would consume a message oblivious to there being any headers
- Message version migration would be handled as in KIP-32
Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-4208
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP tries to address the following issues in Kafka.
In most messaging systems (JMS, QPID, etc.), streaming systems and transport protocols (HTTP, TCP), it is typical to have a concept of headers and payload.
The payload is traditionally for the business object, and headers are traditionally used for transport routing, filtering, etc. Headers are most typically key=value pairs.
In its current state Kafka does not support the ability to have headers natively in its message/record format.
Examples where separate, natively supported custom headers become useful (this is not an exhaustive list):
- Automated routing of messages based on header information between clusters
- Enterprise APM tools (e.g. AppDynamics, Dynatrace) need to stitch in 'magic' transaction ids for them to provide end-to-end transaction flow monitoring.
- Audit metadata to be recorded with the message, e.g. the clientId that produced the record, a unique message id, or the originating clusterId the message was first produced into, for multi-cluster routing.
- The business payload needs to be end-to-end encrypted and signed to prevent tampering, but ecosystem components still need access to metadata to achieve their tasks.
Kafka currently has a Record<K, V> structure, which originally could be used to follow this semantic, whereby K could contain the header information and V the payload.
- Since the introduction of log compaction, it is no longer possible to add metadata to K; otherwise compaction would treat each message as a differently keyed message.
- It is also not possible to use the value part with some form of wrapper, e.g. Message<H, V>: for compaction to perform a delete, a record is sent with a NULL value, so a delete record sent inside a wrapper carrying metadata would not work, because the value would technically no longer be null.
This issue has been flagged by many people in forums over time.
Further details and a more detailed case for headers can be found here: A Case for Kafka Headers
Public Interfaces
This KIP has the following public interface changes:
- Add a new headers length and value (byte[]) to the core message format.
- Create a Headers class
- Add a headers (MultiMap<String, byte[]>) field to the Headers class
- Add accessor methods on the Headers class: void add(String, byte[]) and Collection<byte[]> get(String)
- Add a headers field to ProducerRecord and ConsumerRecord.
- Add an accessor method Headers getHeaders() on ProducerRecord/ConsumerRecord
- Add ProduceRequest/ProduceResponse V4 which uses the new message format.
- Add FetchRequest/FetchResponse V4 which uses the new message format.
- The [String, byte[]] header array will be serialised on the wire using a strict format
- Each header value will be custom-serialisable by the interceptors/plugins that use the header.
For more detailed information on the above changes, please refer to the Proposed Changes section.
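A minimal sketch of the revised Headers class with String keys and opaque byte[] values, where each plugin serialises its own values. The JDK has no MultiMap type, so a Map of Lists stands in for MultiMap<String, byte[]>; the class here is illustrative only, and insertion ordering and returning an empty collection for a missing key are assumptions.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical revised Headers: String keys, opaque byte[] values, repeated keys allowed. */
final class Headers {
    // Insertion-ordered multimap built from a Map of Lists.
    private final Map<String, List<byte[]>> headers = new LinkedHashMap<>();

    /** Adds a value; an existing value for the same key is kept, not replaced. */
    void add(String key, byte[] value) {
        headers.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    /** Returns all values for the key in insertion order, or an empty collection. */
    Collection<byte[]> get(String key) {
        List<byte[]> values = headers.get(key);
        return values == null ? Collections.emptyList() : Collections.unmodifiableList(values);
    }
}
```

Because values are plain byte[], an interceptor that adds, say, a trace id decides its own encoding (UTF-8 text in this usage), and Kafka itself never needs to understand it.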
Proposed Changes
Four options were proposed before this proposal. This section details our proposed solution, Option 1, described here. The other options are in the Rejected Alternatives section.
...
- Adds the ability for producers to set standard header key=value pairs
- No incompatible client API change (only new methods)
- Allows users to specify the serialisation of the header value per header
- Provides a standardised interface around which an ecosystem of tools can grow
The disadvantage of this proposal is:
- Change to the message object
Create a Headers interface and implementation to encapsulate the headers protocol.
The implementation lazily initialises/deserialises the headers on first method access.
- Fields:
- MultiMap<String, Object> headers
- Constructors
- ()
- (byte[] headerBytes)
- Methods
- void putBoolean(String, boolean)
- void putByte(String, byte)
- void putChar(String, char)
- void putShort(String, short)
- void putInteger(String, int)
- void putLong(String, long)
- void putFloat(String, float)
- void putDouble(String, double)
- void putString(String, String)
- void putBytes(String, byte[])
- Fields:
- MultiMap<String, byte[]> headers
- Constructors
- ()
- (byte[] headerBytes)
- Methods
- void setBoolean(String, Boolean)
- Boolean getBoolean(String)
- void setByte(String, Byte)
- Byte getByte(String)
- void setChar(String, Character)
- Character getChar(String)
- void setShort(String, Short)
- Short getShort(String)
- void setInteger(String, Integer)
- Integer getInteger(String)
- void setLong(String, Long)
- Long getLong(String)
- void setFloat(String, Float)
- Float getFloat(String)
- void setDouble(String, Double)
- Double getDouble(String)
- void setString(String, String)
- String getString(String)
- void setBytes(String, byte[])
- byte[] getBytes(String)
- Collection<Object> get(String): each object will be the primitive type wrapper
- Collection<String> keys()
- byte[] asBytes()
- Add a new Typed class
- To hold the typed representations
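The lazy initialisation/deserialisation described above can be sketched as follows: the byte[] constructor only stores the raw bytes, and parsing happens on the first accessor call. This sketch assumes the headers bytes use a repeated KeyLength Key ValueLength Value layout with opaque byte[] values, that setBytes appends (since the field is a multimap), and that getBytes returns the first value; `LazyHeaders` and `isParsed` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical lazily deserialising headers: raw bytes are parsed only on first access. */
final class LazyHeaders {
    private byte[] headerBytes;                // raw wire form, kept until first access
    private Map<String, List<byte[]>> headers; // parsed multimap, null until first access

    LazyHeaders() {
        this.headers = new LinkedHashMap<>();
    }

    LazyHeaders(byte[] headerBytes) {
        this.headerBytes = headerBytes; // no parsing here
    }

    boolean isParsed() {
        return headers != null;
    }

    private Map<String, List<byte[]>> map() {
        if (headers == null) {
            Map<String, List<byte[]>> parsed = new LinkedHashMap<>();
            ByteBuffer buf = ByteBuffer.wrap(headerBytes);
            while (buf.hasRemaining()) {
                String key = new String(read(buf), StandardCharsets.UTF_8);
                parsed.computeIfAbsent(key, k -> new ArrayList<>()).add(read(buf));
            }
            headers = parsed;
            headerBytes = null; // the parsed map is now the source of truth
        }
        return headers;
    }

    private static byte[] read(ByteBuffer buf) {
        byte[] out = new byte[buf.getInt()];
        buf.get(out);
        return out;
    }

    /** Appends a value: the field is a multimap, so an existing value is kept (assumption). */
    void setBytes(String key, byte[] value) {
        map().computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    /** Returns the first value for the key, or null (assumption: first value wins). */
    byte[] getBytes(String key) {
        List<byte[]> values = map().get(key);
        return values == null ? null : values.get(0);
    }

    Collection<String> keys() {
        return Collections.unmodifiableSet(map().keySet());
    }

    /** Serialises as repeated KeyLength Key ValueLength Value; unparsed bytes are returned as-is. */
    byte[] asBytes() {
        if (headers == null) return headerBytes.clone();
        int size = 0;
        for (Map.Entry<String, List<byte[]>> e : headers.entrySet()) {
            int keyLength = e.getKey().getBytes(StandardCharsets.UTF_8).length;
            for (byte[] v : e.getValue()) size += 4 + keyLength + 4 + v.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        for (Map.Entry<String, List<byte[]>> e : headers.entrySet()) {
            byte[] k = e.getKey().getBytes(StandardCharsets.UTF_8);
            for (byte[] v : e.getValue()) buf.putInt(k.length).put(k).putInt(v.length).put(v);
        }
        return buf.array();
    }
}
```

The design choice this illustrates: a consumer or broker-side component that only forwards a record can call asBytes() without ever paying the parsing cost.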
Add a headers field of type Headers to both ProducerRecord and ConsumerRecord
- An accessor method Headers getHeaders() is added to the interface of ProducerRecord/ConsumerRecord.
Wire protocol change: use attribute bit 4 as a flag indicating whether headers are present, and add an (optional) headers size and headers field to the message format.
The below is for the core Message wire protocol change needed to fit headers into the message.
...
```
MessageAndOffset => Offset MessageSize Message
  Offset => int64
  MessageSize => int32
  Message => Crc MagicByte Attributes Timestamp KeyLength Key HeadersLength Headers ValueLength Value
    Crc => int32
    MagicByte => int8 <---------------------- Bump up magic byte to 2
    Attributes => int8 <---------------------- Use Bit 4 as boolean flag for if headers present
    Timestamp => int64
    KeyLength => int32
    Key => bytes
    (optional) HeadersLength => int32 <--------------- NEW [optional] size of the byte[] of the serialised headers if headers are present
    (optional) Headers => bytes <--------------------- NEW [optional] serialised form of the headers Map<int, byte[]>
    ValueLength => int32
    Value => bytes
```
Wire protocol of the headers bytes (only present if the above-mentioned attributes bit flag is set)
The below is for the headers wire protocol.
...
```
Headers (bytes) => Array(KeyLength, Key, ValueLength, Value)
  Set =>
    KeyLength => int32 <-----------------NEW size of the byte[] of the serialised key value
    Key => bytes <---------------------- NEW serialised string (UTF8) bytes of the header key
    ValueLength => int32 <-------------- NEW size of the byte[] of the serialised header value
    Value => bytes <-------------------- NEW serialised form of the typed header value -> TypedValue interpretation of this below.
TypedValue (bytes) => (Type, Value)
  Type => byte <---------------------- 0x00 = boolean
                                       0x01 = byte
                                       0x02 = char
                                       0x03 = short (int16)
                                       0x04 = int (int32)
                                       0x05 = long (int64)
                                       0x06 = float
                                       0x07 = double
                                       0x08 = string
                                       0x09 = byte[]
  Value => bytes <-------------------- NEW byte array holding the corresponding value in byte array form,
                                       0x00-0x002 - null and boolean this will be zero length
```
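A partial sketch of encoding and decoding the revised headers array. This revision does not spell out the per-type value layout, so the sketch assumes: a boolean takes one value byte (0 or 1), string and byte[] values carry no inner length prefix because ValueLength already bounds the TypedValue, and numbers are big-endian. Only boolean, int, long, string and byte[] are covered; `HeaderArrayCodec` and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Hypothetical codec for the revised headers array (subset of the TypedValue types). */
final class HeaderArrayCodec {
    /** TypedValue: one type byte, then the raw value; ValueLength already bounds the whole thing. */
    static byte[] typedValue(Object v) {
        if (v instanceof Boolean) return new byte[] {0x00, (byte) ((Boolean) v ? 1 : 0)};
        if (v instanceof Integer) return ByteBuffer.allocate(5).put((byte) 0x04).putInt((Integer) v).array();
        if (v instanceof Long) return ByteBuffer.allocate(9).put((byte) 0x05).putLong((Long) v).array();
        if (v instanceof String) return tagged((byte) 0x08, ((String) v).getBytes(StandardCharsets.UTF_8));
        if (v instanceof byte[]) return tagged((byte) 0x09, (byte[]) v);
        // byte, char, short, float and double would follow the same pattern; omitted for brevity.
        throw new IllegalArgumentException("type not covered by this sketch: " + v);
    }

    private static byte[] tagged(byte type, byte[] raw) {
        return ByteBuffer.allocate(1 + raw.length).put(type).put(raw).array();
    }

    /** One header entry: KeyLength Key ValueLength Value. */
    static byte[] entry(String key, Object value) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] v = typedValue(value);
        return ByteBuffer.allocate(4 + k.length + 4 + v.length)
                .putInt(k.length).put(k).putInt(v.length).put(v).array();
    }

    static Object readTypedValue(byte[] v) {
        ByteBuffer buf = ByteBuffer.wrap(v);
        byte type = buf.get();
        switch (type) {
            case 0x00: return buf.get() != 0;
            case 0x04: return buf.getInt();
            case 0x05: return buf.getLong();
            case 0x08: return new String(v, 1, v.length - 1, StandardCharsets.UTF_8);
            case 0x09: return Arrays.copyOfRange(v, 1, v.length);
            default: throw new IllegalArgumentException("type not covered by this sketch: " + type);
        }
    }

    /** Decodes a concatenation of entries into (key, value) pairs, keeping repeats and order. */
    static List<Map.Entry<String, Object>> decode(byte[] headers) {
        List<Map.Entry<String, Object>> out = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(headers);
        while (buf.hasRemaining()) {
            byte[] k = new byte[buf.getInt()];
            buf.get(k);
            byte[] v = new byte[buf.getInt()];
            buf.get(v);
            out.add(new AbstractMap.SimpleEntry<String, Object>(
                    new String(k, StandardCharsets.UTF_8), readTypedValue(v)));
        }
        return out;
    }
}
```

Keeping the decoded result as an ordered list of pairs, rather than a map, preserves repeated keys, matching the multimap semantics of the Headers class.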
Compatibility, Deprecation, and Migration Plan
- Current client users should not be affected; only new API methods are being added to the client APIs
- The message version ensures that clients expecting an older message version are not impacted
- older producers simply would produce a message without headers
- new consumers would simply receive a message with empty headers
- older consumers simply would consume a message oblivious to there being any headers
- Message version migration would be handled as in KIP-32
...