...

  1. Add a new headers value (byte[]) to the message format.
  2. Add a headers (Map<int, byte[]>) field to ProducerRecord and ConsumerRecord. A producer or producer interceptor will be able to set headers on a ProducerRecord, and a consumer or consumer interceptor will see the message headers when it receives the messages.
  3. Add accessor methods void setHeader(int, byte[]) and byte[] getHeader(int) to ProducerRecord and ConsumerRecord.
  4. Add ProduceRequest/ProduceResponse V3 which uses the new message format.
  5. Add FetchRequest/FetchResponse V3 which uses the new message format.
  6. The [int, byte[]] header set will be serialized on the wire using a strict format.
  7. Each header value will be custom serializable by the interceptors/plugins that use that header.
  8. As headers are int-keyed for compactness, key ranges will be reserved for different usages:
    1. client headers adopted by the Kafka open source project (not necessarily owned by the Kafka open source project)
    2. broker headers adopted by the Kafka open source project (not necessarily owned by the Kafka open source project)
    3. commercial vendor space
    4. custom in-house

For more detailed information on the above changes, please refer to the Proposed Changes section.
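To make items 2 and 3 above concrete, below is a rough sketch of how a producer could attach an int-keyed header and a consumer could read it back. The topic name, key id 3 and payload are purely illustrative, and setHeader/getHeader are the accessors proposed here, so they do not exist in current clients.

Code Block
languagejava
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProposedHeadersUsageSketch {
    public static void main(String[] args) {
        // Illustrative only: setHeader is the accessor proposed here and does not
        // exist in current clients. Topic, key id 3 and payload are made up.
        ProducerRecord<String, String> record =
                new ProducerRecord<>("my-topic", "some-key", "some-value");
        record.setHeader(3, "request-12345".getBytes(StandardCharsets.UTF_8));

        // A consumer (or consumer interceptor) would read the same header back:
        // byte[] correlationId = consumerRecord.getHeader(3);  // proposed accessor
    }
}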

...


There are four options proposed before this proposal. This details our proposed solution, Option 1. The other options, Option 2, Option 3 and Option 4, are described in the Rejected Alternatives section.

...

  • Adds the ability for producers to set standard header key=value pairs
  • No incompatible client API change (only new methods)
  • Allows users to specify the serialization of the header value per header
  • Compact key space
  • Provides a standardized interface that ecosystems of tools can grow around

The disadvantage of this proposal is:

  • Change to the message object

 

There are two wire protocol change options for the above-mentioned change; Option B is our preferred (a short sketch contrasting the two follows the list below):

  • Option A: Use a headers flag (bit 4 of the attributes)
    • Advantages
      • No message size overhead for messages without headers (i.e. no change in message size for users not needing this)
    • Disadvantages
      • Uses up an attributes bit
  • Option B: No flag; simply use the headers length (as done with key and value currently)
    • Advantages
      • Simpler, and follows the existing pattern with key and value where the length value alone determines whether the field is present
    • Disadvantages
      • Overhead of 4 bytes for messages without any headers (a message size change for users not needing this)
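As a rough sketch of the difference between the two options (assuming "bit 4" of the attributes byte means the mask 1 << 4; the exact bit position would be pinned down in the final format):

Code Block
languagejava
import java.nio.ByteBuffer;

public class HeaderPresenceSketch {
    // Assumption for illustration only: "bit 4" of the attributes byte is the mask 1 << 4.
    private static final int HEADERS_FLAG = 1 << 4;

    // Option A: presence is signalled by an attributes bit; HeadersLength/Headers are
    // only written when the flag is set, so header-free messages keep today's size.
    static boolean hasHeadersOptionA(byte attributes) {
        return (attributes & HEADERS_FLAG) != 0;
    }

    // Option B: no flag; a HeadersLength int32 is always written (like KeyLength and
    // ValueLength), costing 4 bytes per message even when there are no headers.
    static boolean hasHeadersOptionB(ByteBuffer message) {
        int headersLength = message.getInt();  // a non-positive length would mean "no headers"
        return headersLength > 0;
    }
}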

 

 

...

Add a headers field Map<int, byte[]> to both ProducerRecord and ConsumerRecord

  • Accessor methods void setHeader(int, byte[]) and byte[] getHeader(int) added to the interface of both (a minimal sketch of these additions is shown below).
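A minimal sketch of what these additions could look like, showing only the proposed headers field and accessors rather than the full record classes:

Code Block
languagejava
import java.util.HashMap;
import java.util.Map;

// Sketch only: the real change would add this field and these accessors to both
// ProducerRecord and ConsumerRecord alongside their existing fields.
public class RecordHeadersSketch {
    private final Map<Integer, byte[]> headers = new HashMap<>();

    public void setHeader(int key, byte[] value) {
        headers.put(key, value);
    }

    public byte[] getHeader(int key) {
        return headers.get(key);
    }
}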

Wire protocol change: use attributes bit 4 as a flag for whether headers are present, and add an (optional) headers length (variable-length int) field and headers field to the message format.

The below is the core Message wire protocol change needed to fit headers into the message.

A key goal is to ensure the headers cause as little overhead as possible, and none when they are not present.

  • The attributes flag bit is used to keep the message size the same as before if no headers are used
  • HeadersLength is a variable length encoded int saving bytes where headers are small in size/number

Code Block
languagejava
MessageAndOffset => Offset MessageSize Message
  Offset => int64  
  MessageSize => int32
  
  Message => Crc MagicByte Attributes Timestamp KeyLength Key HeadersLength Headers ValueLength Value
    Crc => int32
    MagicByte => int8  <---------------------- Bump up magic byte to 2
    Attributes => int8 <---------------------- Use Bit 4 as boolean flag for if headers present
    Timestamp => int64
    KeyLength => int32
    Key => bytes
    (optional) HeadersLength => variable int32 <------------------ NEW [optional] length of the serialized headers byte[], present only if the headers attributes bit is set
    (optional) Headers => bytes <--------------------------------- NEW [optional] serialized form of the headers Map<int, byte[]>
    ValueLength => int32
    Value => bytes
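A hedged sketch of how a reader of this format might handle the two new optional fields. The readVarint helper below uses a base-128 encoding purely for illustration; the concrete variable-length scheme is not fixed by this sketch.

Code Block
languagejava
import java.nio.ByteBuffer;

public class MessageV2ReaderSketch {
    private static final int HEADERS_FLAG = 1 << 4;  // assumed mask for "bit 4"

    // Reads the optional HeadersLength/Headers fields that sit between Key and ValueLength.
    static byte[] readHeadersIfPresent(ByteBuffer message, byte attributes) {
        if ((attributes & HEADERS_FLAG) == 0) {
            return null;                          // no headers: the fields are absent entirely
        }
        int headersLength = readVarint(message);  // variable length encoded int32
        byte[] headers = new byte[headersLength];
        message.get(headers);                     // raw serialized headers, decoded separately
        return headers;
    }

    // Hypothetical helper: one possible variable-length int32 decoding (base-128);
    // the actual encoding would be defined by the implementation.
    static int readVarint(ByteBuffer buffer) {
        int value = 0;
        int shift = 0;
        byte b;
        do {
            b = buffer.get();
            value |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return value;
    }
}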

...


Wire protocol

...

of the headers bytes (if present, as indicated by the above-mentioned attributes bit flag)

Code Block
languagejava
Headers (bytes) => Count Set(Key, ValueLength, Value)
  
  Count => variable length encoded int32
  
  Set =>
	Key => variable length encoded int32
    ValueLength => variable length encoded int32
    Value => bytes
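A minimal sketch of serializing a Map<Integer, byte[]> into this layout; writeVarint is a hypothetical helper standing in for whatever variable-length int32 encoding is finally chosen.

Code Block
languagejava
import java.io.ByteArrayOutputStream;
import java.util.Map;

public class HeadersSerializerSketch {

    // Serializes headers as: Count, then for each entry Key, ValueLength, Value.
    static byte[] serialize(Map<Integer, byte[]> headers) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(out, headers.size());                            // Count
        for (Map.Entry<Integer, byte[]> entry : headers.entrySet()) {
            writeVarint(out, entry.getKey());                        // Key
            writeVarint(out, entry.getValue().length);               // ValueLength
            out.write(entry.getValue(), 0, entry.getValue().length); // Value
        }
        return out.toByteArray();
    }

    // Hypothetical helper: unsigned base-128 varint shown purely for illustration;
    // the encoding actually chosen would need to handle the negative key ranges well.
    static void writeVarint(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }
}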

...

 

 

Key Allocation

As mentioned above, ranges of keys will be reserved for different usages; we use variable-length encoded int keys to reduce the key size overhead.

Whilst the open header space may bring to fruition many possible keys, a single cluster/broker is unlikely to use hundreds of them, so we should assign/reserve key space for the most-used areas.

With this, where ints fall into the ranges below they take fewer bytes than the standard 4-byte allocation, giving the greatest benefit.

 

3 bytes :: -129 -> -32768
2 bytes :: -33 -> -128
1 byte  :: -1 -> -32
1 byte  :: 0 -> 127
2 bytes :: 128 -> 255
3 bytes :: 256 -> 65535

As such we propose that:

+ve ints 0 -> 255 are reserved for Apache Kafka open registered headers, as these are likely to be the most commonly used and so benefit most from the space saving

-ve ints -1 -> -128 are reserved for in-house registered headers, as these are the next most likely to be heavily used

-ve ints -129 and below can be used as a scratch space, either where more in-house header space is required or for development

+ve ints 256 and above can be used for other vendor-released headers or as a spill-over for open source headers.

Sample register of headers that would end up in the open source space.

key | name | description | by | url
1 | client.id | producer's client id | Apache Kafka | some url to a document about it
2 | cluster.id | cluster id of where the message first originated | Apache Kafka | some url to a document about it
3 | correlation.id | correlation id for matching a response to its originating request | Apache Kafka | some url to a document about it
260 | new.relic | stores the transaction linking guid for transaction stitching by New Relic | New Relic | some url to a document about it
451 | appdynamics | stores the transaction linking guid for transaction stitching by AppDynamics | AppDynamics | some url to a document about it

 

To assist and help ensure ease of use and uniformity, a constants class should be kept and updated with the above (similar to how Java SQL codes work), e.g.

Code Block
package org.apache.kafka.common.config;

public class KafkaHeaderKeys
{

   public static final int CLIENT_ID_KEY = 1;
   public static final int CLUSTER_ID_KEY = 2;
   public static final int CORRELATION_ID_KEY = 3;
   
}
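Usage would then look something like the following; the record parameter is the ProducerRecord from earlier and the client id value is illustrative.

Code Block
languagejava
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.KafkaHeaderKeys;

public class HeaderKeysUsageSketch {
    // Illustrative only: attaches the registered client.id header using the
    // proposed setHeader accessor and the constants class above.
    static void tagClientId(ProducerRecord<String, String> record, String clientId) {
        record.setHeader(KafkaHeaderKeys.CLIENT_ID_KEY,
                         clientId.getBytes(StandardCharsets.UTF_8));
    }
}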

 

Sample register of headers that would end up being in-house custom per user/company.

key | name | description | by | notes
-5 | app.name | IG's unique app name for the producer | IG | some internal document about it
-10 | charge.tag | tag to make per-message chargebacks for | IG | some internal document about it

 

Add a headers field Map<String, String> to both ProducerRecord and ConsumerRecord

  • Accessor methods void setHeader(String, String) and String getHeader(String) added to the interface of both.

Add configurable Deserializer and Serializer to KafkaConsumer and KafkaProducer

  • KafkaProducer headers.serializer denotes the serializer to use
    • Producer uses the headers.serializer to convert the headers field (Map<String, String>) in the ProducerRecord to byte[]
  • KafkaConsumer headers.deserializer denotes the deserializer to use
    • Fetcher is updated to use the headers.deserializer to convert byte[] to the headers field (Map<String, String>) set in the ConsumerRecord

Create a simple String UTF-8 based Serializer/Deserializer implementation for headers.serializer/headers.deserializer and default the config to use this.

  • This would work by converting the map into a string of the form key1=value1&key2=value2&key3=value3 (like query params, URL-encoding the keys and values) and then calling getBytes("UTF-8") to get the byte[]; a rough sketch follows below.
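A hedged sketch of such a serializer/deserializer pair (the class name is illustrative, not a proposed API):

Code Block
languagejava
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of the default "query param" style headers serialization.
public class StringHeadersSerializerSketch {

    // key1=value1&key2=value2&... with keys and values URL encoded, then UTF-8 bytes.
    public static byte[] serialize(Map<String, String> headers) throws UnsupportedEncodingException {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> entry : headers.entrySet()) {
            if (sb.length() > 0) {
                sb.append('&');
            }
            sb.append(URLEncoder.encode(entry.getKey(), "UTF-8"))
              .append('=')
              .append(URLEncoder.encode(entry.getValue(), "UTF-8"));
        }
        return sb.toString().getBytes("UTF-8");
    }

    public static Map<String, String> deserialize(byte[] data) throws UnsupportedEncodingException {
        Map<String, String> headers = new LinkedHashMap<>();
        String encoded = new String(data, "UTF-8");
        if (encoded.isEmpty()) {
            return headers;
        }
        for (String pair : encoded.split("&")) {
            String[] kv = pair.split("=", 2);
            headers.put(URLDecoder.decode(kv[0], "UTF-8"),
                        kv.length > 1 ? URLDecoder.decode(kv[1], "UTF-8") : "");
        }
        return headers;
    }
}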

...

Compatibility, Deprecation, and Migration Plan

...


 

Map<String, String> Headers added to the ProducerRecord

The concept is similar to the above proposed but with a few more disadvantages.

The advantages of this proposal are:

  • Adds the ability for producers to set standard header key=value String pairs
  • No incompatible client API change (only new methods)
  • Allows users to specify the serialization of the key=value map (String (&=), JSON, Avro)
  • Provides a standardized interface that ecosystems of tools can grow around

The disadvantages of this proposal are:

  • Change to the message object
  • String keys are large; a message payload of 60 bytes can be dwarfed by its headers
  • String values likewise do not allow for compact values, and restrict a value to being a String
  • Not able to use headers on the broker side when custom serialization is used
Value Message Wrapper - Message<H, P>

...