...

  1. Add a new headers length field and headers value (byte[]) to the core message format.
  2. Add a headers (Map<int, byte[]>) field to ProducerRecord and ConsumerRecord. Producers and producer interceptors will be able to set headers on a ProducerRecord. Consumers and consumer interceptors will see the message headers (immutable) when they receive the messages.

  3. Add accessor methods to ProducerRecord/ConsumerRecord: void setHeader(int, byte[]) (ProducerRecord only) and byte[] getHeader(int).
  4. Add ProduceRequest/ProduceResponse V3 which uses the new message format.
  5. Add FetchRequest/FetchResponse V3 which uses the new message format.
  6. The serialisation of the [int, byte[]] header set on the wire will use a strict format.
  7. Each header value will be custom serialisable by the interceptors/plugins that use the header.
  8. As headers use int keys for compactness, key ranges will be reserved for different usages:
    1. client headers adopted by the Kafka open source project (not necessarily owned by it)
    2. broker headers adopted by the Kafka open source project (not necessarily owned by it)
    3. commercial vendor space
    4. custom in-house
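The strict wire format of the header set is not spelled out in this section. As an illustration only, the following sketch assumes a hypothetical layout in which each header is written as an int32 key, an int32 value length and the value bytes, with entries sorted by key so that the same map always produces the same bytes. HeaderCodec is a hypothetical name, Map<Integer, byte[]> is the Java form of Map<int, byte[]>, and header values are assumed to be non-null.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical codec for the Map<Integer, byte[]> header set.
// Assumed layout (illustrative only): repeated [int32 key][int32 valueLength][value bytes],
// entries sorted by key so the encoding is deterministic.
final class HeaderCodec {
    private HeaderCodec() {}

    static byte[] serialise(Map<Integer, byte[]> headers) {
        int size = 0;
        for (byte[] value : headers.values()) {
            size += 4 + 4 + value.length; // key + length prefix + value
        }
        ByteBuffer buf = ByteBuffer.allocate(size); // big-endian by default
        for (Map.Entry<Integer, byte[]> e : new TreeMap<>(headers).entrySet()) {
            buf.putInt(e.getKey());
            buf.putInt(e.getValue().length);
            buf.put(e.getValue());
        }
        return buf.array();
    }

    static Map<Integer, byte[]> deserialise(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        Map<Integer, byte[]> headers = new TreeMap<>();
        while (buf.hasRemaining()) {
            int key = buf.getInt();
            byte[] value = new byte[buf.getInt()];
            buf.get(value);
            headers.put(key, value);
        }
        return headers;
    }
}
```

Sorting by key is a choice of the sketch; any fixed ordering would make the encoding equally deterministic.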

...

  • Adds the ability for producers to set standard header key=value pairs
  • No incompatible client API change (only new methods)
  • Allows users to specify the serialisation of the header value per header
  • Compact key space
  • Provides a standardised interface around which an ecosystem of tools can grow

The disadvantage of this proposal is:

  • Change to the message object

Add a headers field Map<int, byte[]> to both ProducerRecord and ConsumerRecord

  • Accessor methods void setHeader(int, byte[]) and byte[] getHeader(int) are added to the ProducerRecord interface.
  • Accessor method byte[] getHeader(int) is added to the ConsumerRecord interface (headers are immutable on the consumer side, so no setter is required).
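A minimal sketch of these accessors, assuming headers are held in a Map<Integer, byte[]>. SketchProducerRecord and SketchConsumerRecord are hypothetical stand-ins for ProducerRecord and ConsumerRecord, with their other fields (topic, partition, timestamp) omitted; copying header values on the way in and out is a choice of the sketch, not part of the proposal.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ProducerRecord: headers can be set before sending.
class SketchProducerRecord<K, V> {
    private final K key;
    private final V value;
    private final Map<Integer, byte[]> headers = new HashMap<>();

    SketchProducerRecord(K key, V value) {
        this.key = key;
        this.value = value;
    }

    K key() { return key; }
    V value() { return value; }

    // New method: set (or overwrite) the header with the given int key; stores a copy.
    void setHeader(int headerKey, byte[] headerValue) {
        headers.put(headerKey, headerValue.clone());
    }

    // New method: returns a copy of the header value, or null if the header is absent.
    byte[] getHeader(int headerKey) {
        byte[] v = headers.get(headerKey);
        return v == null ? null : v.clone();
    }
}

// Hypothetical stand-in for ConsumerRecord: headers are read-only, so there is no setter.
class SketchConsumerRecord<K, V> {
    private final K key;
    private final V value;
    private final Map<Integer, byte[]> headers;

    SketchConsumerRecord(K key, V value, Map<Integer, byte[]> headers) {
        this.key = key;
        this.value = value;
        this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
    }

    K key() { return key; }
    V value() { return value; }

    // Returns a copy so callers cannot modify the record's header bytes.
    byte[] getHeader(int headerKey) {
        byte[] v = headers.get(headerKey);
        return v == null ? null : v.clone();
    }
}
```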

...

Code Block
languagejava
MessageAndOffset => Offset MessageSize Message
  Offset => int64  
  MessageSize => int32
  
  Message => Crc MagicByte Attributes Timestamp KeyLength Key HeadersLength Headers ValueLength Value
    Crc => int32
    MagicByte => int8  <---------------------- Bump up magic byte to 2
    Attributes => int8 <---------------------- Bit 4 is a boolean flag: set if headers are present
    Timestamp => int64
    KeyLength => int32
    Key => bytes
    (optional) HeadersLength => int32 <--------------- NEW [optional] size in bytes of the serialised headers; present only if Attributes bit 4 is set
    (optional) Headers => bytes <--------------------- NEW [optional] serialised form of the headers Map<int, byte[]>
    ValueLength => int32
    Value => bytes
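To illustrate the layout, here is a sketch that encodes one message in the proposed magic-byte-2 format. It assumes, as in the existing message formats, that the CRC is a CRC32 over everything from MagicByte to the end of the message and that a null key or value is written with length -1; the headers are treated as an opaque, already-serialised byte[]. MessageV2Sketch and encode are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Hypothetical encoder for one message in the proposed v2 layout.
final class MessageV2Sketch {
    static final byte MAGIC_V2 = 2;
    static final byte HEADERS_PRESENT = 1 << 4; // Attributes bit 4

    private MessageV2Sketch() {}

    // key, serialisedHeaders and value may be null; null headers means "no headers".
    static byte[] encode(byte attributes, long timestamp, byte[] key,
                         byte[] serialisedHeaders, byte[] value) {
        boolean hasHeaders = serialisedHeaders != null;
        if (hasHeaders) {
            attributes |= HEADERS_PRESENT;
        }
        int size = 4 + 1 + 1 + 8                                   // Crc, MagicByte, Attributes, Timestamp
                + 4 + lengthOf(key)                                // KeyLength, Key
                + (hasHeaders ? 4 + serialisedHeaders.length : 0)  // optional HeadersLength, Headers
                + 4 + lengthOf(value);                             // ValueLength, Value
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.position(4); // leave room for the CRC
        buf.put(MAGIC_V2);
        buf.put(attributes);
        buf.putLong(timestamp);
        putBytes(buf, key);
        if (hasHeaders) {
            putBytes(buf, serialisedHeaders);
        }
        putBytes(buf, value);

        CRC32 crc = new CRC32();
        crc.update(buf.array(), 4, size - 4); // CRC covers MagicByte..end
        buf.putInt(0, (int) crc.getValue());
        return buf.array();
    }

    private static int lengthOf(byte[] b) {
        return b == null ? 0 : b.length;
    }

    // Writes an int32 length followed by the bytes; null is written as length -1.
    private static void putBytes(ByteBuffer buf, byte[] b) {
        if (b == null) {
            buf.putInt(-1);
        } else {
            buf.putInt(b.length);
            buf.put(b);
        }
    }
}
```

Because HeadersLength and Headers are only written when bit 4 is set, a v2 message without headers is byte-for-byte the same size as one in the previous layout.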

...

Testing (0x003)
  • 0x00300000-0x0030FFFF Testing - Infrastructure
  • 0x00310000-0x0031FFFF Testing - Client
  • 0x00320000-0x0032FFFF Testing - Stack/Middleware
  • 0x00330000-0x0033FFFF Testing - Application
  • 0x00340000-0x0034FFFF Testing - User 
  • 0x00350000-0x003FFFFF RESERVED
Reserved
  • 0x00400000-0xFFFFFFFF RESERVED

 

From this perspective we would have the following:
  • A Kafka header used in the Kafka open source distribution could have the value 0x00000010 (16).
  • A header used internally at organisation X would fall under Local. If it were used by infrastructure, such as tracing, it could use 0x00100010 (1048592).
  • A header used by an open source project that somebody just wants to publish without coordination could use a value such as 0x00220010 (2228240), for example for a plugin that annotates a message with its location.
  • An application that is testing whether it can use headers for a new feature it's developing could pick a header with key 0x00340010 (3407888).
  • The Kafka open source number space is coordinated via the Apache Kafka open source project. A class would list all possible headers, their numbers and their string equivalents (for output/logging/etc).
  • A Local organisation is in charge of coordinating its local number space. It would be responsible for maintaining a text file, a class or a service that coordinates who gets which numbers and when.
  • On the open internet you can grab any number in the Open space, but there is no guarantee that other people are not also using that number.
  • When you're doing testing you can safely grab a number in the testing space and be assured that you won't collide with an official system header. It's still possible to collide with other testing headers, but no production system should depend on these headers.
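The number space can be checked mechanically from the top 12 bits of a key. Only the Testing (0x003) sub-ranges and the Reserved range are listed in this section; mapping 0x000 to Kafka, 0x001 to Local and 0x002 to Open is an assumption inferred from the examples above. HeaderSpace and of are hypothetical names.

```java
// Hypothetical classifier for the proposed header key number space.
enum HeaderSpace {
    KAFKA, LOCAL, OPEN, TESTING, RESERVED;

    static HeaderSpace of(int key) {
        // >>> shifts in zeros, so keys 0x80000000-0xFFFFFFFF (negative ints) get high prefixes.
        int prefix = key >>> 20; // top 12 bits, e.g. 0x003 for 0x0034xxxx
        switch (prefix) {
            case 0x000: return KAFKA; // assumed from the 0x00000010 example
            case 0x001: return LOCAL; // assumed from the 0x00100010 example
            case 0x002: return OPEN;  // assumed from the 0x00220010 example
            case 0x003: {
                // Testing sub-ranges 0x0030-0x0034 are in use; 0x0035-0x003F are RESERVED.
                int sub = (key >>> 16) & 0xF;
                return sub <= 4 ? TESTING : RESERVED;
            }
            default:
                return RESERVED; // 0x00400000-0xFFFFFFFF
        }
    }
}
```

For example, HeaderSpace.of(0x00340010) returns TESTING and HeaderSpace.of(0x00350000) returns RESERVED.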

...

The concept is similar to the proposal above, but with a few more disadvantages. The advantages of this proposal are:

  • Benefits
    • Adds the ability for producers to set standard header key=value string pairs
    • No incompatible client API change (only new methods)
    • Allows users to specify the serialisation of the key=value map (String(&=), JSON, AVRO)
    • Provides a standardised interface around which an ecosystem of tools can grow
The disadvantages of this proposal are:
  • Disadvantages
    • Change to the message object
    • String keys are large, so a message payload of 60 bytes can be dwarfed by its headers
    • String values likewise do not allow compact values, and restrict every value to being a String
    • Headers cannot be used on the broker side with custom serialisation

Value Message Wrapper - Message<H, P>

...

  • Benefits
    • This requires no broker-side changes and no message format changes
  • Disadvantages
    • This would not work with compaction, where headers need to be sent on a delete record: a delete record has a null value, so headers wrapped inside the value cannot travel with it. As a result, this approach would not deliver on many of the requirements.
    • Serialisation and deserialisation of the value are coupled for both the header and the payload.
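A minimal sketch of the wrapper approach makes both disadvantages concrete. It assumes the header and payload are packed into the record value as [int32 headerLength][header bytes][payload bytes]; that layout and the serialise/deserialise helpers are hypothetical. A single value serialiser must handle both parts, and a delete record with a null value has nowhere to carry the header.

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

// Sketch of a value wrapper: headers travel inside the record value.
final class Message<H, P> {
    final H header;
    final P payload;

    Message(H header, P payload) {
        this.header = header;
        this.payload = payload;
    }

    // One value serialiser has to know how to encode both the header and the payload.
    static <H, P> byte[] serialise(Message<H, P> m,
                                   Function<H, byte[]> headerSer,
                                   Function<P, byte[]> payloadSer) {
        byte[] h = headerSer.apply(m.header);
        byte[] p = payloadSer.apply(m.payload);
        return ByteBuffer.allocate(4 + h.length + p.length)
                .putInt(h.length).put(h).put(p).array();
    }

    static <H, P> Message<H, P> deserialise(byte[] value,
                                            Function<byte[], H> headerDe,
                                            Function<byte[], P> payloadDe) {
        if (value == null) {
            // Delete record (null value): the header is lost along with the payload.
            return null;
        }
        ByteBuffer buf = ByteBuffer.wrap(value);
        byte[] h = new byte[buf.getInt()];
        buf.get(h);
        byte[] p = new byte[buf.remaining()];
        buf.get(p);
        return new Message<>(headerDe.apply(h), payloadDe.apply(p));
    }
}
```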

...

The proposed change is that headers are Map<String, String> only; this alternative is that headers can be of any type, denoted by H.

  • Benefits
    • Complete customisation of what a header is.
  • Disadvantages
    • As Java generics do not support default type parameters, adding H to ProducerRecord/ConsumerRecord would break client interface compatibility.
      • A possible work-around would be to introduce HeadersProducerRecord<K, H, V>/HeadersConsumerRecord<K, H, V>, which ProducerRecord/ConsumerRecord then extend with H fixed to Object; however, this quickly becomes ugly if kept for long, or would require a deprecation/refactoring (v2) period.
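The work-around can be sketched as follows. HeadersProducerRecord is the name used above; the fields and constructors are illustrative assumptions, and the ProducerRecord here is a simplified stand-in rather than the real Kafka class. Existing code that writes ProducerRecord<K, V> keeps compiling because the subclass still has exactly two type parameters.

```java
// Generic base record carrying headers of arbitrary type H.
class HeadersProducerRecord<K, H, V> {
    private final K key;
    private final H headers;
    private final V value;

    HeadersProducerRecord(K key, H headers, V value) {
        this.key = key;
        this.headers = headers;
        this.value = value;
    }

    K key() { return key; }
    H headers() { return headers; }
    V value() { return value; }
}

// Simplified stand-in for the existing two-parameter ProducerRecord<K, V>:
// H is fixed to Object, so existing ProducerRecord<K, V> call sites are unaffected.
class ProducerRecord<K, V> extends HeadersProducerRecord<K, Object, V> {
    ProducerRecord(K key, V value) {
        super(key, null, value);
    }

    ProducerRecord(K key, Object headers, V value) {
        super(key, headers, value);
    }
}
```

The cost is visible even in this small sketch: code that wants typed headers must switch to HeadersProducerRecord, while ProducerRecord users only ever see headers as Object.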

...