...
- Add a new headers length and value (byte[]) to the core message format.
Add a new Headers class
- It contains a map of key, value headers (
- Add accessor methods void setHeader(int, byte[]) and byte[] getHeader(int) on the Producer/ConsumerRecord
- Add ProduceRequest/ProduceResponse V4 which uses the new message format.
- Add FetchRequest/FetchResponse V4 which uses the new message format.
- The [int, byte[]] header set will be serialised on the wire using a strict format
- Each header's value will be custom serialisable by the interceptors/plugins that use the header
For more detailed information on the above changes, please refer to the Proposed Changes section.
Proposed Changes
Four options were proposed before this proposal. This details our proposed solution, Option 1, described here. The other options are in the Rejected Alternatives section.
The advantages of this proposal are:
- Adds the ability for producers to set standard header key=value pairs
- No incompatible client API change (only new methods)
- Allows users to specify the serialisation of the header value per header
- Compact key space
- Provides a standardised interface around which an ecosystem of tools can then grow
The disadvantage of this proposal is:
- Change to the message object
Add a headers field Map<int, byte[]> to both ProducerRecord and ConsumerRecord
- Accessor methods void setHeader(int, byte[]) and byte[] getHeader(int) are added to the ProducerRecord/ConsumerRecord interface.
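As a rough illustration of the record-level change, the following Java sketch stores int-keyed byte[] headers in a lazily created map. The class name `HeaderedRecord` is hypothetical; in the proposal these methods would live on ProducerRecord/ConsumerRecord, whose real implementation may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ProducerRecord/ConsumerRecord, showing only the
// proposed int-keyed header accessors.
class HeaderedRecord {
    // Created on first use, so a record without headers carries no map.
    private Map<Integer, byte[]> headers;

    // Stores (or replaces) the raw value for an int header key.
    void setHeader(int key, byte[] value) {
        if (headers == null) {
            headers = new HashMap<>();
        }
        headers.put(key, value);
    }

    // Returns the raw value for the key, or null if it was never set.
    byte[] getHeader(int key) {
        return headers == null ? null : headers.get(key);
    }
}
```

Interpreting the bytes is left to whichever interceptor or plugin owns the key, matching the per-header serialisation described above.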
Wire protocol change: use attribute bit 4 as a flag indicating whether headers are present, and add an (optional) headers size field and headers field to the message format.
The following describes the core Message wire protocol change needed to fit headers into the message.
A key requirement is that headers cause no overhead when they are not present.
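The flag check itself is a single bit test on the Attributes byte. The sketch below assumes "bit 4" means mask `0x10`, counting from the least significant bit, which leaves bits 0-2 (compression codec) and bit 3 (timestamp type, from KIP-32) untouched; the class and method names are illustrative only.

```java
// Sketch: attribute bit 4 as the "headers present" flag (names are hypothetical).
final class HeaderAttributes {
    // Bit 4, counting from the least significant bit.
    static final byte HEADERS_PRESENT_MASK = 0x10;

    private HeaderAttributes() {}

    // Returns the attributes with the headers flag set or cleared; other bits are kept.
    static byte withHeadersFlag(byte attributes, boolean headersPresent) {
        return headersPresent
                ? (byte) (attributes | HEADERS_PRESENT_MASK)
                : (byte) (attributes & ~HEADERS_PRESENT_MASK);
    }

    // True if a reader should expect HeadersLength/Headers fields.
    static boolean hasHeaders(byte attributes) {
        return (attributes & HEADERS_PRESENT_MASK) != 0;
    }
}
```

Because the flag lives in an existing byte, a message with no headers costs nothing extra on the wire.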
...
- map(Typed,Typed))
- It exposes accessor methods, which convert into typed values (this is akin to JMS/AMQP headers/property accessors)
- void setBoolean(String, Boolean)
- boolean getBoolean(String)
- void setByte(String, Byte)
- Byte getByte(String)
- void setChar(String, Character)
- Character getChar(String)
- void setShort(String, Short)
- Short getShort(String)
- void setInteger(String, Integer)
- Integer getInteger(String)
- void setLong(String, Long)
- Long getLong(String)
- void setFloat(String, Float)
- Float getFloat(String)
- void setDouble(String, Double)
- Double getDouble(String)
- void setString(String, String)
- String getString(String)
- void setBytes(String, byte[])
- byte[] getBytes(String)
- As noted, the key interface is of type String; if a key can be converted to a more compact typed representation, it will be converted for compactness, e.g.
- "12" would be converted to a Typed object of type=short and value=12 (int16)
- "true" would be converted to a Typed object of type=boolean(true) and value=null
- Add a new Typed class
- To hold the typed representations
- Add an accessor method on the Producer/ConsumerRecord to get a lazily initialised and lazily deserialised Headers object.
- Headers getHeaders();
- Add ProduceRequest/ProduceResponse V4 which uses the new message format.
- Add FetchRequest/FetchResponse V4 which uses the new message format.
- The [typed, typed] header set will be serialised on the wire using a strict format
- Each header value will be typed: primitive types and String are natively supported, and custom types are supported via the byte[] type.
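A minimal sketch of the typed Headers idea follows: String keys are compacted before storage and values keep their Java type. Only the two compaction examples above come from the proposal; treating any canonical int16 string as a Short, and the class name `TypedHeaders`, are assumptions, and only three of the accessor pairs are shown.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposed Headers class (not a real Kafka API).
// Keys are compacted before storage; values keep their Java type.
class TypedHeaders {
    private final Map<Object, Object> entries = new HashMap<>();

    // Assumed rules: "true"/"false" -> Boolean, canonical int16 text -> Short,
    // anything else stays a String.
    static Object compactKey(String key) {
        if (key.equals("true") || key.equals("false")) {
            return Boolean.valueOf(key);
        }
        try {
            short parsed = Short.parseShort(key);
            // Only compact canonical text, so "012" and "12" remain distinct keys.
            if (Short.toString(parsed).equals(key)) {
                return parsed;
            }
        } catch (NumberFormatException notAShort) {
            // fall through: keep the String form
        }
        return key;
    }

    void setInteger(String key, Integer value) {
        entries.put(compactKey(key), value);
    }

    // Throws ClassCastException if the header holds a different type.
    Integer getInteger(String key) {
        return (Integer) entries.get(compactKey(key));
    }

    void setString(String key, String value) {
        entries.put(compactKey(key), value);
    }

    String getString(String key) {
        return (String) entries.get(compactKey(key));
    }

    void setBoolean(String key, Boolean value) {
        entries.put(compactKey(key), value);
    }

    // A missing boolean header reads as false in this sketch.
    boolean getBoolean(String key) {
        Object value = entries.get(compactKey(key));
        return value != null && (Boolean) value;
    }
}
```

Requiring the canonical form (`Short.toString(parsed).equals(key)`) matters: without it, "012" and "12" would collapse into the same compacted key.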
For more detailed information on the above changes, please refer to the Proposed Changes section.
Proposed Changes
Four options were proposed before this proposal. This details our proposed solution, Option 1, described here. The other options are in the Rejected Alternatives section.
The advantages of this proposal are:
- Adds the ability for producers to set standard header key=value pairs
- No incompatible client API change (only new methods)
- Allows users to specify the serialisation of the header value per header
- Compact key space
- Provides a standardised interface around which an ecosystem of tools can then grow
The disadvantage of this proposal is:
- Change to the message object
Add a headers field Map<int, byte[]> to both ProducerRecord and ConsumerRecord
- Accessor methods void setHeader(int, byte[]) and byte[] getHeader(int) are added to the ProducerRecord/ConsumerRecord interface.
Wire protocol change: use attribute bit 4 as a flag indicating whether headers are present, and add an (optional) headers size field and headers field to the message format.
The following describes the core Message wire protocol change needed to fit headers into the message.
A key requirement is that headers cause no overhead when they are not present.
- The attributes flag bit is used to keep the message size the same as before if no headers are used
```
MessageAndOffset => Offset MessageSize Message
  Offset => int64
  MessageSize => int32

  Message => Crc MagicByte Attributes Timestamp KeyLength Key HeadersLength Headers ValueLength Value
    Crc => int32
    MagicByte => int8  <---------------------- Bump up magic byte to 2
    Attributes => int8 <---------------------- Use Bit 4 as boolean flag for if headers present
    Timestamp => int64
    KeyLength => int32
    Key => bytes
    (optional) HeadersLength => int32 <------- NEW [optional] size of the byte[] of the serialised headers if headers
    (optional) Headers => bytes <------------- NEW [optional] serialised form of the headers Map<int, byte[]>
    ValueLength => int32
    Value => bytes
```
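The following sketch writes one message in the layout above into a `ByteBuffer`, to show that HeadersLength and Headers are only emitted when attribute bit 4 is set, so a message without headers is exactly as large as before. It assumes a non-null key and value, headers that are already serialised, and a CRC32 computed over everything after the Crc field as in earlier message versions; `MessageV2Writer` is a hypothetical name, and the Offset/MessageSize framing is omitted.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Hypothetical writer for the proposed magic-byte-2 Message layout.
final class MessageV2Writer {
    static final byte MAGIC_V2 = 2;
    static final byte HEADERS_PRESENT_MASK = 0x10; // attribute bit 4

    private MessageV2Writer() {}

    // headers == null means "no headers": the flag is cleared and no fields are written.
    static ByteBuffer write(byte attributes, long timestamp, byte[] key, byte[] headers, byte[] value) {
        boolean hasHeaders = headers != null;
        int size = 4 + 1 + 1 + 8              // Crc, MagicByte, Attributes, Timestamp
                + 4 + key.length              // KeyLength, Key
                + (hasHeaders ? 4 + headers.length : 0) // optional HeadersLength, Headers
                + 4 + value.length;           // ValueLength, Value
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.position(4); // reserve the Crc slot; it is filled in last
        buf.put(MAGIC_V2);
        buf.put(hasHeaders
                ? (byte) (attributes | HEADERS_PRESENT_MASK)
                : (byte) (attributes & ~HEADERS_PRESENT_MASK));
        buf.putLong(timestamp);
        buf.putInt(key.length);
        buf.put(key);
        if (hasHeaders) {
            buf.putInt(headers.length);
            buf.put(headers);
        }
        buf.putInt(value.length);
        buf.put(value);

        // CRC over MagicByte..Value, written into the reserved slot.
        CRC32 crc = new CRC32();
        crc.update(buf.array(), 4, size - 4);
        buf.putInt(0, (int) crc.getValue());
        buf.flip(); // ready for reading from position 0
        return buf;
    }
}
```

With a 1-byte key and 1-byte value the message is 24 bytes; adding 3 bytes of headers grows it to 31 (the 4-byte HeadersLength plus the 3 header bytes).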
Wire protocol of the headers bytes (if the above-mentioned attributes bit flag is set)
The following is the headers wire protocol.
```
Headers (bytes) => Set(Key, ValueLength, Value)
  Set =>
    Key => byte[] => Typed   <------------ NEW key of the header
    Value => byte[] => Typed <------------ NEW serialised form of the header value

Typed (bytes) => (Type, Value)
  Type => byte <-------------------------- 0x00 = null
                                           0x01 = boolean=true
                                           0x02 = boolean=false
                                           0x03 = byte
                                           0x04 = char
                                           0x05 = short (int16)
                                           0x06 = int (int32)
                                           0x07 = long (int64)
                                           0x08 = float
                                           0x09 = double
                                           0x0A = string
                                           0x0B = byte[]
  Value => bytes <------------------------ NEW byte array holding the corresponding value in byte array form
                                           0x00-0x02 - null and boolean: zero length
                                           0x03-0x09 - fixed-size types: the length is the size of the type (e.g. 4 bytes for int)
                                           0x0A-0x0B - string and byte[]: a leading int32 length followed by the bytes
```
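To show how the type table maps onto bytes, the following sketch encodes a single Typed value (the `Typed (bytes) => (Type, Value)` rule). It assumes big-endian numbers, as used elsewhere in the Kafka protocol, UTF-8 for strings, and a 2-byte UTF-16 code unit for `char`; none of these choices are stated in the proposal, and `TypedEncoder` is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical encoder for one Typed value using the type codes above.
final class TypedEncoder {
    private TypedEncoder() {}

    static byte[] encode(Object v) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // DataOutputStream writes multi-byte numbers big-endian.
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            if (v == null) {
                out.writeByte(0x00);                      // null: no value bytes
            } else if (v instanceof Boolean) {
                out.writeByte((Boolean) v ? 0x01 : 0x02); // value carried by the type byte
            } else if (v instanceof Byte) {
                out.writeByte(0x03);
                out.writeByte((Byte) v);
            } else if (v instanceof Character) {
                out.writeByte(0x04);
                out.writeChar((Character) v);
            } else if (v instanceof Short) {
                out.writeByte(0x05);
                out.writeShort((Short) v);
            } else if (v instanceof Integer) {
                out.writeByte(0x06);
                out.writeInt((Integer) v);
            } else if (v instanceof Long) {
                out.writeByte(0x07);
                out.writeLong((Long) v);
            } else if (v instanceof Float) {
                out.writeByte(0x08);
                out.writeFloat((Float) v);
            } else if (v instanceof Double) {
                out.writeByte(0x09);
                out.writeDouble((Double) v);
            } else if (v instanceof String) {
                byte[] utf8 = ((String) v).getBytes(StandardCharsets.UTF_8);
                out.writeByte(0x0A);
                out.writeInt(utf8.length);                // leading int32 length
                out.write(utf8);
            } else if (v instanceof byte[]) {
                byte[] raw = (byte[]) v;
                out.writeByte(0x0B);
                out.writeInt(raw.length);                 // leading int32 length
                out.write(raw);
            } else {
                throw new IllegalArgumentException("Unsupported header type: " + v.getClass());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory stream
        }
        return bytes.toByteArray();
    }
}
```

For example, the key "12" compacted to a short encodes as the three bytes `05 00 0C`, and `true` as the single byte `01`.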
Example Key Allocation
As mentioned above, ranges of keys will be reserved for different usages; below is an example of how this could be managed.
Kafka open source (0x000)
- 0x00000000-0x0000FFFF Kafka open source
- 0x00010000-0x000FFFFF RESERVED
Local (0x001)
- 0x00100000-0x0010FFFF Local use - Infrastructure
- 0x00110000-0x0011FFFF Local use - Client
- 0x00120000-0x0012FFFF Local use - Stack/Middleware
- 0x00130000-0x0013FFFF Local use - Application
- 0x00140000-0x0014FFFF Local use - User
- 0x00150000-0x001FFFFF RESERVED
Open (0x002)
- 0x00200000-0x0020FFFF Open use - Infrastructure
- 0x00210000-0x0021FFFF Open use - Client
- 0x00220000-0x0022FFFF Open use - Stack/Middleware
- 0x00230000-0x0023FFFF Open use - Application
- 0x00240000-0x0024FFFF Open use - User
- 0x00250000-0x002FFFFF RESERVED
Testing (0x003)
- 0x00300000-0x0030FFFF Testing - Infrastructure
- 0x00310000-0x0031FFFF Testing - Client
- 0x00320000-0x0032FFFF Testing - Stack/Middleware
- 0x00330000-0x0033FFFF Testing - Application
- 0x00340000-0x0034FFFF Testing - User
- 0x00350000-0x003FFFFF RESERVED
Reserved
- 0x00400000-0xFFFFFFFF RESERVED
- A Kafka header used in the Kafka open source distribution could have the value 0x00000010 (16).
- A header used internally at organisation X would fall under Local. If it were used by infrastructure, such as tracing, it could use 0x00100010 (1048592).
- A header used by an open source project that somebody just wants to put out there (without coordination) could start using a value of 0x00220010 (2228240), for example a plugin for annotating the location on a message.
- An application testing whether it can use headers for a new feature it is developing could pick a header with key 0x00340010 (3407888).
- The Kafka open source number space is coordinated via the Apache Kafka open source project. A class would list all possible headers, their numbers and their string equivalents (for output, logging, etc.).
- A Local organisation is in charge of coordinating its local number space. It would be in charge of writing a text file, a class or a service to coordinate who gets which numbers and when.
- In the open internet you can take any number in the Open space, but should expect no guarantee that other people are not already using that number.
- When testing, you can safely take a number in the Testing space and be assured that you won't collide with an official system header. It is still possible to collide with other testing headers, but no production system should depend on these headers.
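The allocation scheme can be checked mechanically: the top 12 bits of a key select the space and the next 4 bits the usage within it. The sketch below assumes exactly the example layout listed above; `HeaderKeySpace.classify` is a hypothetical helper.

```java
// Sketch: classify an int header key using the example allocation above.
// The top 12 bits select the space (0x000-0x003), the next 4 bits the usage.
final class HeaderKeySpace {
    private static final String[] SPACES = {"Kafka open source", "Local", "Open", "Testing"};
    private static final String[] USAGES = {"Infrastructure", "Client", "Stack/Middleware", "Application", "User"};

    private HeaderKeySpace() {}

    static String classify(int key) {
        int space = key >>> 20;         // e.g. 0x00100010 -> 0x001 (Local)
        int usage = (key >>> 16) & 0xF; // e.g. 0x00100010 -> 0x0 (Infrastructure)
        if (space == 0) {
            // Only 0x00000000-0x0000FFFF is allocated in the Kafka space.
            return usage == 0 ? SPACES[0] : "RESERVED";
        }
        if (space <= 3 && usage < USAGES.length) {
            return SPACES[space] + " - " + USAGES[usage];
        }
        return "RESERVED"; // 0x0015xxxx-0x001Fxxxx style gaps and 0x00400000 upwards
    }
}
```

For instance, `classify(0x00100010)` yields "Local - Infrastructure" and `classify(0x00340010)` yields "Testing - User", matching the examples above.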
Sample register of keys that could end up in the open source space:
key | name | description | by | url |
---|---|---|---|---|
1 | client.id | producers client id | Apache Kafka | some url to a document about it |
2 | cluster.id | cluster id of where the message first originated | Apache Kafka | some url to a document about it |
3 | correlation.id | correlation id for when a message is a response to a request | Apache Kafka | some url to a document about it |
2100001 | new.relic | stores the transaction linking guid for transaction stitching by New Relic | New Relic | some url to a document about it |
2100002 | appdynamics | stores the transaction linking guid for transaction stitching by AppDynamics | AppDynamics | some url to a document about it |
To assist users and help ensure uniformity, a constants class should be maintained and kept up to date with the above register (similar to how Java SQL codes work), e.g.:
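```java
package org.apache.kafka.common.config;

/**
 * Constants for the header keys registered in the Kafka open source number space.
 */
public final class KafkaHeaderKeys {
    public static final int CLIENT_ID_KEY = 1;      // client.id
    public static final int CLUSTER_ID_KEY = 2;     // cluster.id
    public static final int CORRELATION_ID_KEY = 3; // correlation.id

    private KafkaHeaderKeys() {
        // constants holder; not instantiable
    }
}
```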
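Since the register is also meant to supply string equivalents for output and logging, one hypothetical way to pair numbers with names is an enum like the one below. `KafkaHeaderKey` and `nameOf` are illustrative names, not part of any released Kafka API, and only the three sample entries are included.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical enum pairing registered header keys with their register names.
enum KafkaHeaderKey {
    CLIENT_ID(1, "client.id"),
    CLUSTER_ID(2, "cluster.id"),
    CORRELATION_ID(3, "correlation.id");

    // Reverse index from numeric key to enum constant.
    private static final Map<Integer, KafkaHeaderKey> BY_KEY = new HashMap<>();

    static {
        for (KafkaHeaderKey k : values()) {
            BY_KEY.put(k.key, k);
        }
    }

    final int key;
    final String headerName;

    KafkaHeaderKey(int key, String headerName) {
        this.key = key;
        this.headerName = headerName;
    }

    // Returns the registered name for logging, or the raw number if unregistered.
    static String nameOf(int key) {
        KafkaHeaderKey k = BY_KEY.get(key);
        return k != null ? k.headerName : Integer.toString(key);
    }
}
```

Falling back to the raw number keeps log output usable for keys from the Local, Open or Testing spaces that this class knows nothing about.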
Sample register that a local (in-house) organisation could end up maintaining:
...
...
Compatibility, Deprecation, and Migration Plan
- Current client users should not be affected; only new API methods are being added to the client APIs
- The message version ensures that clients expecting an older message version are not impacted
- Older producers would simply produce messages without headers
- New consumers would simply receive messages with empty headers
- Older consumers would simply consume messages, oblivious to any headers
- Message version migration would be handled as in KIP-32
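To make the "empty headers" behaviour concrete, here is a sketch of the consumer-side branch, assuming the buffer is positioned just after the Key field of the message format described earlier. `HeadersReader` and `readHeaders` are hypothetical names, and returning the raw header bytes (rather than a parsed Headers object) is a simplification.

```java
import java.nio.ByteBuffer;

// Hypothetical consumer-side reader for the optional headers section.
final class HeadersReader {
    static final byte HEADERS_PRESENT_MASK = 0x10; // attribute bit 4

    private HeadersReader() {}

    // Reads HeadersLength + Headers only for magic >= 2 with the flag set;
    // otherwise consumes nothing and returns empty headers.
    static byte[] readHeaders(byte magic, byte attributes, ByteBuffer buf) {
        if (magic < 2 || (attributes & HEADERS_PRESENT_MASK) == 0) {
            return new byte[0]; // older or header-less message
        }
        int length = buf.getInt();  // HeadersLength
        byte[] headers = new byte[length];
        buf.get(headers);           // Headers
        return headers;
    }
}
```

Checking the magic byte first matters because bit 4 has no defined meaning in older message versions.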
...