Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

 

Table of Contents

Status

Current state:  Under Discussion 

...

Further details and a more detailed case for headers can be seen here : A Case for Kafka Headers

Public Interfaces

This KIP has the following public interface changes:

  1. Add a new headers length and value (byte[]) to the core message format.
  2. Create a Headers class 

    1. Add a headers (MultiMap<String, Object>) field to Headers class

    2. Add accessor methods on the Headers class - void put<T>(String) and a Collection<Object> get(String)
  3. Add a headers Map<int, byte[]>) field to ProducerRecord and ConsumerRecord. 

  4. Add accessor methods on the Producer/ConsumerRecord void setHeader(int, byte[]) and a byte[] getHeader(int)Headers getHeaders()
    1. Add ProduceRequest
    Add ProduceRequest
    1. /ProduceResponse V4 which uses the new message format.
  5. Add FetchRequest/FetchResponse V4 which uses the new message format.
  6. The serialisation of the [intString, byebyte[]] header set array will on the wire using a strict format
  7. Each headers value will be custom serialisable by the interceptors/plugins that use the header.

For more detail information of the above changes, please refer to the Proposed Changes section.

...

  • Adds the ability for producers to set standard header key=value value pairs
  • No incompatible client api change (only new methods)
  • Allows users to specify the serialisation of the header value per header
  • Compact key space
  • Provides a standardised interface to eco systems of tools that then can grow around the feature

The disadvantage of this proposal is:

  • Change to the message object
Add

Create a

headers field Map<int, byte[]> to both ProducerRecord and ConsumerRecord
  • Accessor methods of void setHeader(int, byte[]) and byte[] getHeader(int) added to interface of the ProducerRecord/ConsumerRecord.

Wire protocol change - use attribute bit4 as flag for if headers present. and add (optional) header size and field to the message format

The below is for the core Message wire protocol change needed to fit headers into the message.

A key to this is to ensure the headers cause no overhead if not present.

...

Headers Interface and Implementation to encapsulate headers protocol.

This lazy initialises/deserialises, on first method access.

  • Fields:
    • MultiMap<String, Object> headers
  • Constructors
    • ()
    • (bytes[] headerBytes)
  • Methods
    • void putBoolean(String, boolean)
    • void putByte(String, byte)
    • void putChar(String, char)
    • void putShort(String, short)
    • void putInteger(String, integer)
    • void putLong(String, long)
    • void putFloat(String, float)
    • void putDouble(String, double)
    • void putString(String, String)
    • void putBytes(String, byte[])

    • Collection<Object> get(String) – object will be the primitive type wrapper
    • Collection<String> keys()
    • byte[] asBytes()
  1. Add a new Typed class
    1. To hold the typed representations

Add a headers field Headers to both ProducerRecord and ConsumerRecord

  • Accessor methods of Headers getHeaders() added to interface of the ProducerRecord/ConsumerRecord.

Wire protocol change - use attribute bit4 as flag for if headers present. and add (optional) header size and field to the message format

The below is for the core Message wire protocol change needed to fit headers into the message.

A key to this is to ensure the headers cause no overhead if not present.

  • The attributes flag bit is used to keep the message size the same as before if no headers are used

Code Block
languagejava
MessageAndOffset => Offset MessageSize Message
  Offset => int64  
  MessageSize => int32
  
  Message => Crc MagicByte Attributes Timestamp KeyLength Key HeadersLength Headers ValueLength Value
    Crc => int32
    MagicByte => int8  <-----
Code Block
languagejava
MessageAndOffset => Offset MessageSize Message
  Offset => int64  
  MessageSize => int32
  
  Message => Crc MagicByte Attributes Timestamp KeyLength Key HeadersLength Headers ValueLength Value
    Crc => int32
    MagicByte => int8  <---------------------- Bump up magic byte to 2
    Attributes => int8 <---------------------- Use Bit 4 as boolean flag for if headers present
    Timestamp => int64
    KeyLength => int32
    Key => bytes
    (optional) HeadersLength => int32 <----------------- NEWBump [optional] size of the byte[] of the serialised headers if headersup magic byte to 2
    (optional) HeadersAttributes => bytesint8 <---------------------- Use Bit NEW4 [optional]as serialisedboolean formflag offor theif headers present
 Map<int, byte[]>   Timestamp => int64
    KeyLength ValueLength => int32
    ValueKey => bytes

Wire protocol of the headers bytes (if above mentioned attributes bit flag is true)

The below is for the headers wire protocol.

 

