...

  1. Add a new headers length and value (byte[]) to the core message format.
  2. Add a headers (Map<int, byte[]>) field to ProducerRecord and ConsumerRecord. A producer/interceptor will be able to set headers on a ProducerRecord. A consumer/interceptor will see the message headers (immutable) when it receives the message.
  3. Add accessor methods void setHeader(int, byte[]) and byte[] getHeader(int) on ProducerRecord/ConsumerRecord (see the sketch after this list).
  4. Add ProduceRequest/ProduceResponse V3, which uses the new message format.
  5. Add FetchRequest/FetchResponse V3, which uses the new message format.
  6. The serialization of the [int, byte[]] header set will be on the wire using a strict format.
  7. Each header value will be custom serializable by the interceptors/plugins that use the header.
  8. As headers are keyed by int for compactness, ranges will be reserved for different usages:
    1. client headers adopted by the Kafka open source project (not necessarily owned by the Kafka open source project)
    2. broker headers adopted by the Kafka open source project (not necessarily owned by the Kafka open source project)
    3. commercial vendor space
    4. custom in-house
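
For illustration, a minimal sketch of how the proposed accessors could be used by a producer and a consumer. The method names follow point 3 above; the topic name and header keys are purely illustrative:

Code Block
languagejava
// Sketch only: usage of the proposed header accessors on ProducerRecord/ConsumerRecord.
// The topic name and header keys (1 = client.id, 3 = correlation.id) are illustrative.
ProducerRecord<String, String> record = new ProducerRecord<>("some-topic", "key", "value");
record.setHeader(1, "my-client-id".getBytes());      // set a header before sending
record.setHeader(3, "abc-123".getBytes());           // e.g. a correlation id
producer.send(record);

// Consumer side: headers are read-only on the ConsumerRecord.
for (ConsumerRecord<String, String> received : consumer.poll(100)) {
    byte[] correlationId = received.getHeader(3);    // null if the header was not set
    if (correlationId != null) {
        // match the message back to the original request, continue a trace, etc.
    }
}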

...

  • Adds the ability for producers to set standard header key=value pairs
  • No incompatible client API change (only new methods)
  • Allows users to specify the serialization of the header value per header
  • Compact key space
  • Provides a standardized interface so that an ecosystem of tools can grow around the feature

The disadvantage of this proposal is:

  • Change to the message object

Add a headers field Map<int, byte[]> to both ProducerRecord and ConsumerRecord

  • Accessor methods void setHeader(int, byte[]) and byte[] getHeader(int) added to the interface of both.

Wire protocol change - use attributes bit 4 as a flag for whether headers are present, and add an (optional) headers size

...

and headers field to the message format.

The below is for the core Message wire protocol change needed to fit headers into the message.

A key goal is to ensure the headers cause as little overhead as possible, and none at all when headers are not present.

Code Block
languagejava
MessageAndOffset => Offset MessageSize Message
  Offset => int64  
  MessageSize => int32
  
  Message => Crc MagicByte Attributes Timestamp KeyLength Key HeadersLength Headers ValueLength Value
    Crc => int32
    MagicByte => int8  <---------------------- Bump up magic byte to 2
    Attributes => int8 <---------------------- Use Bit 4 as boolean flag for if headers present
    Timestamp => int64
    KeyLength => int32
    Key => bytes
    (optional) HeadersLength => variable int32 <------------------ NEW [optional] size of the byte[] of the serialized headers, if headers present
    (optional) Headers => bytes <--------------------------------- NEW [optional] serialized form of the headers Map<int, byte[]>
    ValueLength => int32
    Value => bytes
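
As a rough illustration of the attributes change, the sketch below shows how a reader of the new format might test bit 4 and, only when it is set, consume the extra headers fields. readVarInt and deserializeHeaders are hypothetical helpers standing in for the chosen variable-length encoding and the header deserialization described further down:

Code Block
languagejava
// Sketch only: reading a message in the new format and checking the proposed headers flag.
// HEADERS_FLAG_MASK, readVarInt() and deserializeHeaders() are hypothetical, not existing Kafka code.
private static final int HEADERS_FLAG_MASK = 1 << 4;   // attributes bit 4 per this proposal

Map<Integer, byte[]> readHeaders(ByteBuffer message, byte attributes) {
    if ((attributes & HEADERS_FLAG_MASK) == 0) {
        return Collections.emptyMap();            // no headers: no extra bytes on the wire
    }
    int headersLength = readVarInt(message);      // HeadersLength (variable length int32)
    ByteBuffer headersBytes = message.slice();    // Headers (serialized Map<int, byte[]>)
    headersBytes.limit(headersLength);
    message.position(message.position() + headersLength);
    return deserializeHeaders(headersBytes);      // see the headers serialization sketch below
}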

...

Code Block
languagejava
Headers (bytes) => Count Set(Key, ValueLength, Value)
  Count => variable length encoded int32  
  Set =>
	Key => variable length encoded int32
    ValueLength => variable length encoded int32
    Value => bytes
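
To make the layout above concrete, a minimal sketch of serializing a headers map into the Count + (Key, ValueLength, Value) form. writeVarInt stands in for whichever variable-length int32 encoding is finally chosen (see the byte ranges under Key Allocation below); it is an assumption, not part of the wire format itself:

Code Block
languagejava
// Sketch only: serializing Map<Integer, byte[]> headers into the wire layout above.
// writeVarInt() is a placeholder for the variable length int32 encoding this proposal settles on.
byte[] serializeHeaders(Map<Integer, byte[]> headers) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeVarInt(out, headers.size());                 // Count
    for (Map.Entry<Integer, byte[]> header : headers.entrySet()) {
        writeVarInt(out, header.getKey());            // Key
        writeVarInt(out, header.getValue().length);   // ValueLength
        out.write(header.getValue());                 // Value (opaque, serialized by the owning plugin)
    }
    return out.toByteArray();
}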
 

 

Key Allocation

As mentioned above, ranges of keys will be reserved for different usages, and we use variable-length encoded int keys to reduce the key size overhead.

Whilst an open header space may give rise to many possible keys, a single cluster/broker is unlikely to use hundreds of them, so we should assign/reserve key space for the most heavily used areas.

With variable-length encoding, ints in the ranges below take fewer bytes than the standard 4-byte allocation:

 

3 bytes :: -129 and below
2 bytes :: -33 -> -128
1 byte  :: -1 -> -32
1 byte  :: 0 -> 127
2 bytes :: 128 -> 255
3 bytes :: 256 and above

As such we propose that:

+ve ints 0 -> 255 are reserved for Apache Kafka open registered headers, as these are likely to be the most commonly used and so save the most space

-ve ints -1 -> -128 are reserved for in-house registered headers, as these are the next most likely to be heavily used

-ve ints -129 and below can be used as scratch space, either where more in-house header space is required or for development
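
A trivial sketch of how a client or tooling could classify a key against these proposed ranges; the enum and method names are illustrative only:

Code Block
languagejava
// Sketch only: classifying a header key against the proposed int ranges.
enum HeaderKeySpace { KAFKA_OPEN_SOURCE, IN_HOUSE, SCRATCH, UNALLOCATED }

static HeaderKeySpace spaceOf(int key) {
    if (key >= 0 && key <= 255)   return HeaderKeySpace.KAFKA_OPEN_SOURCE; // 0 -> 255: Apache Kafka registered
    if (key <= -1 && key >= -128) return HeaderKeySpace.IN_HOUSE;          // -1 -> -128: in-house registered
    if (key <= -129)              return HeaderKeySpace.SCRATCH;           // -129 and below: scratch / development
    return HeaderKeySpace.UNALLOCATED;   // 256 and above: not assigned by this proposal
}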

...


 

Kafka open source  (0x000)
0x00000000-0x0000FFFF Kafka open source
0x00010000-0x000FFFFF RESERVED
Local (0x001)
0x00100000-0x0010FFFF Local use - Infrastructure
0x00110000-0x0011FFFF Local use - Client
0x00120000-0x0012FFFF Local use - Stack/Middleware
0x00130000-0x0013FFFF Local use - Application
0x00140000-0x0014FFFF Local use - User 
0x00150000-0x001FFFFF RESERVED
Open (0x002)
0x00200000-0x0020FFFF Open use - Infrastructure
0x00210000-0x0021FFFF Open use - Client
0x00220000-0x0022FFFF Open use - Stack/Middleware
0x00230000-0x0023FFFF Open use - Application
0x00240000-0x0024FFFF Open use - User 
0x00250000-0x002FFFFF RESERVED
Testing (0x003)
0x00300000-0x0030FFFF Testing - Infrastructure
0x00310000-0x0031FFFF Testing - Client
0x00320000-0x0032FFFF Testing - Stack/Middleware
0x00330000-0x0033FFFF Testing - Application
0x00340000-0x0034FFFF Testing - User 
0x00350000-0x003FFFFF RESERVED
Reserved
0x00400000-0xFFFFFFFF RESERVED
From this perspective we would have the following:
A Kafka header used in the Kafka open source distribution could have the value 0x00000010 (16).
A header used internally at organisation X would fall under Local. If it were something used by infrastructure, like tracing, it could use 0x00100010 (1048592).
A header used by an open source project somebody is working on, that they just want to put out there (without coordination), could start using a value of 0x00220010 (2228240) if it were a plugin for annotating location on a message.
An application that was testing whether it can use headers for a new feature it's developing could pick a header with key 0x00340010 (3407888).
The Kafka open source number space is coordinated via the Apache Kafka open source project. A class would list all possible headers, their numbers and their string equivalents (for output/logging/etc).
A Local organisation is in charge of coordinating its local number space. It would be in charge of writing a text file, a class or a service to coordinate who gets what numbers when.
On the open internet you can grab any number in the Open space, but should expect no guarantee that other people are not also using that number.
When you're doing testing you can safely grab a number in the Testing space and be assured that you won't collide with an official system header. It's still possible to collide with other testing headers, but no production system should depend on these headers.
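
Assuming the layout above (the top 12 bits of the key selecting the space and the next 4 bits the category), a tool consuming this register could decode a key roughly as follows; the method name and strings are illustrative only:

Code Block
languagejava
// Sketch only: decoding which allocation space / category a header key falls into,
// assuming the hex layout above (top 12 bits = space, next 4 bits = category).
static String describe(int key) {
    int space    = key >>> 20;          // 0x000 Kafka, 0x001 Local, 0x002 Open, 0x003 Testing
    int category = (key >>> 16) & 0xF;  // 0 Infrastructure, 1 Client, 2 Stack/Middleware, 3 Application, 4 User
    String[] categories = { "Infrastructure", "Client", "Stack/Middleware", "Application", "User" };

    if (space == 0) {
        return category == 0 ? "Kafka open source" : "RESERVED";   // 0x00010000-0x000FFFFF reserved
    }
    if (space > 3 || category > 4) {
        return "RESERVED";                                         // 0x00400000-0xFFFFFFFF or unused category
    }
    String[] spaces = { "", "Local", "Open", "Testing" };
    return spaces[space] + " - " + categories[category];
}

// e.g. describe(0x00100010) -> "Local - Infrastructure"
//      describe(0x00220010) -> "Open - Stack/Middleware"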

Sample register of headers that would end up in the open source space.

key | name | description | by | url
1 | client.id | producer's client id | Apache Kafka | some url to a document about it
2 | cluster.id | cluster id of where the message first originated | Apache Kafka | some url to a document about it
3 | correlation.id | correlation id for matching a message response to its request | Apache Kafka | some url to a document about it
2602100001 | new.relic | stores the transaction linking guid for transaction stitching by New Relic | New Relic | some url to a document about it
4512100002 | appdynamics | stores the transaction linking guid for transaction stitching by AppDynamics | AppDynamics | some url to a document about it

...

Code Block
package org.apache.kafka.common.config;

public class KafkaHeaderKeys
{

   public static final int CLIENT_ID_KEY = 1;
   public static final int CLUSTER_ID_KEY = 2;
   public static final int CORRELATION_ID_KEY = 3;
   
}

 

Sample register of headers that would end up being

...

local (In-house) custom per

...

organisation

key | name | description | by | notes
-51114000 | app.name | IG's unique app name for the producer | IG | some internal document about it
-101050000 | charge.tag | tag used to make per-message chargebacks | IG | some internal document about it
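
In the same way as the KafkaHeaderKeys class above, an organisation could maintain its own constants class for its locally registered keys; a hypothetical example matching the register above (the package and class names are illustrative only):

Code Block
package com.example.inhouse.config;

// Hypothetical in-house equivalent of KafkaHeaderKeys, listing the keys from the register above.
public class InHouseHeaderKeys
{

   public static final int APP_NAME_KEY = -51114000;
   public static final int CHARGE_TAG_KEY = -101050000;

}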

...