...
- Add a new headers length and value (byte[]) to the core message format.
- Add a headers (Map<int, byte[]>) field to ProducerRecord and ConsumerRecord. Producers and interceptors will be able to set headers on a ProducerRecord; consumers and interceptors will see the (immutable) message headers when they see the messages.
- Add accessor methods void setHeader(int, byte[]) and byte[] getHeader(int) to ProducerRecord and ConsumerRecord.
- Add ProduceRequest/ProduceResponse V3 which uses the new message format.
- Add FetchRequest/FetchResponse V3 which uses the new message format.
- The [int, byte[]] header set will be serialized on the wire using a strict format.
- Each header's value will be custom serializable by the interceptors/plugins that use the header.
- Because headers use int keys for compactness, key ranges will be reserved for different usages:
  - client headers adopted by the Kafka open source project (not necessarily owned by it)
  - broker headers adopted by the Kafka open source project (not necessarily owned by it)
  - commercial vendor space
  - custom in-house
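Because a header value travels as opaque bytes, the plugin that owns a key decides how to turn its typed value into a byte[] and back. A minimal sketch of that idea, assuming a hypothetical HeaderValueSerde interface (not part of any Kafka API) with two illustrative implementations, one for a string header such as client.id and one for a long-valued header:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical interface: each plugin/interceptor that owns a header key
// supplies its own conversion between a typed value and the raw byte[].
interface HeaderValueSerde<T> {
    byte[] serialize(T value);
    T deserialize(byte[] bytes);
}

// String-valued header (e.g. client.id), encoded as UTF-8 bytes.
final class Utf8HeaderSerde implements HeaderValueSerde<String> {
    @Override
    public byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

// Long-valued header, encoded as 8 big-endian bytes.
final class LongHeaderSerde implements HeaderValueSerde<Long> {
    @Override
    public byte[] serialize(Long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    @Override
    public Long deserialize(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }
}
```

The broker and the core client never need to know these types; only the producing and consuming plugins must agree on the serde for a given key.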
...
- Adds the ability for producers to set standard header key=value pairs
- No incompatible client api change (only new methods)
- Allows users to specify the serialization of the header value per header
- Compact key space
- Provides a standardized interface around which an ecosystem of tools can grow
The disadvantage of this proposal is:
- Change to the message object
Add a headers field Map<int, byte[]> to both ProducerRecord and ConsumerRecord
- Accessor methods void setHeader(int, byte[]) and byte[] getHeader(int) added to the interface of both.
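A minimal sketch of the record-side change, using a hypothetical RecordWithHeaders class rather than the real ProducerRecord/ConsumerRecord. Java generics cannot use a primitive key, so the sketch assumes Map<Integer, byte[]> for the proposed Map<int, byte[]>. The consumer view is modelled as a read-only copy, matching the proposal's statement that consumers see immutable headers:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a record carrying the proposed headers field.
class RecordWithHeaders {
    private final Map<Integer, byte[]> headers;
    private final boolean readOnly;

    RecordWithHeaders() {
        this(new HashMap<>(), false);
    }

    private RecordWithHeaders(Map<Integer, byte[]> headers, boolean readOnly) {
        this.headers = headers;
        this.readOnly = readOnly;
    }

    // Producer side: producers and interceptors may add or overwrite a header.
    void setHeader(int key, byte[] value) {
        if (readOnly) {
            throw new UnsupportedOperationException("headers are immutable on consumed records");
        }
        headers.put(key, value.clone()); // defensive copy of the caller's array
    }

    // Returns a copy of the header value, or null if the key is absent.
    byte[] getHeader(int key) {
        byte[] v = headers.get(key);
        return v == null ? null : v.clone();
    }

    // Consumer side: a read-only snapshot whose setHeader always throws.
    RecordWithHeaders toConsumed() {
        Map<Integer, byte[]> copy = new HashMap<>();
        for (Map.Entry<Integer, byte[]> e : headers.entrySet()) {
            copy.put(e.getKey(), e.getValue().clone());
        }
        return new RecordWithHeaders(copy, true);
    }
}
```

Returning copies from getHeader keeps a consumer from mutating header bytes in place, which a plain byte[] would otherwise allow even through an immutable map.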
Wire protocol change: use attribute bit 4 as a flag for whether headers are present, and add an (optional) header size
...
and field to the message format
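A minimal sketch of the flag check, assuming the existing magic-1 attribute layout (bits 0-2 compression codec, bit 3 timestamp type) and reading "bit 4" as the mask 0x10. HeaderAttributes and its methods are hypothetical names for illustration:

```java
// Hypothetical helper for the proposed Attributes bit 4 "headers present" flag.
final class HeaderAttributes {
    static final int COMPRESSION_CODEC_MASK = 0x07; // bits 0-2 (existing)
    static final int TIMESTAMP_TYPE_MASK = 0x08;    // bit 3 (existing)
    static final int HEADERS_PRESENT_MASK = 0x10;   // bit 4 (proposed)

    static boolean hasHeaders(byte attributes) {
        return (attributes & HEADERS_PRESENT_MASK) != 0;
    }

    // Sets the headers flag while leaving the other attribute bits untouched.
    static byte withHeaders(byte attributes) {
        return (byte) (attributes | HEADERS_PRESENT_MASK);
    }

    static int compressionCodec(byte attributes) {
        return attributes & COMPRESSION_CODEC_MASK;
    }

    // Bytes that HeadersLength + Headers add to a message. With the flag
    // clear nothing is written, so the message is the same size as before.
    static int headerOverhead(byte attributes, int headersLengthFieldSize, int headersSize) {
        return hasHeaders(attributes) ? headersLengthFieldSize + headersSize : 0;
    }
}
```

Using a spare attribute bit rather than, say, always writing a zero HeadersLength is what keeps header-less messages byte-for-byte the same size as before.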
The below is for the core Message wire protocol change needed to fit headers into the message.
A key goal is to ensure that headers add as little overhead as possible, and none at all if no headers are present.
- The attributes flag bit is used to keep the message size the same as before if no headers are used.
- HeadersLength is a variable-length encoded int, saving bytes where headers are small in size/number; see: https://github.com/msgpack/msgpack/blob/master/spec.md#formats-int
- OR see http://grepcode.com/file/repository.cloudera.com/content/repositories/releases/com.cloudera.hadoop/hadoop-core/0.20.2-320/org/apache/hadoop/io/file/tfile/Utils.java
```
MessageAndOffset => Offset MessageSize Message
  Offset => int64
  MessageSize => int32

  Message => Crc MagicByte Attributes Timestamp KeyLength Key HeadersLength Headers ValueLength Value
    Crc => int32
    MagicByte => int8  <---------------------- Bump up magic byte to 2
    Attributes => int8 <---------------------- Use Bit 4 as boolean flag for if headers present
    Timestamp => int64
    KeyLength => int32
    Key => bytes
    (optional) HeadersLength => variable int32 <------------------ NEW [optional] size of the byte[] of the serialized headers if headers
    (optional) Headers => bytes <--------------------------------- NEW [optional] serialized form of the headers Map<int, byte[]>
    ValueLength => int32
    Value => bytes
```
...
```
Headers (bytes) => Count Set(Key, ValueLength, Value)
  Count => variable length encoded int32
  Set =>
    Key => variable length encoded int32
    ValueLength => variable length encoded int32
    Value => bytes
```
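A minimal sketch of the Headers layout, assuming the MessagePack int formats (the first of the two variable-length encodings linked earlier) for Count, Key and ValueLength. HeadersCodec and its methods are hypothetical; entries are written in ascending key order so a given map always yields the same bytes, which is an assumption since the proposal does not specify ordering:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;

final class HeadersCodec {
    // Writes v using the smallest MessagePack int format that can hold it.
    static void writeVarInt(ByteArrayOutputStream out, int v) {
        if (v >= 0) {
            if (v <= 0x7f) { out.write(v); }                                 // positive fixint, 1 byte
            else if (v <= 0xff) { out.write(0xcc); out.write(v); }           // uint8, 2 bytes
            else if (v <= 0xffff) { out.write(0xcd); writeBigEndian(out, v, 2); } // uint16, 3 bytes
            else { out.write(0xce); writeBigEndian(out, v, 4); }             // uint32, 5 bytes
        } else {
            if (v >= -32) { out.write(v & 0xff); }                           // negative fixint, 1 byte
            else if (v >= -128) { out.write(0xd0); out.write(v & 0xff); }    // int8, 2 bytes
            else if (v >= -32768) { out.write(0xd1); writeBigEndian(out, v, 2); } // int16, 3 bytes
            else { out.write(0xd2); writeBigEndian(out, v, 4); }             // int32, 5 bytes
        }
    }

    private static void writeBigEndian(ByteArrayOutputStream out, int v, int byteCount) {
        for (int shift = (byteCount - 1) * 8; shift >= 0; shift -= 8) {
            out.write((v >>> shift) & 0xff);
        }
    }

    // Reads one MessagePack int written by writeVarInt (ByteBuffer is big-endian by default).
    static int readVarInt(ByteBuffer in) {
        int b = in.get() & 0xff;
        if (b <= 0x7f) return b;          // positive fixint
        if (b >= 0xe0) return b - 0x100;  // negative fixint
        switch (b) {
            case 0xcc: return in.get() & 0xff;
            case 0xcd: return in.getShort() & 0xffff;
            case 0xce: return in.getInt();
            case 0xd0: return in.get();
            case 0xd1: return in.getShort();
            case 0xd2: return in.getInt();
            default: throw new IllegalArgumentException("unexpected format byte 0x" + Integer.toHexString(b));
        }
    }

    // Headers => Count Set(Key, ValueLength, Value)
    static byte[] serialize(Map<Integer, byte[]> headers) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarInt(out, headers.size());                          // Count
        for (Map.Entry<Integer, byte[]> e : new TreeMap<>(headers).entrySet()) {
            writeVarInt(out, e.getKey());                          // Key
            writeVarInt(out, e.getValue().length);                 // ValueLength
            out.write(e.getValue(), 0, e.getValue().length);       // Value
        }
        return out.toByteArray();
    }

    static Map<Integer, byte[]> deserialize(byte[] bytes) {
        ByteBuffer in = ByteBuffer.wrap(bytes);
        int count = readVarInt(in);
        Map<Integer, byte[]> headers = new TreeMap<>();
        for (int i = 0; i < count; i++) {
            int key = readVarInt(in);
            byte[] value = new byte[readVarInt(in)];
            in.get(value);
            headers.put(key, value);
        }
        return headers;
    }
}
```

For example, the two headers {1: "ab", -5: [7]} serialize to 8 bytes: one for Count, then one byte each for the two keys and two lengths, plus three value bytes.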
Key Allocation
As mentioned above, ranges of keys will be reserved for different usages, and we use variable-length encoded int keys to reduce the key size overhead.
Whilst the open header space may give rise to many possible keys, it is unlikely that a cluster/broker will use hundreds of them, so we should assign/reserve the most compact key space for the most used areas.
Keys that fall in the ranges below encode in fewer bytes than the standard 4-byte int allocation, giving the best space savings.
As such we propose that:
- Non-negative ints 0 -> 255 are reserved for Apache Kafka open registered headers, as these are likely to be the most commonly used, so giving them the shortest encodings saves the most space.
- Negative ints -1 -> -128 are reserved for in-house registered headers, as these are the next most likely to be heavily used.
- Negative ints -129 and below can be used as scratch space, either where more in-house header space is required or for development.
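A minimal sketch classifying a key into the ranges above and reporting its encoded size, assuming the MessagePack int formats (under the Hadoop VInt scheme also linked above, the byte counts differ). HeaderKeySpace and its enum are hypothetical names; UNASSIGNED covers ints above 255, which the list above does not allocate:

```java
// Hypothetical helper for the proposed header key ranges.
final class HeaderKeySpace {
    enum Range { APACHE_KAFKA, IN_HOUSE, SCRATCH, UNASSIGNED }

    static Range rangeOf(int key) {
        if (key >= 0 && key <= 255) return Range.APACHE_KAFKA;
        if (key >= -128 && key <= -1) return Range.IN_HOUSE;
        if (key <= -129) return Range.SCRATCH;   // extra in-house space or development
        return Range.UNASSIGNED;                 // above 255: not covered by the ranges listed
    }

    // Bytes used by the smallest MessagePack int format for this key.
    static int encodedSize(int key) {
        if (key >= 0) {
            if (key <= 0x7f) return 1;   // positive fixint
            if (key <= 0xff) return 2;   // uint8
            if (key <= 0xffff) return 3; // uint16
            return 5;                    // uint32
        }
        if (key >= -32) return 1;        // negative fixint
        if (key >= -128) return 2;       // int8
        if (key >= -32768) return 3;     // int16
        return 5;                        // int32
    }
}
```

Under this encoding every key in the two reserved ranges costs at most 2 bytes, versus 4 for a fixed int32, while keys of -129 and below cost 3 or 5 bytes.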
...
Sample register of headers that would end up in the open source space.
key | name | description | by | url |
---|---|---|---|---|
1 | client.id | producers client id | Apache Kafka | some url to a document about it |
2 | cluster.id | cluster id of where the message first originated | Apache Kafka | some url to a document about it |
3 | correlation.id | correlation id used where a message is a response to a request | Apache Kafka | some url to a document about it |
2602100001 | new.relic | stores the transaction linking guid for transaction stitching by New Relic | New Relic | some url to a document about it |
4512100002 | appdynamics | stores the transaction linking guid for transaction stitching by AppDynamics | AppDynamics | some url to a document about it |
...
```java
package org.apache.kafka.common.config;

public class KafkaHeaderKeys {
    public static final int CLIENT_ID_KEY = 1;
    public static final int CLUSTER_ID_KEY = 2;
    public static final int CORRELATION_ID_KEY = 3;
}
```
Sample register that would end up having
...
local (In-house) custom per
...
organisation
key | name | description | by | notes |
---|---|---|---|---|
-51114000 | app.name | IG's unique app name for the producer | IG | some internal document about it |
-101050000 | charge.tag | tag used to make per-message chargebacks | IG | some internal document about it |
...