Code Block
languagejava
Headers (bytes) => Set(Key, ValueLength, Value)
  Set =>
	Key => byte [] => Typed <----    (optional) HeadersLength => int32 <----------------- NEW int[optional] keysize of the header
byte[] of the serialised Valueheaders => byte[]if headers
    (optional) Headers => Typedbytes <--------------------- NEW [optional] serialised form of the headers header value

Typed (bytes)Map<int, byte[]>
    ValueLength => (Type,int32
    Value => bytes


Wire protocol of the headers bytes (if above mentioned attributes bit flag is true)

The below is for the headers wire protocol.

  • A key can occur multiple times, clients should expect to handle this, this can be represented as a multi map, or an array of values per key in clients.

Code Block
languagejava
Headers (bytes) => Array(KeyLength, Key, ValueLength, Value)
  Set =>
	KeyLength => int32 <-----------------NEW size of the byte[] of the serialised key value
	Key => bytes <---------------------- NEW serialised string (UTF8) bytes of the header key
    ValueLength => int32 <-------------- NEW size of the byte[] of the serialised header value
    Value => bytes <-------------------- NEW serialised form of the typed header value -> TypeValue interpretation of this below.
 
TypedValue (bytes) => (Type, Value)
	Type => byte <---------------------- 0x00 = boolean)
	Type => byte <---------------------- 0x00 = null
										 0x01 = boolean=true
                                         0x02 = boolean=false
                                         0x03 = byte
                                         0x040x01 = charbyte
                                         0x050x02 = short (int16)char
                                         0x060x03 = intshort (int32int16)
                                         0x070x04 = longint (int64int32)
                                         0x080x05 = floatlong (int64)
                                         0x090x06 = doublefloat
                                         0x0A0x07 = stringdouble
                                         0x0B0x08 = byte[]string
      Value => bytes <-------------------- NEW byte array holding the corresponding value in byte array form, 
										 0x00-0x002 - null and boolean this will be zero length
                                         0x03-0x09 - type length, will be the length of the byte array
                                         0x0A-0x0B - string and byte this will contain the leading int32 length of the byte array.
 
 

 

Example Key Allocation

As mentioned above ranges of keys would will be reserved for different usages, below is an example way this could be managed.

Kafka open source  (0x000)
  • 0x00000000-0x0000FFFF Kafka open source
  • 0x00010000-0x000FFFFF RESERVED
Local (0x001)
  • 0x00100000-0x0010FFFF Local use - Infrastructure
  • 0x00110000-0x0011FFFF Local use - Client
  • 0x00120000-0x0012FFFF Local use - Stack/Middleware
  • 0x00130000-0x0013FFFF Local use - Application
  • 0x00140000-0x0014FFFF Local use - User 
  • 0x00150000-0x001FFFFF RESERVED
Open (0x002)
  • 0x00200000-0x0020FFFF Open use - Infrastructure
  • 0x00210000-0x0021FFFF Open use - Client
  • 0x00220000-0x0022FFFF Open use - Stack/Middleware
  • 0x00230000-0x0023FFFF Open use - Application
  • 0x00240000-0x0024FFFF Open use - User 
  • 0x00250000-0x002FFFFF RESERVED
Testing (0x003)
  • 0x00300000-0x0030FFFF Testing - Infrastructure
  • 0x00310000-0x0031FFFF Testing - Client
  • 0x00320000-0x0032FFFF Testing - Stack/Middleware
  • 0x00330000-0x0033FFFF Testing - Application
  • 0x00340000-0x0034FFFF Testing - User 
  • 0x00350000-0x003FFFFF RESERVED
Reserved
  • 0x00400000-0xFFFFFFFF RESERVED

 

From this perspective we could have the following:
  • A kafka header used in the kafka open source distribution could have the value of 0x00000010 (10).
  • A header used internally at organisation X would fall under Local. If it was something used by infrastructure, like tracing, that could use 0x00100010 (1048592)
  • A header used by an open source project somebody is working on, that they just want to put out there (without coordination) could start using a value of 0x00220010 (2228240) if it was a plugin for annotating the location on a message.
  • An application that was testing whether it can use headers for a new feature it's developing could pick a header with key 0x0034010 (3407888).
  • The Kafka open source number space is coordinated via the Apache Kafka opensource project.  A class would list all possible headers, their numbers and their string equivalents (for output/logging/etc).
  • A Local organisation is in charge of coordinating it's local number space. It would be in charge of writing a text file, a class or a service to coordinate who gets what numbers when.
  • In the open internet you can grab any number in the Open space but should expect no guarantees that other people may or may not be using that number.
  • When you're doing testing you can safely grab a number in the testing space and be assured that you won't collide with an official system header.  It's still possible to collide with other testing headers, but no production system should depend on these headers.
