Motivation

To address the use cases in KIP-82, this page details the pros and cons of, and the changes needed for, an implementation that takes a message wrapper approach.

This is not the proposed solution, but rather an elaboration of an alternative solution, to show more clearly why it was discounted.

Advantages/Disadvantages

 

The advantages of this option:
  • Changes to MessageAndOffset are limited to using one attribute bit as a tombstone marker
  • Headers can be set from the client side using a common interface
  • Message headers remain a client-side-only implementation

The disadvantages of this option:

  • Change to the message object
  • Client users need to know in advance whether a message will use headers (i.e. whether it will use the wrapper)
  • Headers serve platform needs; enforcing a wrapper makes them visible to end-user code, whereas only the platform needs to add and consume headers, via interceptors
  • Headers cannot be used server side while the implementation is client side only
    • e.g. server-side plugins/interceptors; see A Case for Kafka Headers
    • If headers were made server aware, it would be better to integrate them properly into the core message; this is one reason this option was discounted in favour of the proposed solution.
  • No server-side versioning to allow older clients and clients in other languages to co-exist
    • Upgrade compatibility issue: new topics need to be created
    • This affects consumers in particular
  • Point solution for handling compaction server side

The amount of change needed, both client and server side, is similar to that of the proposed solution in KIP-82, but this option has the additional drawbacks listed above.

Changes

  1. Creation of a common Kafka Message Wrapper for Java Client
  2. Interceptors to be made message wrapper aware
  3. Client users to create the message wrapper, inserting their value into it, before creating the producer record
  4. The [int, byte[]] header set will be serialised on the wire using a strict format
  5. Each header's value will be custom serialisable by the interceptors/plugins that use that header.
  6. As headers use int keys for compactness, ranges of keys will be reserved for different usages:
    1. client headers adopted by the Kafka open source project (not necessarily owned by it)
    2. broker headers adopted by the Kafka open source project (not necessarily owned by it)
    3. commercial vendor space
    4. custom in-house
  7. To handle the compaction issue:
    1. Update ProducerRecord/ConsumerRecord to set a tombstone marker
    2. Use attribute bit 4 as a boolean flag to mark whether the record should be deleted ("tombstone marker")
    3. Add ProduceRequest/ProduceResponse V4 which uses the new message format.
    4. Add FetchRequest/FetchResponse V4 which uses the new message format.
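Item 6 leaves the concrete key ranges open. The following is a minimal Java sketch of how an interceptor might classify an int header key into the four reserved usages; the numeric boundaries and the names HeaderKeyRange and classify are hypothetical placeholders, not values defined by this proposal.

```java
// Hypothetical classification of int header keys into the reserved usage ranges.
// The numeric boundaries are illustrative placeholders only; this page does not define them.
enum HeaderKeyRange
{
   CLIENT_OPEN_SOURCE,   // client headers adopted by the Kafka open source project
   BROKER_OPEN_SOURCE,   // broker headers adopted by the Kafka open source project
   COMMERCIAL_VENDOR,    // commercial vendor space
   CUSTOM_IN_HOUSE;      // custom in-house headers

   // Placeholder boundaries (assumption); negative keys are rejected in this sketch.
   private static final int BROKER_START = 10_000;
   private static final int VENDOR_START = 20_000;
   private static final int IN_HOUSE_START = 30_000;

   static HeaderKeyRange classify(int key)
   {
      if (key < 0)
      {
         throw new IllegalArgumentException("header keys must be non-negative: " + key);
      }
      if (key < BROKER_START) return CLIENT_OPEN_SOURCE;
      if (key < VENDOR_START) return BROKER_OPEN_SOURCE;
      if (key < IN_HOUSE_START) return COMMERCIAL_VENDOR;
      return CUSTOM_IN_HOUSE;
   }
}
```

Fixed contiguous ranges keep the check to a few integer comparisons, which fits the stated reason for using int keys (compactness) rather than strings.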

LogCleaner

Update the method "shouldRetainMessage" to also check attribute bit 4 for the tombstone marker
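Because a wrapped value always carries the wrapper's magic byte and length fields, it is never null, so the cleaner cannot rely on a null value alone to recognise a delete. A minimal sketch of the bit check, assuming zero-based bit numbering with bits 0-2 used by the compression codec and bit 3 by the timestamp type (KIP-32), so that bit 4 is the mask 0x10. TombstoneAttribute and its methods are hypothetical names; this is not the actual LogCleaner code.

```java
// Hypothetical helper sketching the tombstone check for LogCleaner.shouldRetainMessage.
// Assumes zero-based attribute bits: 0-2 compression codec, 3 timestamp type (KIP-32), 4 tombstone.
final class TombstoneAttribute
{
   static final int TOMBSTONE_MASK = 1 << 4; // bit 4 == 0x10

   // True if the tombstone flag is set in the message attributes.
   static boolean isTombstone(byte attributes)
   {
      return (attributes & TOMBSTONE_MASK) != 0;
   }

   // Returns the attributes with the tombstone flag set or cleared; other bits are untouched.
   static byte withTombstone(byte attributes, boolean tombstone)
   {
      return (byte) (tombstone ? (attributes | TOMBSTONE_MASK) : (attributes & ~TOMBSTONE_MASK));
   }

   // A record counts as a delete marker if it has a null value (existing behaviour)
   // or, under this option, if the tombstone bit is set even though the value is non-null.
   static boolean isDeleteMarker(byte attributes, byte[] value)
   {
      return value == null || isTombstone(attributes);
   }
}
```

Keeping the null-value check alongside the bit means records written in the older formats are still compacted as before.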

MessageAndOffset

Wire protocol change

MessageAndOffset => Offset MessageSize Message
  Offset => int64  
  MessageSize => int32
  
  Message => Crc MagicByte Attributes Timestamp KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8  <---------------------- Bump up magic byte to 2
    Attributes => int8 <---------------------- Use Bit 4 as boolean flag for compaction to signify deletion / tombstone
    Timestamp => int64
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
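A minimal Java sketch of writing the magic-2 Message body described above. It assumes big-endian encoding (ByteBuffer's default, which the Kafka protocol uses), a length of -1 for a null key or value, and a CRC32 computed over every byte after the Crc field, as for the existing message formats. MessageV2Encoder is a hypothetical name, and the Offset/MessageSize framing is left out.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Hypothetical encoder for the proposed magic-2 Message body (without Offset/MessageSize).
final class MessageV2Encoder
{
   static final byte MAGIC_V2 = 2;

   static ByteBuffer encode(byte attributes, long timestamp, byte[] key, byte[] value)
   {
      int keyLen = key == null ? -1 : key.length;       // -1 marks a null key
      int valueLen = value == null ? -1 : value.length; // -1 marks a null value
      // Crc + MagicByte + Attributes + Timestamp + KeyLength + Key + ValueLength + Value
      int size = 4 + 1 + 1 + 8 + 4 + Math.max(keyLen, 0) + 4 + Math.max(valueLen, 0);
      ByteBuffer buf = ByteBuffer.allocate(size);
      buf.position(4); // leave room for the Crc, which is written last
      buf.put(MAGIC_V2);
      buf.put(attributes); // bit 4 would carry the tombstone flag
      buf.putLong(timestamp);
      buf.putInt(keyLen);
      if (key != null) buf.put(key);
      buf.putInt(valueLen);
      if (value != null) buf.put(value);

      // CRC32 over everything after the Crc field, stored with an absolute put.
      CRC32 crc = new CRC32();
      crc.update(buf.array(), 4, size - 4);
      buf.putInt(0, (int) crc.getValue());
      buf.rewind();
      return buf;
   }
}
```

Note that under this option the headers never appear at this level; they travel inside Value as part of the wrapper bytes.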

Header Value Message Wrapper

MessageWrapper (Java)

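import java.util.Map;

// Client-side wrapper carrying int-keyed headers alongside the user's value.
public class MessageWrapper<V>
{
   // Header key -> serialised header value; each header's owner defines the value format.
   private final Map<Integer, byte[]> headers;
   private final V value;

   public MessageWrapper(Map<Integer, byte[]> headers, V value)
   {
      this.headers = headers;
      this.value = value;
   }

   public Map<Integer, byte[]> getHeaders()
   {
      return headers;
   }

   public V getValue()
   {
      return value;
   }
}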

Wire protocol of the Message wrapper

MessageWrapper (bytes) => MagicByte HeadersLength Headers ValueLength Value
    MagicByte => int8  <------------------- 0, used for future versioning
    HeadersLength => int32 <--------------- size of the byte[] of the serialised headers
    Headers => bytes <--------------------- serialised form of the headers Map<int, byte[]>
    ValueLength => int32 <----------------- size of the byte[] of the serialised value
    Value => bytes <----------------------- byte[] of the value
 
Headers (bytes) => Set(Key, ValueLength, Value)
  Set =>
    Key => int32 <---------------------- NEW int key of the header
    ValueLength => int32 <-------------- NEW size of the byte[] of the serialised header value
    Value => bytes <-------------------- NEW serialised form of the header value
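A minimal Java sketch of a codec for the wrapper layout above. It assumes big-endian ints (ByteBuffer's default), a value already serialised to byte[], non-null header values, and no separate header count: HeadersLength bounds the Set, so the decoder reads entries until it has consumed that many bytes. MessageWrapperCodec and Decoded are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical codec for: MagicByte HeadersLength Headers ValueLength Value
final class MessageWrapperCodec
{
   static final byte WRAPPER_MAGIC_V0 = 0;

   static final class Decoded
   {
      final Map<Integer, byte[]> headers;
      final byte[] value;

      Decoded(Map<Integer, byte[]> headers, byte[] value)
      {
         this.headers = headers;
         this.value = value;
      }
   }

   static byte[] encode(Map<Integer, byte[]> headers, byte[] value)
   {
      int headersLength = 0;
      for (byte[] headerValue : headers.values())
      {
         headersLength += 4 + 4 + headerValue.length; // Key + ValueLength + Value
      }
      ByteBuffer buf = ByteBuffer.allocate(1 + 4 + headersLength + 4 + value.length);
      buf.put(WRAPPER_MAGIC_V0);
      buf.putInt(headersLength);
      for (Map.Entry<Integer, byte[]> header : headers.entrySet())
      {
         buf.putInt(header.getKey());
         buf.putInt(header.getValue().length);
         buf.put(header.getValue());
      }
      buf.putInt(value.length);
      buf.put(value);
      return buf.array();
   }

   static Decoded decode(byte[] bytes)
   {
      ByteBuffer buf = ByteBuffer.wrap(bytes);
      byte magic = buf.get();
      if (magic != WRAPPER_MAGIC_V0)
      {
         throw new IllegalArgumentException("unsupported wrapper magic byte: " + magic);
      }
      int headersLength = buf.getInt();
      int headersEnd = buf.position() + headersLength;
      Map<Integer, byte[]> headers = new LinkedHashMap<>();
      while (buf.position() < headersEnd) // read header entries until HeadersLength is consumed
      {
         int key = buf.getInt();
         byte[] headerValue = new byte[buf.getInt()];
         buf.get(headerValue);
         headers.put(key, headerValue);
      }
      byte[] value = new byte[buf.getInt()];
      buf.get(value);
      return new Decoded(headers, value);
   }
}
```

Because the header values stay opaque byte[] here, each interceptor/plugin remains free to apply its own serialisation to the headers it owns.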
 

Compatibility, Deprecation, and Migration Plan

  • MessageWrapper will not be backwards compatible
    • Current client users will be affected
      • new topics will be needed
      • users will need to instantiate and produce the message wrapper explicitly
  • Tombstone feature will be backwards compatible
    • Message version migration would be handled as in KIP-32