Sample register that would end up having in the open source space.
keynamedescriptionbyurl
1client.idproducers client idApache Kafkasome url to a document about it
2cluster.idcluster id of where the message first originatedApache Kafkasome url to a document about it
3correlation.idcorrelation id for where a message is for mutex response from a requestApache Kafkasome url to a document about it
     
2100001new.relicstores the transaction linking guid for transaction sticking by new relicNew Relicsome url to a document about it
2100002appdynamicsstores the transaction linking guid for transaction stiching by app dynamicsAppDynamicssome url to a document about it

 

To assit and help ensure ease of use and uniformity a constants class should be kept and updated with the above (similar to how java sql codes work) e.g.

Code Block
package org.apache.kafka.common.config;

public class KafkaHeaderKeys
{

   public static final int CLIENT_ID_KEY = 1;
   public static final int CLUSTER_ID_KEY = 2;
   public static final int CORRELATION_ID_KEY = 3;
   
}

 

Sample register that would end up having local (In-house) custom per organisation
keynamedescriptionbynotes
1114000app.nameig's unique app name for the producerIGsome internal document about it
1050000charge.tagtag to make per message chargebacks forIGsome internal document about it

Compatibility, Deprecation, and Migration Plan

  • Current client users should not be affected, this is new api methods being added to client apis
  • Message version allows clients expecting older message version, would not be impacted 
    • older producers simply would produce a message without headers
      • new consumer would simply get a message with empty headers
    • older consumers simply would consume a message oblivious to there being any headers
  • Message version migration would be handled as like in KIP-32

Out of Scope

Some additional features/benefits were noticed and discussed on the above but are deemed out of scope and should be tackled by further KIPS.

  • Core message size reduction
    • remove overhead of 4 bytes for KeyLength when no headers using attributes bit
    • reduce overhead of 4 bytes for KeyLength by using variable length encoded int
    • reduce overhead of 4 bytes for ValueLength by using variable length encoded int
  • Broker side interceptors
    • with headers we could start introducing broker side message interceptors to append meta data or handle messages
  • Single Record consumer API
    • There is many uses cases where single record consumer/listener api is more user friendly - this is evident by the fact spring kafka have already created a wrapper, it would be good to support this natively.

Rejected Alternatives

...

                                   0x09 = byte[]
    Value => bytes <-------------------- NEW byte array holding the corresponding value in byte array form, 
										 0x00-0x002 - null and boolean this will be zero length
                                         

Compatibility, Deprecation, and Migration Plan

  • Current client users should not be affected, this is new api methods being added to client apis
  • Message version allows clients expecting older message version, would not be impacted 
    • older producers simply would produce a message without headers
      • new consumer would simply get a message with empty headers
    • older consumers simply would consume a message oblivious to there being any headers
  • Message version migration would be handled as like in KIP-32

Out of Scope

Some additional features/benefits were noticed and discussed on the above but are deemed out of scope and should be tackled by further KIPS.

  • Core message size reduction
    • remove overhead of 4 bytes for KeyLength when no headers using attributes bit
    • reduce overhead of 4 bytes for KeyLength by using variable length encoded int
    • reduce overhead of 4 bytes for ValueLength by using variable length encoded int
  • Broker side interceptors
    • with headers we could start introducing broker side message interceptors to append meta data or handle messages
  • Single Record consumer API
    • There is many uses cases where single record consumer/listener api is more user friendly - this is evident by the fact spring kafka have already created a wrapper, it would be good to support this natively.

Rejected Alternatives

Map<Int, byte[]> Headers added to the Producer/ConsumerRecord

The concept is similar to the above proposed but int keys

  • Benefits
    • more compact much reduced byte size overhead only 4 bytes.
    • String keys can dwarf the value in byte size.
  • Disadvantages
    • String keys are more common in many systems
    • Requires management of the int key space, where as string keys have natural key space management.
Map<String, String> Headers added to the ConsumerRecord

The concept is similar to the above proposed but with a few more disadvantages.

  • Benefits
    • Adds the ability for producers to set standard header key=value string value pairs
    • No incompatible client api change (only new methods)
    • Allows users to specify the serialisation of the key=value map (String(&=), JSON, AVRO).
    • Provides a standardised interface to eco systems of tools can grow around the feature
  • Disadvantages
    • Change to the message object
    • String key cause a large key, this can cause a message payload thats of 60bytes to be dwarfed by its headers
    • String value again doesn't allow for compact values, and restricts that a value must be a String
    • Not able to use headers on the broker side with custom serialisation

...